From 1b39f2c291909746ddee3d77c98e19e1c32105c5 Mon Sep 17 00:00:00 2001 From: Lilia Kai Date: Tue, 5 Sep 2023 17:01:52 +0200 Subject: [PATCH 1/3] Rename variables in get_status There are no logic changes in this commit, just renamed variables so that fewer things are called "result" which seemed confusing. Instead of result.result = task_result.result, we can say response.result = task.result --- src/main.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main.py b/src/main.py index f879961..a6a8c46 100644 --- a/src/main.py +++ b/src/main.py @@ -104,22 +104,21 @@ def archive_tasks(archive:schemas.ArchiveCreate, email = Depends(get_bearer_auth @app.get("/tasks/{task_id}") def get_status(task_id, email = Depends(get_bearer_auth)): logger.info(f"status check for user {email}") - task_result = AsyncResult(task_id, app=celery) - result = { + task = AsyncResult(task_id, app=celery) + response = { "id": task_id, - "status": task_result.status, - "result": task_result.result + "status": task.status, + "result": task.result } try: - if task_result.result and "error" in task_result.result: - result["status"] = "FAILURE" + if task.result and "error" in task.result: + response["status"] = "FAILURE" except Exception as e: logger.error(e) logger.error(traceback.format_exc()) - result["status"] = "FAILURE" + response["status"] = "FAILURE" try: - json_result = jsonable_encoder(result, exclude_unset=True) - return JSONResponse(json_result) + return JSONResponse(jsonable_encoder(response, exclude_unset=True)) except Exception as e: logger.error(e) logger.error(traceback.format_exc()) From 00201770ba6c3d63f41485f7359ad4c72df4d992 Mon Sep 17 00:00:00 2001 From: Lilia Kai Date: Wed, 6 Sep 2023 15:54:32 +0200 Subject: [PATCH 2/3] Create archive task returns dict instead of string This will save the task result in redis as a json object instead of a json-encoded string. This makes for a nicer response from get_status and prevents the client having to parse a json string to work with the result. --- src/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/worker.py b/src/worker.py index 90ab531..8e2d8a7 100644 --- a/src/worker.py +++ b/src/worker.py @@ -43,7 +43,7 @@ def create_archive_task(self, archive_json: str): logger.error(e) logger.error(traceback.format_exc()) return {"error": e} - return result.to_json() + return result.to_dict() @celery.task(name="create_sheet_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0}) From f20dd059280488d9cb270d197756597a31277650 Mon Sep 17 00:00:00 2001 From: Lilia Kai Date: Wed, 6 Sep 2023 13:54:32 +0200 Subject: [PATCH 3/3] Refactor get_status and create_archive_task error handling Raise exceptions instead of returning error messages from the worker in create_arvive_task. This ensures consistency in how the errors are presented on the task result: the Exception will be the result instead of *maybe* being wrapped in an object like {error: Exception}. This lets us simplify error handling in get_status so we have only one try/except block where the error can be returned to the client. --- src/main.py | 28 ++++++++++++++-------------- src/worker.py | 7 +++++-- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/main.py b/src/main.py index a6a8c46..a253afd 100644 --- a/src/main.py +++ b/src/main.py @@ -103,29 +103,29 @@ def archive_tasks(archive:schemas.ArchiveCreate, email = Depends(get_bearer_auth @app.get("/tasks/{task_id}") def get_status(task_id, email = Depends(get_bearer_auth)): - logger.info(f"status check for user {email}") + logger.info(f"status check for user {email} task {task_id}") task = AsyncResult(task_id, app=celery) - response = { - "id": task_id, - "status": task.status, - "result": task.result - } - try: - if task.result and "error" in task.result: - response["status"] = "FAILURE" - except Exception as e: - logger.error(e) - logger.error(traceback.format_exc()) - response["status"] = "FAILURE" try: + if task.status == "FAILURE": + # *FAILURE* The task raised an exception, or has exceeded the retry limit. + # The :attr:`result` attribute then contains the exception raised by the task. + # https://docs.celeryq.dev/en/stable/_modules/celery/result.html#AsyncResult + raise task.result + + response = { + "id": task_id, + "status": task.status, + "result": task.result + } return JSONResponse(jsonable_encoder(response, exclude_unset=True)) + except Exception as e: logger.error(e) logger.error(traceback.format_exc()) return JSONResponse({ "id": task_id, "status": "FAILURE", - "result": {"error": e} + "result": {"error": str(e)} }) @app.delete("/tasks/{task_id}") diff --git a/src/worker.py b/src/worker.py index 8e2d8a7..b68deb6 100644 --- a/src/worker.py +++ b/src/worker.py @@ -30,7 +30,9 @@ def get_db(): def create_archive_task(self, archive_json: str): archive = schemas.ArchiveCreate.parse_raw(archive_json) - if (em := is_group_invalid_for_user(archive.public, archive.group_id, archive.author_id)): return {"error": em} + invalid = is_group_invalid_for_user(archive.public, archive.group_id, archive.author_id) + if (invalid): + raise Exception(invalid) # marks task FAILED, saves the Exception as reult url = archive.url logger.info(f"{url=} {archive=}") @@ -40,9 +42,10 @@ def create_archive_task(self, archive_json: str): try: insert_result_into_db(result, archive.tags, archive.public, archive.group_id, archive.author_id, self.request.id) except Exception as e: + # Log it, then raise again to store the error as the task result logger.error(e) logger.error(traceback.format_exc()) - return {"error": e} + raise e return result.to_dict()