From f323b06230b4a198423005fcd74b7385e23c530f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Sp=C3=B6ttel?= <1682504+fspoettel@users.noreply.github.com> Date: Wed, 28 Jun 2023 09:44:27 +0200 Subject: [PATCH] feat: add task time limit settings, hard time limit --- app/shared/celery.py | 7 +++++-- app/shared/settings.py | 5 ++++- app/web/main.py | 2 +- app/worker/main.py | 9 +++++++-- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/app/shared/celery.py b/app/shared/celery.py index 388d9b4..8bd334a 100644 --- a/app/shared/celery.py +++ b/app/shared/celery.py @@ -4,6 +4,9 @@ from app.shared.settings import settings def get_celery_binding() -> Celery: - celery = Celery() - celery.conf.broker_url = settings.BROKER_URL + celery = Celery( + broker_url=settings.BROKER_URL, + broker_connection_retry=False, + broker_connection_retry_on_startup=False, + ) return celery diff --git a/app/shared/settings.py b/app/shared/settings.py index 70300ea..39d11e2 100644 --- a/app/shared/settings.py +++ b/app/shared/settings.py @@ -8,6 +8,9 @@ class Settings(BaseSettings): DATABASE_URI: str ENVIRONMENT: str + TASK_SOFT_TIME_LIMIT: int = 3 * 60 * 60 + TASK_HARD_TIME_LIMIT: int = 4 * 60 * 60 + # derived settings BROKER_URL: str @@ -17,4 +20,4 @@ if "pytest" in sys.modules: _env_file=".env.test", _env_file_encoding="utf-8" ) # type: ignore else: - settings = Settings() + settings = Settings() # type: ignore diff --git a/app/web/main.py b/app/web/main.py index bc09d69..41bb707 100644 --- a/app/web/main.py +++ b/app/web/main.py @@ -103,7 +103,7 @@ def get_transcripts( "/jobs/{id}", response_model=schemas.Job, responses={404: {"model": DetailResponse, "description": "Not found"}}, - summary="Get metadata for one jobs", + summary="Get metadata for one job", ) def get_transcript( id: UUID = Path(), session: Session = Depends(get_session) diff --git a/app/worker/main.py b/app/worker/main.py index c920b0c..c15fb57 100644 --- a/app/worker/main.py +++ b/app/worker/main.py @@ -9,6 +9,7 @@ import app.shared.db.models as models import app.shared.db.schemas as schemas from app.shared.celery import get_celery_binding from app.shared.db.base import SessionLocal +from app.shared.settings import settings from app.worker.strategies.local import LocalStrategy celery = get_celery_binding() @@ -20,7 +21,6 @@ class TranscribeTask(Task): def __init__(self) -> None: super().__init__() # currently only `LocalStrategy` is implemented. - # TODO: implement remote processing strategy. self.strategy: Optional[LocalStrategy] = None def __call__(self, *args: Any, **kwargs: Any) -> Any: @@ -30,7 +30,12 @@ class TranscribeTask(Task): return self.run(*args, **kwargs) -@celery.task(base=TranscribeTask, bind=True, soft_time_limit=2 * 60 * 60) +@celery.task( + base=TranscribeTask, + bind=True, + soft_time_limit=settings.TASK_SOFT_TIME_LIMIT, + time_limit=settings.TASK_HARD_TIME_LIMIT, +) def transcribe(self: Task, job_id: UUID) -> None: try: # runs in a separate thread => requires sqlite's WAL mode to be enabled.