diff --git a/files/__main__.py b/files/__main__.py index f0e1a3be2..8f22f00c4 100644 --- a/files/__main__.py +++ b/files/__main__.py @@ -194,11 +194,12 @@ limiter = flask_limiter.Limiter( # ...and then after that we can load the database. engine: Engine = create_engine(DATABASE_URL) -db_session: scoped_session = scoped_session(sessionmaker( +db_session_factory: sessionmaker = sessionmaker( bind=engine, autoflush=False, future=True, -)) +) +db_session: scoped_session = scoped_session(db_session_factory) # now that we've that, let's add the cache, compression, and mail extensions to our app... diff --git a/files/commands/cron.py b/files/commands/cron.py index 1595e98df..4f672f332 100644 --- a/files/commands/cron.py +++ b/files/commands/cron.py @@ -4,9 +4,9 @@ import time from datetime import datetime, timezone from typing import Final -from sqlalchemy.orm import scoped_session, Session +from sqlalchemy.orm import sessionmaker, Session -from files.__main__ import app, db_session +from files.__main__ import app, db_session_factory from files.classes.cron.tasks import (DayOfWeek, RepeatableTask, RepeatableTaskRun, ScheduledTaskState) @@ -41,7 +41,7 @@ def cron_app_worker(): logging.info("Starting scheduler worker process") while True: try: - _run_tasks(db_session) + _run_tasks(db_session_factory) except Exception as e: logging.exception( "An unhandled exception occurred while running tasks", @@ -77,7 +77,7 @@ def _acquire_lock_exclusive(db: Session, table: str): raise -def _run_tasks(db_session_factory: scoped_session): +def _run_tasks(db_session_factory: sessionmaker): ''' Runs tasks, attempting to guarantee that a task is ran once and only once. This uses postgres to lock the table containing our tasks at key points in @@ -87,54 +87,53 @@ def _run_tasks(db_session_factory: scoped_session): running task does not lock the entire table for its entire run, which would for example, prevent any statistics about status from being gathered. ''' + db: Session = db_session_factory() - db: Session - with db_session_factory() as db: + with _acquire_lock_exclusive(db, RepeatableTask.__tablename__): + now: datetime = datetime.now(tz=timezone.utc) + + tasks: list[RepeatableTask] = db.query(RepeatableTask).filter( + RepeatableTask.enabled == True, + RepeatableTask.frequency_day != int(DayOfWeek.NONE), + RepeatableTask.run_state != int(ScheduledTaskState.RUNNING), + (RepeatableTask.run_time_last <= now) + | (RepeatableTask.run_time_last == None), + ).all() + + # SQLA needs to query again for the inherited object info anyway + # so it's fine that objects in the list get expired on txn end. + # Prefer more queries to risk of task run duplication. + tasks_to_run: list[RepeatableTask] = list(filter( + lambda task: task.can_run(now), tasks)) + + for task in tasks_to_run: + now = datetime.now(tz=timezone.utc) with _acquire_lock_exclusive(db, RepeatableTask.__tablename__): - now: datetime = datetime.now(tz=timezone.utc) + # We need to check for runnability again because we don't mutex + # the RepeatableTask.run_state until now. + if not task.can_run(now): + continue + task.run_time_last = now + task.run_state_enum = ScheduledTaskState.RUNNING - tasks: list[RepeatableTask] = db.query(RepeatableTask).filter( - RepeatableTask.enabled == True, - RepeatableTask.frequency_day != int(DayOfWeek.NONE), - RepeatableTask.run_state != int(ScheduledTaskState.RUNNING), - (RepeatableTask.run_time_last <= now) - | (RepeatableTask.run_time_last == None), - ).all() + # This *must* happen before we start doing db queries, including sqlalchemy db queries + db.begin() + task_debug_identifier = f"(ID {task.id}:{task.label})" + logging.info(f"Running task {task_debug_identifier}") - # SQLA needs to query again for the inherited object info anyway - # so it's fine that objects in the list get expired on txn end. - # Prefer more queries to risk of task run duplication. - tasks_to_run: list[RepeatableTask] = list(filter( - lambda task: task.can_run(now), tasks)) + run: RepeatableTaskRun = task.run(db, task.run_time_last_or_created_utc) - for task in tasks_to_run: - now = datetime.now(tz=timezone.utc) - with _acquire_lock_exclusive(db, RepeatableTask.__tablename__): - # We need to check for runnability again because we don't mutex - # the RepeatableTask.run_state until now. - if not task.can_run(now): - continue - task.run_time_last = now - task.run_state_enum = ScheduledTaskState.RUNNING + if run.exception: + # TODO: collect errors somewhere other than just here and in the + # task run object itself (see #220). + logging.exception( + f"Exception running task {task_debug_identifier}", + exc_info=run.exception + ) + db.rollback() + else: + db.commit() + logging.info(f"Finished task {task_debug_identifier}") - # This *must* happen before we start doing db queries, including sqlalchemy db queries - db.begin() - task_debug_identifier = f"(ID {task.id}:{task.label})" - logging.info(f"Running task {task_debug_identifier}") - - run: RepeatableTaskRun = task.run(db, task.run_time_last_or_created_utc) - - if run.exception: - # TODO: collect errors somewhere other than just here and in the - # task run object itself (see #220). - logging.exception( - f"Exception running task {task_debug_identifier}", - exc_info=run.exception - ) - db.rollback() - else: - db.commit() - logging.info(f"Finished task {task_debug_identifier}") - - with _acquire_lock_exclusive(db, RepeatableTask.__tablename__): - task.run_state_enum = ScheduledTaskState.WAITING + with _acquire_lock_exclusive(db, RepeatableTask.__tablename__): + task.run_state_enum = ScheduledTaskState.WAITING