rDrama/files/classes/cron/tasks.py

401 lines
11 KiB
Python

from __future__ import annotations

import contextlib
import dataclasses
import traceback
from datetime import date, datetime, timedelta, timezone
from enum import IntEnum, IntFlag
from typing import TYPE_CHECKING, Final, Optional, Union

import flask
import flask_caching
import flask_mail
import redis
from sqlalchemy.orm import relationship, Session
from sqlalchemy.schema import Column, ForeignKey
from sqlalchemy.sql.sqltypes import (Boolean, DateTime, Integer, SmallInteger,
                                     Text, Time, String)

from files.classes.base import CreatedBase
from files.helpers.time import format_age, format_datetime

if TYPE_CHECKING:
    from files.classes.user import User
class ScheduledTaskType(IntEnum):
    '''
    The kind of work a `RepeatableTask` performs; persisted in
    `RepeatableTask.type_id`.
    '''
    PYTHON_CALLABLE = 1
    SCHEDULED_SUBMISSION = 2

    def __str__(self):
        # Human-readable label, e.g. SCHEDULED_SUBMISSION -> "Scheduled Submission".
        member_name = self.name
        if member_name:
            return member_name.replace('_', ' ').title()
        return super().__str__()
class ScheduledTaskState(IntEnum):
    '''
    Execution state of a scheduled task, persisted in
    `RepeatableTask.run_state`.
    '''
    # A task waiting to be triggered.
    WAITING = 1
    # A task that is currently running.
    RUNNING = 2
class _ClassProperty:
    '''
    Minimal read-only "class property" descriptor.

    Stands in for stacking ``@classmethod`` on ``@property``. That stack was
    deprecated in Python 3.11 and no longer works in 3.13. Access through the
    class or through an instance calls the wrapped function with the class.
    Because it defines ``__get__``, an ``Enum`` body does not turn it into a
    member.
    '''
    def __init__(self, fget):
        self.fget = fget
        self.__doc__ = fget.__doc__

    def __get__(self, instance, owner=None):
        if owner is None:
            owner = type(instance)
        return self.fget(owner)


class DayOfWeek(IntFlag):
    '''
    Bit flags selecting the days of the week a task runs on.

    Each single day has its own bit (bit 0 is unused), and composite values
    such as ``WEEKDAYS`` are unions of those bits. Values are persisted through
    ``RepeatableTask.frequency_day``, so changing a bit assignment would
    reinterpret stored rows.
    '''
    SUNDAY = 1 << 1
    MONDAY = 1 << 2
    TUESDAY = 1 << 3
    WEDNESDAY = 1 << 4
    THURSDAY = 1 << 5
    FRIDAY = 1 << 6
    SATURDAY = 1 << 7
    WEEKDAYS = MONDAY | TUESDAY | WEDNESDAY | THURSDAY | FRIDAY
    WEEKENDS = SATURDAY | SUNDAY
    NONE = 0
    ALL = WEEKDAYS | WEEKENDS

    @_ClassProperty
    def all_days(cls) -> list["DayOfWeek"]:
        '''The seven single-day flags, Sunday first.'''
        return [
            cls.SUNDAY, cls.MONDAY, cls.TUESDAY, cls.WEDNESDAY,
            cls.THURSDAY, cls.FRIDAY, cls.SATURDAY
        ]

    @property
    def empty(self) -> bool:
        '''
        True when no day bit is set (e.g. ``NONE``).

        Checked by intersecting with ``ALL``. A ``self not in ALL`` test cannot
        work here: flag containment treats the empty flag as contained in
        every flag, so it would never report ``NONE`` as empty.
        '''
        return not (self & DayOfWeek.ALL)

    def __contains__(self, other: Union[date, "DayOfWeek"]) -> bool:
        '''
        Flag containment, extended to dates.

        A ``date`` (or ``datetime``) is "in" this flag when its weekday is one
        of the selected days. Anything else uses normal flag containment.

        :raises Exception: if ``other.weekday()`` is outside 0-6.
        '''
        if not isinstance(other, date):
            return super().__contains__(other)
        # date.weekday() numbers Monday as 0 through Sunday as 6.
        by_weekday = (
            DayOfWeek.MONDAY, DayOfWeek.TUESDAY, DayOfWeek.WEDNESDAY,
            DayOfWeek.THURSDAY, DayOfWeek.FRIDAY, DayOfWeek.SATURDAY,
            DayOfWeek.SUNDAY,
        )
        weekday: int = other.weekday()
        if not 0 <= weekday <= 6:
            raise Exception(
                f"Unexpected weekday value (got {weekday}, expected 0-6)")
        return by_weekday[weekday] in self
