from dataclasses import dataclass from typing import Any, Callable, Final, Optional from sqlalchemy import Column, func from sqlalchemy.orm import scoped_session, Query from files.helpers.const import LEADERBOARD_LIMIT from files.classes.badges import Badge from files.classes.marsey import Marsey from files.classes.user import User from files.classes.userblock import UserBlock from files.helpers.get import get_accounts_dict @dataclass(frozen=True, slots=True) class LeaderboardMeta: header_name:str table_header_name:str html_id:str table_column_name:str user_relative_url:Optional[str] limit:int=LEADERBOARD_LIMIT class Leaderboard: def __init__(self, v:Optional[User], meta:LeaderboardMeta) -> None: self.v:Optional[User] = v self.meta:LeaderboardMeta = meta @property def all_users(self) -> list[User]: raise NotImplementedError() @property def v_position(self) -> Optional[int]: raise NotImplementedError() @property def v_value(self) -> Optional[int]: raise NotImplementedError() @property def v_appears_in_ranking(self) -> bool: return self.v_position is not None and self.v_position <= len(self.all_users) @property def user_func(self) -> Callable[[Any], User]: return lambda u:u @property def value_func(self) -> Callable[[User], int]: raise NotImplementedError() class SimpleLeaderboard(Leaderboard): def __init__(self, v:User, meta:LeaderboardMeta, db:scoped_session, users_query:Query, column:Column): super().__init__(v, meta) self.db:scoped_session = db self.users_query:Query = users_query self.column:Column = column self._calculate() def _calculate(self) -> None: self._all_users = self.users_query.order_by(self.column.desc()).limit(self.meta.limit).all() if self.v not in self._all_users: sq = self.db.query(User.id, self.column, func.rank().over(order_by=self.column.desc()).label("rank")).subquery() sq_data = self.db.query(sq.c.id, sq.c[self.column.name], sq.c.rank).filter(sq.c.id == self.v.id).limit(1).one() self._v_value:int = sq_data[1] self._v_position:int = sq_data[2] @property def all_users(self) -> list[User]: return self._all_users @property def v_position(self) -> int: return self._v_position @property def v_value(self) -> int: return self._v_value @property def value_func(self) -> Callable[[User], int]: return lambda u:getattr(u, self.column.name) class _CountedAndRankedLeaderboard(Leaderboard): @classmethod def count_and_label(cls, criteria): return func.count(criteria).label("count") @classmethod def rank_filtered_rank_label_by_desc(cls, criteria): return func.rank().over(order_by=func.count(criteria).desc()).label("rank") class BadgeMarseyLeaderboard(_CountedAndRankedLeaderboard): def __init__(self, v:User, meta:LeaderboardMeta, db:scoped_session, column:Column): super().__init__(v, meta) self.db:scoped_session = db self.column = column self._calculate() def _calculate(self): sq = self.db.query(self.column, self.count_and_label(self.column), self.rank_filtered_rank_label_by_desc(self.column)).group_by(self.column).subquery() sq_criteria = None if self.column == Badge.user_id: sq_criteria = User.id == sq.c.user_id elif self.column == Marsey.author_id: sq_criteria = User.id == sq.c.author_id else: raise ValueError("This leaderboard function only supports Badge.user_id and Marsey.author_id") leaderboard = self.db.query(User, sq.c.count).join(sq, sq_criteria).order_by(sq.c.count.desc()) position:Optional[tuple[int, int, int]] = self.db.query(User.id, sq.c.rank, sq.c.count).join(sq, sq_criteria).filter(User.id == self.v.id).one_or_none() if position and position[1]: self._v_position = position[1] self._v_value = position[2] else: self._v_position = leaderboard.count() + 1 self._v_value = 0 self._all_users = {k:v for k, v in leaderboard.limit(self.meta.limit).all()} @property def all_users(self) -> list[User]: return list(self._all_users.keys()) @property def v_position(self) -> int: return self._v_position @property def v_value(self) -> int: return self._v_value @property def value_func(self) -> Callable[[User], int]: return lambda u:self._all_users[u] class UserBlockLeaderboard(_CountedAndRankedLeaderboard): def __init__(self, v:User, meta:LeaderboardMeta, db:scoped_session, column:Column): super().__init__(v, meta) self.db:scoped_session = db self.column = column self._calculate() def _calculate(self): if self.column != UserBlock.target_id: raise ValueError("This leaderboard function only supports UserBlock.target_id") sq = self.db.query(self.column, self.count_and_label(self.column)).group_by(self.column).subquery() leaderboard = self.db.query(User, sq.c.count).join(User, User.id == sq.c.target_id).order_by(sq.c.count.desc()) sq = self.db.query(self.column, self.count_and_label(self.column), self.rank_filtered_rank_label_by_desc(self.column)).group_by(self.column).subquery() position = self.db.query(sq.c.rank, sq.c.count).join(User, User.id == sq.c.target_id).filter(sq.c.target_id == self.v.id).limit(1).one_or_none() if not position: position = (leaderboard.count() + 1, 0) leaderboard = leaderboard.limit(self.meta.limit).all() self._all_users = {k:v for k, v in leaderboard} self._v_position = position[0] self._v_value = position[1] return (leaderboard, position[0], position[1]) @property def all_users(self) -> list[User]: return list(self._all_users.keys()) @property def v_position(self) -> int: return self._v_position @property def v_value(self) -> int: return self._v_value class RawSqlLeaderboard(Leaderboard): def __init__(self, meta:LeaderboardMeta, db:scoped_session, query:str) -> None: # should be LiteralString on py3.11+ super().__init__(None, meta) self.db = db self._calculate(query) def _calculate(self, query:str): self.result = {result[0]:list(result) for result in self.db.execute(query).all()} users = get_accounts_dict(self.result.keys(), db=self.db) if users is None: raise Exception("Some users don't exist when they should (was a user deleted?)") for user in users: # I know. self.result[user].append(users[user]) @property def all_users(self) -> list[User]: return [result[2] for result in self.result.values()] @property def v_position(self) -> Optional[int]: return None @property def v_value(self) -> Optional[int]: return None @property def v_appears_in_ranking(self) -> bool: return True # we set this to True here to try and not grab the data @property def user_func(self) -> Callable[[Any], User]: return lambda u:u @property def value_func(self) -> Callable[[User], int]: return lambda u:self.result[u.id][1] class ReceivedDownvotesLeaderboard(RawSqlLeaderboard): _query: Final[str] = """ WITH cv_for_user AS ( SELECT comments.author_id AS target_id, COUNT(*) FROM commentvotes cv JOIN comments ON comments.id = cv.comment_id WHERE vote_type = -1 GROUP BY comments.author_id ), sv_for_user AS ( SELECT submissions.author_id AS target_id, COUNT(*) FROM votes sv JOIN submissions ON submissions.id = sv.submission_id WHERE vote_type = -1 GROUP BY submissions.author_id ) SELECT COALESCE(cvfu.target_id, svfu.target_id) AS target_id, (COALESCE(cvfu.count, 0) + COALESCE(svfu.count, 0)) AS count FROM cv_for_user cvfu FULL OUTER JOIN sv_for_user svfu ON cvfu.target_id = svfu.target_id ORDER BY count DESC LIMIT 25 """ def __init__(self, meta:LeaderboardMeta, db:scoped_session) -> None: super().__init__(meta, db, self._query) class GivenUpvotesLeaderboard(RawSqlLeaderboard): _query: Final[str] = """ SELECT COALESCE(cvbu.user_id, svbu.user_id) AS user_id, (COALESCE(cvbu.count, 0) + COALESCE(svbu.count, 0)) AS count FROM (SELECT user_id, COUNT(*) FROM votes WHERE vote_type = 1 GROUP BY user_id) AS svbu FULL OUTER JOIN (SELECT user_id, COUNT(*) FROM commentvotes WHERE vote_type = 1 GROUP BY user_id) AS cvbu ON cvbu.user_id = svbu.user_id ORDER BY count DESC LIMIT 25 """ def __init__(self, meta:LeaderboardMeta, db:scoped_session) -> None: super().__init__(meta, db, self._query)