
* leaderboard: correct LB set when !ENABLE_SERVICES * leaderboard: fix NotImplementedError UserBlockLeaderboard did not, in fact, implement `value_func`. Nor did its superclass. The bug was replicated by having at least one UserBlock in the test data.
256 lines
8.1 KiB
Python
256 lines
8.1 KiB
Python
from dataclasses import dataclass
|
|
from typing import Any, Callable, Final, Optional
|
|
|
|
from sqlalchemy import Column, func
|
|
from sqlalchemy.orm import Session, Query
|
|
|
|
from files.helpers.config.const import LEADERBOARD_LIMIT
|
|
|
|
from files.classes.badges import Badge
|
|
from files.classes.marsey import Marsey
|
|
from files.classes.user import User
|
|
from files.classes.userblock import UserBlock
|
|
from files.helpers.get import get_accounts_dict
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class LeaderboardMeta:
|
|
header_name:str
|
|
table_header_name:str
|
|
html_id:str
|
|
table_column_name:str
|
|
user_relative_url:Optional[str]
|
|
limit:int=LEADERBOARD_LIMIT
|
|
|
|
class Leaderboard:
|
|
def __init__(self, v:Optional[User], meta:LeaderboardMeta) -> None:
|
|
self.v:Optional[User] = v
|
|
self.meta:LeaderboardMeta = meta
|
|
|
|
@property
|
|
def all_users(self) -> list[User]:
|
|
raise NotImplementedError()
|
|
|
|
@property
|
|
def v_position(self) -> Optional[int]:
|
|
raise NotImplementedError()
|
|
|
|
@property
|
|
def v_value(self) -> Optional[int]:
|
|
raise NotImplementedError()
|
|
|
|
@property
|
|
def v_appears_in_ranking(self) -> bool:
|
|
return self.v_position is not None and self.v_position <= len(self.all_users)
|
|
|
|
@property
|
|
def user_func(self) -> Callable[[Any], User]:
|
|
return lambda u:u
|
|
|
|
@property
|
|
def value_func(self) -> Callable[[User], int]:
|
|
raise NotImplementedError()
|
|
|
|
class SimpleLeaderboard(Leaderboard):
|
|
def __init__(self, v:User, meta:LeaderboardMeta, db:Session, users_query:Query, column:Column):
|
|
super().__init__(v, meta)
|
|
self.db:Session = db
|
|
self.users_query:Query = users_query
|
|
self.column:Column = column
|
|
self._calculate()
|
|
|
|
def _calculate(self) -> None:
|
|
self._all_users = self.users_query.order_by(self.column.desc()).limit(self.meta.limit).all()
|
|
if self.v not in self._all_users:
|
|
sq = self.db.query(User.id, self.column, func.rank().over(order_by=self.column.desc()).label("rank")).subquery()
|
|
sq_data = self.db.query(sq.c.id, sq.c[self.column.name], sq.c.rank).filter(sq.c.id == self.v.id).limit(1).one()
|
|
self._v_value:int = sq_data[1]
|
|
self._v_position:int = sq_data[2]
|
|
|
|
@property
|
|
def all_users(self) -> list[User]:
|
|
return self._all_users
|
|
|
|
@property
|
|
def v_position(self) -> int:
|
|
return self._v_position
|
|
|
|
@property
|
|
def v_value(self) -> int:
|
|
return self._v_value
|
|
|
|
@property
|
|
def value_func(self) -> Callable[[User], int]:
|
|
return lambda u:getattr(u, self.column.name)
|
|
|
|
class _CountedAndRankedLeaderboard(Leaderboard):
|
|
@classmethod
|
|
def count_and_label(cls, criteria):
|
|
return func.count(criteria).label("count")
|
|
|
|
@classmethod
|
|
def rank_filtered_rank_label_by_desc(cls, criteria):
|
|
return func.rank().over(order_by=func.count(criteria).desc()).label("rank")
|
|
|
|
class BadgeMarseyLeaderboard(_CountedAndRankedLeaderboard):
|
|
def __init__(self, v:User, meta:LeaderboardMeta, db:Session, column:Column):
|
|
super().__init__(v, meta)
|
|
self.db:Session = db
|
|
self.column = column
|
|
self._calculate()
|
|
|
|
def _calculate(self):
|
|
sq = self.db.query(self.column, self.count_and_label(self.column), self.rank_filtered_rank_label_by_desc(self.column)).group_by(self.column).subquery()
|
|
sq_criteria = None
|
|
if self.column == Badge.user_id:
|
|
sq_criteria = User.id == sq.c.user_id
|
|
elif self.column == Marsey.author_id:
|
|
sq_criteria = User.id == sq.c.author_id
|
|
else:
|
|
raise ValueError("This leaderboard function only supports Badge.user_id and Marsey.author_id")
|
|
leaderboard = self.db.query(User, sq.c.count).join(sq, sq_criteria).order_by(sq.c.count.desc())
|
|
|
|
position:Optional[tuple[int, int, int]] = self.db.query(User.id, sq.c.rank, sq.c.count).join(sq, sq_criteria).filter(User.id == self.v.id).one_or_none()
|
|
if position and position[1]:
|
|
self._v_position = position[1]
|
|
self._v_value = position[2]
|
|
else:
|
|
self._v_position = leaderboard.count() + 1
|
|
self._v_value = 0
|
|
self._all_users = {k:v for k, v in leaderboard.limit(self.meta.limit).all()}
|
|
|
|
@property
|
|
def all_users(self) -> list[User]:
|
|
return list(self._all_users.keys())
|
|
|
|
@property
|
|
def v_position(self) -> int:
|
|
return self._v_position
|
|
|
|
@property
|
|
def v_value(self) -> int:
|
|
return self._v_value
|
|
|
|
@property
|
|
def value_func(self) -> Callable[[User], int]:
|
|
return lambda u: self._all_users[u]
|
|
|
|
class UserBlockLeaderboard(_CountedAndRankedLeaderboard):
|
|
def __init__(self, v:User, meta:LeaderboardMeta, db:Session, column:Column):
|
|
super().__init__(v, meta)
|
|
self.db:Session = db
|
|
self.column = column
|
|
self._calculate()
|
|
|
|
def _calculate(self):
|
|
if self.column != UserBlock.target_id:
|
|
raise ValueError("This leaderboard function only supports UserBlock.target_id")
|
|
sq = self.db.query(self.column, self.count_and_label(self.column)).group_by(self.column).subquery()
|
|
leaderboard = self.db.query(User, sq.c.count).join(User, User.id == sq.c.target_id).order_by(sq.c.count.desc())
|
|
|
|
sq = self.db.query(self.column, self.count_and_label(self.column), self.rank_filtered_rank_label_by_desc(self.column)).group_by(self.column).subquery()
|
|
position = self.db.query(sq.c.rank, sq.c.count).join(User, User.id == sq.c.target_id).filter(sq.c.target_id == self.v.id).limit(1).one_or_none()
|
|
if not position: position = (leaderboard.count() + 1, 0)
|
|
leaderboard = leaderboard.limit(self.meta.limit).all()
|
|
self._all_users = {k:v for k, v in leaderboard}
|
|
self._v_position = position[0]
|
|
self._v_value = position[1]
|
|
return (leaderboard, position[0], position[1])
|
|
|
|
@property
|
|
def all_users(self) -> list[User]:
|
|
return list(self._all_users.keys())
|
|
|
|
@property
|
|
def v_position(self) -> int:
|
|
return self._v_position
|
|
|
|
@property
|
|
def v_value(self) -> int:
|
|
return self._v_value
|
|
|
|
@property
|
|
def value_func(self) -> Callable[[User], int]:
|
|
return lambda u: self._all_users[u]
|
|
|
|
class RawSqlLeaderboard(Leaderboard):
|
|
def __init__(self, meta:LeaderboardMeta, db:Session, query:str) -> None: # should be LiteralString on py3.11+
|
|
super().__init__(None, meta)
|
|
self.db = db
|
|
self._calculate(query)
|
|
|
|
def _calculate(self, query:str):
|
|
self.result = {result[0]:list(result) for result in self.db.execute(query).all()}
|
|
users = get_accounts_dict(self.result.keys(), db=self.db)
|
|
if users is None:
|
|
raise Exception("Some users don't exist when they should (was a user deleted?)")
|
|
for user in users: # I know.
|
|
self.result[user].append(users[user])
|
|
|
|
@property
|
|
def all_users(self) -> list[User]:
|
|
return [result[2] for result in self.result.values()]
|
|
|
|
@property
|
|
def v_position(self) -> Optional[int]:
|
|
return None
|
|
|
|
@property
|
|
def v_value(self) -> Optional[int]:
|
|
return None
|
|
|
|
@property
|
|
def v_appears_in_ranking(self) -> bool:
|
|
return True # we set this to True here to try and not grab the data
|
|
|
|
@property
|
|
def user_func(self) -> Callable[[Any], User]:
|
|
return lambda u:u
|
|
|
|
@property
|
|
def value_func(self) -> Callable[[User], int]:
|
|
return lambda u:self.result[u.id][1]
|
|
|
|
class ReceivedDownvotesLeaderboard(RawSqlLeaderboard):
|
|
_query: Final[str] = """
|
|
WITH cv_for_user AS (
|
|
SELECT
|
|
comments.author_id AS target_id,
|
|
COUNT(*)
|
|
FROM commentvotes cv
|
|
JOIN comments ON comments.id = cv.comment_id
|
|
WHERE vote_type = -1
|
|
GROUP BY comments.author_id
|
|
), sv_for_user AS (
|
|
SELECT
|
|
submissions.author_id AS target_id,
|
|
COUNT(*)
|
|
FROM votes sv
|
|
JOIN submissions ON submissions.id = sv.submission_id
|
|
WHERE vote_type = -1
|
|
GROUP BY submissions.author_id
|
|
)
|
|
SELECT
|
|
COALESCE(cvfu.target_id, svfu.target_id) AS target_id,
|
|
(COALESCE(cvfu.count, 0) + COALESCE(svfu.count, 0)) AS count
|
|
FROM cv_for_user cvfu
|
|
FULL OUTER JOIN sv_for_user svfu
|
|
ON cvfu.target_id = svfu.target_id
|
|
ORDER BY count DESC LIMIT 25
|
|
"""
|
|
|
|
def __init__(self, meta:LeaderboardMeta, db:Session) -> None:
|
|
super().__init__(meta, db, self._query)
|
|
|
|
class GivenUpvotesLeaderboard(RawSqlLeaderboard):
|
|
_query: Final[str] = """
|
|
SELECT
|
|
COALESCE(cvbu.user_id, svbu.user_id) AS user_id,
|
|
(COALESCE(cvbu.count, 0) + COALESCE(svbu.count, 0)) AS count
|
|
FROM (SELECT user_id, COUNT(*) FROM votes WHERE vote_type = 1 GROUP BY user_id) AS svbu
|
|
FULL OUTER JOIN (SELECT user_id, COUNT(*) FROM commentvotes WHERE vote_type = 1 GROUP BY user_id) AS cvbu
|
|
ON cvbu.user_id = svbu.user_id
|
|
ORDER BY count DESC LIMIT 25
|
|
"""
|
|
|
|
def __init__(self, meta:LeaderboardMeta, db:Session) -> None:
|
|
super().__init__(meta, db, self._query)
|