_UserConvertible = Union["User", str, int]
@dataclasses.dataclass(frozen=True, kw_only=True, slots=True)
class TaskRunContext:
'''
A full task run context, with references to all app globals embedded.
This is the entirety of the application's global state at this point.
This is explicit state. This is useful so scheduled tasks do not have
to import from `files.__main__` and so they can use all of the features
of the application without being in a request context.
'''
app:flask.app.Flask
'''
The application. Many of the app functions use the app context globals and
do not have their state explicitly passed. This is a convenience get out of
jail free card so that most features (excepting those that require a
`request` context can be used.)
'''
cache:flask_caching.Cache
'''
A cache extension. This is useful for situations where a scheduled task
might want to interact with the cache in some way (for example invalidating
or adding something to the cache.)
'''
db:Session
'''
A database session. Useful for when a task needs to modify something in the
database (for example creating a submission)
'''
mail:flask_mail.Mail
'''
The mail extension. Needed for sending emails.
'''
redis:redis.Redis
'''
A direct reference to our redis connection. Normally most operations that
involve the redis datastore use flask_caching's Cache object (accessed via
the `cache` property), however this is provided as a convenience for more
granular redis operations.
'''
task:RepeatableTask
'''
A reference to the task that is being ran.
'''
task_run:RepeatableTaskRun
'''
A reference to this current run of the task.
'''
trigger_time:datetime
'''
The date and time (UTC) that this task was triggered
'''
@property
def run_time(self) -> datetime:
'''
The date and time (UTC) that this task was actually ran
'''
return self.task_run.created_datetime_py
@contextlib.contextmanager
def app_context(self, *, v:Optional[_UserConvertible]=None):
'''
Context manager that uses `self.app` to generate an app context and set
up the application with expected globals. This assigns `g.db`, `g.v`,
and `g.debug`.
This is intended for use with legacy code that does not pass state
explicitly and instead relies on the use of `g` for state passing. If
at all possible, state should be passed explicitly to functions that
require it.
Usage is simple:
```py
with ctx.app_context() as app_ctx:
# code that requires g
```
Any code that uses `g` can be ran here. As this is intended for
scenarios that may be outside of a request context code that uses the
request context may still raise `RuntimeException`s.
An example
```py
from flask import g, request # import works ok
def legacy_function():
u:Optional[User] = g.db.get(User, 1784) # works ok! :)
u.admin_level = \\
request.values.get("admin_level", default=9001, type=int)
# raises a RuntimeError :(
g.db.commit()
```
This is because there is no actual request being made. Creating a
mock request context is doable but outside of the scope of this
function as this is often not needed outside of route handlers (where
this function is out of scope anyway).
:param v: A `User`, an `int`, a `str`, or `None`. `g.v` will be set
using the following rules:
1. If `v` is an `int`, `files.helpers.get_account` is called and the
result of that is stored in `g.v`.
2. If `v` is an `str`, `files.helpers.get_user` is called and the
result of that is stored in `g.v`.
3. If `v` is a `User`, it is stored in `g.v`.
It is expected that callees will provide a valid user ID or username.
If an invalid one is provided, *no* exception will be raised and `g.v`
will be set to `None`.
This is mainly provided as an optional feature so that tasks can be
somewhat "sudo"ed as a particular user. Note that `g.v` is always
assigned (even if to `None`) in order to prevent code that depends on
its existence from raising an exception.
'''
with self.app.app_context() as app_ctx:
app_ctx.g.db = self.db
from files.helpers.get import get_account, get_user
if isinstance(v, str):
v = get_user(v, graceful=True)
elif isinstance(v, int):
v = get_account(v, graceful=True, db=self.db)
app_ctx.g.v = v
app_ctx.g.debug = self.app.debug
yield app_ctx
@contextlib.contextmanager
def db_transaction(self):
try:
yield
self.db.commit()
except:
self.db.rollback()
_TABLE_NAME: Final[str] = "tasks_repeatable"


