diff --git a/src/main.py b/src/main.py index 44493e6..6223887 100644 --- a/src/main.py +++ b/src/main.py @@ -102,30 +102,29 @@ def archive_tasks(archive:schemas.ArchiveCreate, email = Depends(get_bearer_auth @app.get("/tasks/{task_id}") def get_status(task_id, email = Depends(get_bearer_auth)): - logger.info(f"status check for user {email}") - task_result = AsyncResult(task_id, app=celery) - result = { - "id": task_id, - "status": task_result.status, - "result": task_result.result - } + logger.info(f"status check for user {email} task {task_id}") + task = AsyncResult(task_id, app=celery) try: - if task_result.result and "error" in task_result.result: - result["status"] = "FAILURE" - except Exception as e: - logger.error(e) - logger.error(traceback.format_exc()) - result["status"] = "FAILURE" - try: - json_result = jsonable_encoder(result, exclude_unset=True) - return JSONResponse(json_result) + if task.status == "FAILURE": + # *FAILURE* The task raised an exception, or has exceeded the retry limit. + # The :attr:`result` attribute then contains the exception raised by the task. + # https://docs.celeryq.dev/en/stable/_modules/celery/result.html#AsyncResult + raise task.result + + response = { + "id": task_id, + "status": task.status, + "result": task.result + } + return JSONResponse(jsonable_encoder(response, exclude_unset=True)) + except Exception as e: logger.error(e) logger.error(traceback.format_exc()) return JSONResponse({ "id": task_id, "status": "FAILURE", - "result": {"error": e} + "result": {"error": str(e)} }) @app.delete("/tasks/{task_id}") diff --git a/src/worker.py b/src/worker.py index 90ab531..b68deb6 100644 --- a/src/worker.py +++ b/src/worker.py @@ -30,7 +30,9 @@ def get_db(): def create_archive_task(self, archive_json: str): archive = schemas.ArchiveCreate.parse_raw(archive_json) - if (em := is_group_invalid_for_user(archive.public, archive.group_id, archive.author_id)): return {"error": em} + invalid = is_group_invalid_for_user(archive.public, archive.group_id, archive.author_id) + if (invalid): + raise Exception(invalid) # marks task FAILED, saves the Exception as reult url = archive.url logger.info(f"{url=} {archive=}") @@ -40,10 +42,11 @@ def create_archive_task(self, archive_json: str): try: insert_result_into_db(result, archive.tags, archive.public, archive.group_id, archive.author_id, self.request.id) except Exception as e: + # Log it, then raise again to store the error as the task result logger.error(e) logger.error(traceback.format_exc()) - return {"error": e} - return result.to_json() + raise e + return result.to_dict() @celery.task(name="create_sheet_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0})