rDrama/files/classes/leaderboard.py
2023-02-24 06:31:17 -06:00

252 lines
8.1 KiB
Python

from dataclasses import dataclass
from typing import Any, Callable, Final, Optional
from sqlalchemy import Column, func
from sqlalchemy.orm import scoped_session, Query
from files.helpers.const import LEADERBOARD_LIMIT
from files.classes.badges import Badge
from files.classes.marsey import Marsey
from files.classes.user import User
from files.classes.userblock import UserBlock
from files.helpers.get import get_accounts_dict
@dataclass(frozen=True, slots=True)
class LeaderboardMeta:
header_name:str
table_header_name:str
html_id:str
table_column_name:str
user_relative_url:Optional[str]
limit:int=LEADERBOARD_LIMIT
class Leaderboard:
def __init__(self, v:Optional[User], meta:LeaderboardMeta) -> None:
self.v:Optional[User] = v
self.meta:LeaderboardMeta = meta
@property
def all_users(self) -> list[User]:
raise NotImplementedError()
@property
def v_position(self) -> Optional[int]:
raise NotImplementedError()
@property
def v_value(self) -> Optional[int]:
raise NotImplementedError()
@property
def v_appears_in_ranking(self) -> bool:
return self.v_position is not None and self.v_position <= len(self.all_users)
@property
def user_func(self) -> Callable[[Any], User]:
return lambda u:u
@property
def value_func(self) -> Callable[[User], int]:
raise NotImplementedError()
class SimpleLeaderboard(Leaderboard):
def __init__(self, v:User, meta:LeaderboardMeta, db:scoped_session, users_query:Query, column:Column):
super().__init__(v, meta)
self.db:scoped_session = db
self.users_query:Query = users_query
self.column:Column = column
self._calculate()
def _calculate(self) -> None:
self._all_users = self.users_query.order_by(self.column.desc()).limit(self.meta.limit).all()
if self.v not in self._all_users:
sq = self.db.query(User.id, self.column, func.rank().over(order_by=self.column.desc()).label("rank")).subquery()
sq_data = self.db.query(sq.c.id, sq.c.column, sq.c.rank).filter(sq.c.id == self.v.id).limit(1).one()
self._v_value:int = sq_data[1]
self._v_position:int = sq_data[2]
@property
def all_users(self) -> list[User]:
return self._all_users
@property
def v_position(self) -> int:
return self._v_position
@property
def v_value(self) -> int:
return self._v_value
@property
def value_func(self) -> Callable[[User], int]:
return lambda u:getattr(u, self.column.name)
class _CountedAndRankedLeaderboard(Leaderboard):
@classmethod
def count_and_label(cls, criteria):
return func.count(criteria).label("count")
@classmethod
def rank_filtered_rank_label_by_desc(cls, criteria):
return func.rank().over(order_by=func.count(criteria).desc()).label("rank")
class BadgeMarseyLeaderboard(_CountedAndRankedLeaderboard):
def __init__(self, v:User, meta:LeaderboardMeta, db:scoped_session, column:Column):
super().__init__(v, meta)
self.db:scoped_session = db
self.column = column
self._calculate()
def _calculate(self):
sq = self.db.query(self.column, self.count_and_label(self.column), self.rank_filtered_rank_label_by_desc(self.column)).group_by(self.column).subquery()
sq_criteria = None
if self.column == Badge.user_id:
sq_criteria = User.id == sq.c.user_id
elif self.column == Marsey.author_id:
sq_criteria = User.id == sq.c.author_id
else:
raise ValueError("This leaderboard function only supports Badge.user_id and Marsey.author_id")
leaderboard = self.db.query(User, sq.c.count).join(sq, sq_criteria).order_by(sq.c.count.desc())
position:Optional[tuple[int, int, int]] = self.db.query(User.id, sq.c.rank, sq.c.count).join(sq, sq_criteria).filter(User.id == self.v.id).one_or_none()
if position and position[1]:
self._v_position = position[1]
self._v_value = position[2]
else:
self._v_position = leaderboard.count() + 1
self._v_value = 0
self._all_users = {k:v for k, v in leaderboard.limit(self.meta.limit).all()}
@property
def all_users(self) -> list[User]:
return list(self._all_users.keys())
@property
def v_position(self) -> int:
return self._v_position
@property
def v_value(self) -> int:
return self._v_value
@property
def value_func(self) -> Callable[[User], int]:
return lambda u:self._all_users[u]
class UserBlockLeaderboard(_CountedAndRankedLeaderboard):
def __init__(self, v:User, meta:LeaderboardMeta, db:scoped_session, column:Column):
super().__init__(v, meta)
self.db:scoped_session = db
self.column = column
self._calculate()
def _calculate(self):
if self.column != UserBlock.target_id:
raise ValueError("This leaderboard function only supports UserBlock.target_id")
sq = self.db.query(self.column, self.count_and_label(self.column)).group_by(self.column).subquery()
leaderboard = self.db.query(User, sq.c.count).join(User, User.id == sq.c.target_id).order_by(sq.c.count.desc())
sq = self.db.query(self.column, self.count_and_label(self.column), self.rank_filtered_rank_label_by_desc(self.column)).group_by(self.column).subquery()
position = self.db.query(sq.c.rank, sq.c.count).join(User, User.id == sq.c.target_id).filter(sq.c.target_id == self.v.id).limit(1).one_or_none()
if not position: position = (leaderboard.count() + 1, 0)
leaderboard = leaderboard.limit(self.meta.limit).all()
self._all_users = {k:v for k, v in leaderboard}
self._v_position = position[0]
self._v_value = position[1]
return (leaderboard, position[0], position[1])
@property
def all_users(self) -> list[User]:
return list(self._all_users.keys())
@property
def v_position(self) -> int:
return self._v_position
@property
def v_value(self) -> int:
return self._v_value
class RawSqlLeaderboard(Leaderboard):
def __init__(self, meta:LeaderboardMeta, db:scoped_session, query:str) -> None: # should be LiteralString on py3.11+
super().__init__(None, meta)
self.db = db
self._calculate(query)
def _calculate(self, query:str):
self.result = {result[0]:list(result) for result in self.db.execute(query).all()}
users = get_accounts_dict(self.result.keys(), db=self.db)
if users is None:
raise Exception("Some users don't exist when they should (was a user deleted?)")
for user in users: # I know.
self.result[user].append(users[user])
@property
def all_users(self) -> list[User]:
return [result[2] for result in self.result.values()]
@property
def v_position(self) -> Optional[int]:
return None
@property
def v_value(self) -> Optional[int]:
return None
@property
def v_appears_in_ranking(self) -> bool:
return True # we set this to True here to try and not grab the data
@property
def user_func(self) -> Callable[[Any], User]:
return lambda u:u
@property
def value_func(self) -> Callable[[User], int]:
return lambda u:self.result[u.id][1]
class ReceivedDownvotesLeaderboard(RawSqlLeaderboard):
_query: Final[str] = """
WITH cv_for_user AS (
SELECT
comments.author_id AS target_id,
COUNT(*)
FROM commentvotes cv
JOIN comments ON comments.id = cv.comment_id
WHERE vote_type = -1
GROUP BY comments.author_id
), sv_for_user AS (
SELECT
submissions.author_id AS target_id,
COUNT(*)
FROM votes sv
JOIN submissions ON submissions.id = sv.submission_id
WHERE vote_type = -1
GROUP BY submissions.author_id
)
SELECT
COALESCE(cvfu.target_id, svfu.target_id) AS target_id,
(COALESCE(cvfu.count, 0) + COALESCE(svfu.count, 0)) AS count
FROM cv_for_user cvfu
FULL OUTER JOIN sv_for_user svfu
ON cvfu.target_id = svfu.target_id
ORDER BY count DESC LIMIT 25
"""
def __init__(self, meta:LeaderboardMeta, db:scoped_session) -> None:
super().__init__(meta, db, self._query)
class GivenUpvotesLeaderboard(RawSqlLeaderboard):
_query: Final[str] = """
SELECT
COALESCE(cvbu.user_id, svbu.user_id) AS user_id,
(COALESCE(cvbu.count, 0) + COALESCE(svbu.count, 0)) AS count
FROM (SELECT user_id, COUNT(*) FROM votes WHERE vote_type = 1 GROUP BY user_id) AS svbu
FULL OUTER JOIN (SELECT user_id, COUNT(*) FROM commentvotes WHERE vote_type = 1 GROUP BY user_id) AS cvbu
ON cvbu.user_id = svbu.user_id
ORDER BY count DESC LIMIT 25
"""
def __init__(self, meta:LeaderboardMeta, db:scoped_session) -> None:
super().__init__(meta, db, self._query)