class RepeatableTask(CreatedBase):
    '''
    A task scheduled to run at `time_of_day_utc` on every day selected in
    `frequency_day` (a `DayOfWeek` bit set).

    The mapping is polymorphic on `type_id`. `run_task` raises
    `NotImplementedError` here, so concrete task kinds are presumably
    subclasses that implement it.
    '''
    __tablename__ = _TABLE_NAME

    id = Column(Integer, primary_key=True, nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    type_id = Column(SmallInteger, nullable=False)
    enabled = Column(Boolean, default=True, nullable=False)
    run_state = Column(SmallInteger, default=int(ScheduledTaskState.WAITING), nullable=False)
    run_time_last = Column(DateTime, default=None)
    frequency_day = Column(SmallInteger, nullable=False)
    time_of_day_utc = Column(Time, nullable=False)

    # used for the cron hardcoding system
    label = Column(String, nullable=True, unique=True)

    runs = relationship("RepeatableTaskRun", back_populates="task")

    @property
    def type(self) -> ScheduledTaskType:
        '''`type_id` as a `ScheduledTaskType`.'''
        return ScheduledTaskType(self.type_id)

    @type.setter
    def type(self, value:ScheduledTaskType):
        # Store a plain int, consistent with the other enum-backed setters.
        self.type_id = int(value)

    @property
    def frequency_day_flags(self) -> DayOfWeek:
        '''`frequency_day` as a `DayOfWeek` flag set.'''
        return DayOfWeek(self.frequency_day)

    @frequency_day_flags.setter
    def frequency_day_flags(self, value:DayOfWeek):
        self.frequency_day = int(value)

    @property
    def run_state_enum(self) -> ScheduledTaskState:
        '''`run_state` as a `ScheduledTaskState`.'''
        return ScheduledTaskState(self.run_state)

    @run_state_enum.setter
    def run_state_enum(self, value:ScheduledTaskState):
        self.run_state = int(value)

    @property
    def run_time_last_or_created_utc(self) -> datetime:
        '''
        The scheduling anchor: the last run time, or the creation time if the
        task has never run.

        NOTE(review): `run_time_last` is a naive `DateTime` column while
        `created_datetime_py` presumably is timezone-aware; confirm callers
        don't compare the two kinds.
        '''
        return self.run_time_last or self.created_datetime_py

    @property
    def run_time_last_str(self) -> str:
        '''Last run as "<datetime> (<age>)", or 'Never'.'''
        if not self.run_time_last: return 'Never'
        return (f'{format_datetime(self.run_time_last)} '
                f'({format_age(self.run_time_last)})')

    @property
    def trigger_time(self) -> datetime | None:
        '''Next scheduled run after the last run (or creation), or None.'''
        return self.next_trigger(self.run_time_last_or_created_utc)

    def can_run(self, now: datetime) -> bool:
        '''
        Whether the task is due at `now`. This requires all of the following:
        a next trigger time exists, `now` is not before it, and the task is
        `WAITING` rather than already running.

        :param now: current time; must be timezone-aware, because
                    `trigger_time` is aware (UTC).
        '''
        # Evaluate the schedule once; `trigger_time` recomputes on each access.
        trigger: datetime | None = self.trigger_time
        if trigger is None or now < trigger:
            return False
        return self.run_state_enum == ScheduledTaskState.WAITING

    def next_trigger(self, anchor: datetime) -> datetime | None:
        '''
        Compute the next run time relative to `anchor`.

        The search covers the anchor's own date and the seven following dates.
        It returns the first date whose weekday is selected in
        `frequency_day_flags`, combined with `time_of_day_utc`. The anchor's
        own date is skipped when its time of day is already past
        `time_of_day_utc`. An anchor exactly at `time_of_day_utc` yields that
        same instant.

        Presumably `anchor` is UTC (naive or aware); only its date and wall
        time are used.

        :returns: a timezone-aware UTC datetime, or None when the task is
                  disabled or no days are selected.
        :raises Exception: if no selected day is found in the window (only
                           possible if `empty` misreports the flags).
        '''
        if not self.enabled: return None
        days: DayOfWeek = self.frequency_day_flags
        if days.empty: return None
        one_day = timedelta(days=1)
        for offset in range(8):
            candidate: datetime = anchor + offset * one_day
            if offset == 0 and candidate.time() > self.time_of_day_utc:
                # Today's slot has already passed.
                continue
            if candidate in days:
                break
        else:
            raise Exception("Could not find suitable timestamp to run next task")
        return datetime.combine(candidate, self.time_of_day_utc, tzinfo=timezone.utc) # type: ignore

    def run(self, db: Session, trigger_time: datetime) -> RepeatableTaskRun:
        '''
        Execute the task once and return the `RepeatableTaskRun` recording it.

        Any `Exception` raised while importing the app globals or inside
        `run_task` is caught and stored on the run rather than propagated.
        The run is added to `db` but not committed here.
        '''
        run:RepeatableTaskRun = RepeatableTaskRun(task_id=self.id)
        try:
            # Imported lazily; importing files.__main__ at module load would
            # presumably create an import cycle.
            from files.__main__ import app, cache, mail, r # i know
            ctx: TaskRunContext = TaskRunContext(
                app=app,
                cache=cache,
                db=db,
                mail=mail,
                redis=r,
                task=self,
                task_run=run,
                trigger_time=trigger_time,
            )
            self.run_task(ctx)
        except Exception as e:
            run.exception = e
        run.completed_utc = datetime.now(tz=timezone.utc)
        db.add(run)
        return run

    def run_task(self, ctx:TaskRunContext):
        '''Task body; must be overridden by concrete task types.'''
        raise NotImplementedError()

    def contains_day_str(self, day_str:str) -> bool:
        '''
        Whether the named day (case-insensitive `DayOfWeek` member name, e.g.
        "monday") is selected. Returns False for an empty string. Raises
        `KeyError` for an unknown name.
        '''
        return (bool(day_str)
                and DayOfWeek[day_str.upper()] in self.frequency_day_flags)

    def __repr__(self) -> str:
        return f'<{self.__class__.__name__}(id={self.id}, created_utc={self.created_date}, author_id={self.author_id})>'

    __mapper_args__ = {
        "polymorphic_on": type_id,
    }
class RepeatableTaskRun(CreatedBase):
    '''
    One execution of a `RepeatableTask`.

    `completed_utc` is unset while the run is in progress. A completed run
    that has a `traceback_str` failed.
    '''
    __tablename__ = "tasks_repeatable_runs"

    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey(RepeatableTask.id), nullable=False)
    manual = Column(Boolean, default=False, nullable=False)
    traceback_str = Column(Text, nullable=True)
    completed_utc = Column(DateTime)

    task = relationship(RepeatableTask, back_populates="runs")

    _exception: Optional[Exception] = None # not part of the db model

    @property
    def completed_datetime_py(self) -> datetime | None:
        '''`completed_utc` as a timezone-aware UTC datetime, or None.'''
        if self.completed_utc is None:
            return None
        return datetime.combine(
            self.completed_utc.date(),
            self.completed_utc.time(),
            timezone.utc)

    @property
    def completed_datetime_str(self) -> str:
        '''`completed_utc` formatted for display.'''
        # NOTE(review): passes None for runs still in progress; confirm
        # format_datetime handles None the way the UI expects.
        return format_datetime(self.completed_utc)

    @property
    def status_text(self) -> str:
        '''"Running", "Failed", or "Completed".'''
        if not self.completed_utc: return "Running"
        return "Failed" if self.traceback_str else "Completed"

    @property
    def time_elapsed(self) -> Optional[timedelta]:
        '''Duration from creation to completion, or None if not completed.'''
        completed: datetime | None = self.completed_datetime_py
        if completed is None: return None
        return completed - self.created_datetime_py

    @property
    def time_elapsed_str(self) -> str:
        '''`time_elapsed` as text; '' if not completed (or zero-length).'''
        elapsed:Optional[timedelta] = self.time_elapsed
        if not elapsed: return ''
        return str(elapsed)

    @property
    def exception(self) -> Optional[Exception]:
        '''The exception raised by this run, if any (not persisted).'''
        return self._exception

    @exception.setter
    def exception(self, value: Optional[Exception]) -> None:
        '''
        Record `value` and persist its formatted traceback in `traceback_str`.

        Storing `str(value)` would leave `traceback_str` empty for exceptions
        raised without a message (e.g. `NotImplementedError()`). That would
        make `status_text` report the failed run as "Completed".
        '''
        self._exception = value
        if value is None:
            self.traceback_str = None
            return
        self.traceback_str = "".join(traceback.format_exception(
            type(value), value, value.__traceback__))