diff --git a/app/worker/main.py b/app/worker/main.py index 7b4826b..f7b2915 100644 --- a/app/worker/main.py +++ b/app/worker/main.py @@ -30,8 +30,16 @@ setup_celery_logger(celery) # logger.add("app/worker/worker_log.log", level="DEBUG") logger.remove = lambda x: print(f"logger.remove({x})") -# TODO: after release, as it requires updating past entries with sheet_id where tag is used, drop tags -@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 1}) + +# TODO: after release, as it requires updating past entries with sheet_id where tag +# is used, drop tags +@celery.task( + name="create_archive_task", + bind=True, + autoretry_for=(Exception,), + retry_backoff=True, + retry_kwargs={"max_retries": 1}, +) def create_archive_task(self, archive_json: str): archive = schemas.ArchiveCreate.model_validate_json(archive_json) @@ -42,9 +50,9 @@ def create_archive_task(self, archive_json: str): orchestrator.setup(args) result = next(orchestrator.feed()) except SystemExit as e: - log_error(e, f"create_archive_task: SystemExit from AA") + log_error(e, "create_archive_task: SystemExit from AA") except Exception as e: - log_error(e, f"create_archive_task") + log_error(e, "create_archive_task") raise e assert result, f"UNABLE TO archive: {archive.url}" @@ -61,10 +69,14 @@ def create_archive_task(self, archive_json: str): @celery.task(name="create_sheet_task", bind=True) def create_sheet_task(self, sheet_json: str): sheet = schemas.SubmitSheet.model_validate_json(sheet_json) - queue_name = (create_sheet_task.request.delivery_info or {}).get('routing_key', 'unknown') + queue_name = (create_sheet_task.request.delivery_info or {}).get( + "routing_key", "unknown" + ) logger.info(f"[queue={queue_name}] SHEET START {sheet=}") - args = get_orchestrator_args(sheet.group_id, True, ["--gsheet_feeder.sheet_id", sheet.sheet_id]) + args = get_orchestrator_args( + sheet.group_id, True, ["--gsheet_feeder.sheet_id", sheet.sheet_id] + ) orchestrator = ArchivingOrchestrator() orchestrator.setup(args) @@ -82,7 +94,7 @@ def create_sheet_task(self, sheet_json: str): result=json.loads(result.to_json()), sheet_id=sheet.sheet_id, urls=get_all_urls(result), - store_until=get_store_until(sheet.group_id) + store_until=get_store_until(sheet.group_id), ) insert_result_into_db(archive) stats["archived"] += 1 @@ -95,25 +107,39 @@ def create_sheet_task(self, sheet_json: str): stats["errors"].append(str(e)) except SystemExit as e: - log_error(e, f"create_sheet_task: SystemExit from AA") + log_error(e, "create_sheet_task: SystemExit from AA") if stats["archived"] > 0: with get_db() as session: - worker_crud.update_sheet_last_url_archived_at(session, sheet.sheet_id) + worker_crud.update_sheet_last_url_archived_at( + session, sheet.sheet_id + ) logger.info(f"SHEET DONE {sheet=}") # TODO: is this used anywhere? maybe drop it - return schemas.CelerySheetTask(success=True, sheet_id=sheet.sheet_id, time=datetime.datetime.now().isoformat(), stats=stats).model_dump() + return schemas.CelerySheetTask( + success=True, + sheet_id=sheet.sheet_id, + time=datetime.datetime.now().isoformat(), + stats=stats, + ).model_dump() -def get_orchestrator_args(group_id: str, orchestrator_for_sheet: bool, cli_args: list = []) -> list: +def get_orchestrator_args( + group_id: str, orchestrator_for_sheet: bool, cli_args: list = None +) -> list: + if cli_args is None: + cli_args = [] + aa_configs = [] with get_db() as session: group = worker_crud.get_group(session, group_id) if orchestrator_for_sheet: orchestrator_fn = group.orchestrator_sheet else: - orchestrator_fn = worker_crud.get_group(session, group_id).orchestrator + orchestrator_fn = worker_crud.get_group( + session, group_id + ).orchestrator assert orchestrator_fn, f"no orchestrator found for {group_id}" aa_configs.extend(["--config", orchestrator_fn]) aa_configs.extend(cli_args) @@ -123,7 +149,9 @@ def get_orchestrator_args(group_id: str, orchestrator_for_sheet: bool, cli_args: def insert_result_into_db(archive: schemas.ArchiveCreate) -> str: with get_db() as session: db_archive = worker_crud.store_archived_url(session, archive) - logger.debug(f"[ARCHIVE STORED] {db_archive.author_id} {db_archive.url}") + logger.debug( + f"[ARCHIVE STORED] {db_archive.author_id} {db_archive.url}" + ) return db_archive.id @@ -132,13 +160,22 @@ def get_store_until(group_id: str) -> datetime.datetime: return business_logic.get_store_archive_until(session, group_id) -def redis_publish_exception(exception, task_name, traceback: str = ""): +def redis_publish_exception(exception, task_name, trace_back: str = ""): REDIS_EXCEPTIONS_CHANNEL = settings.REDIS_EXCEPTIONS_CHANNEL try: - exception_data = {"task": task_name, "type": exception.__class__.__name__, "exception": exception, "traceback": traceback} - Redis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps(exception_data, default=str)) + exception_data = { + "task": task_name, + "type": exception.__class__.__name__, + "exception": exception, + "traceback": trace_back, + } + Redis.publish( + REDIS_EXCEPTIONS_CHANNEL, json.dumps(exception_data, default=str) + ) except Exception as e: - log_error(e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}") + log_error( + e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}" + ) @task_failure.connect(sender=create_sheet_task) @@ -146,6 +183,10 @@ def redis_publish_exception(exception, task_name, traceback: str = ""): def task_failure_notifier(sender, **kwargs): # automatically capture exceptions in the worker tasks logger.warning(f"⚠️ worker task failed: {sender.name}") - traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback']))) - log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}") - redis_publish_exception(kwargs['exception'], sender.name, traceback_msg) + traceback_msg = "\n".join( + traceback.format_list(traceback.extract_tb(kwargs["traceback"])) + ) + log_error( + kwargs["exception"], traceback_msg, f"task_failure: {sender.name}" + ) + redis_publish_exception(kwargs["exception"], sender.name, traceback_msg) diff --git a/app/worker/worker_log.py b/app/worker/worker_log.py index 022c63d..c1c8dcc 100644 --- a/app/worker/worker_log.py +++ b/app/worker/worker_log.py @@ -1,6 +1,5 @@ import sys -from celery import Celery from loguru import logger from app.shared.task_messaging import get_celery @@ -8,24 +7,32 @@ from app.shared.task_messaging import get_celery celery = get_celery("worker") -def setup_celery_logger(celery): - # Remove Celery's default handlers to prevent duplicate logs - celery_logger = celery.log.get_default_logger() - for handler in celery_logger.handlers[:]: - celery_logger.removeHandler(handler) - # Set up Loguru logging - logger.add("logs/celery_logs.log", retention="30 days", level="DEBUG") - logger.add("logs/celery_error_logs.log", retention="30 days", level="ERROR") +def setup_celery_logger(c): + # Remove Celery's default handlers to prevent duplicate logs + celery_logger = c.log.get_default_logger() + for handler in celery_logger.handlers[:]: + celery_logger.removeHandler(handler) - # Redirect Celery logs to Loguru - class InterceptHandler: - def write(self, message): - if message.strip(): - logger.info(message.strip()) - # Required to prevent issues with buffered output - def flush(self): pass - def isatty(self): return False + # Set up Loguru logging + logger.add("logs/celery_logs.log", retention="30 days", level="DEBUG") + logger.add("logs/celery_error_logs.log", retention="30 days", level="ERROR") - sys.stdout = InterceptHandler() - sys.stderr = InterceptHandler() + # Redirect Celery logs to Loguru + class InterceptHandler: + @staticmethod + def write(message): + if message.strip(): + logger.info(message.strip()) + + # Required to prevent issues with buffered output + @staticmethod + def flush(): + pass + + @staticmethod + def isatty(): + return False + + sys.stdout = InterceptHandler() + sys.stderr = InterceptHandler()