diff --git a/app/tests/worker/test_worker_main.py b/app/tests/worker/test_worker_main.py index 865d039..d78b9ea 100644 --- a/app/tests/worker/test_worker_main.py +++ b/app/tests/worker/test_worker_main.py @@ -119,7 +119,7 @@ class TestCreateSheetTask: res = create_sheet_task(self.sheet.model_dump_json()) m_args.assert_called_once_with( - "interstellar", True, ["--gsheet_feeder.sheet_id", "123"] + "interstellar", True, ["--gsheet_feeder_db.sheet_id", "123"] ) m_orchestrator.return_value.setup.assert_called_once() m_orchestrator.return_value.feed.assert_called_once() diff --git a/app/worker/main.py b/app/worker/main.py index 7716b6b..9ce377f 100644 --- a/app/worker/main.py +++ b/app/worker/main.py @@ -25,10 +25,7 @@ Redis = get_redis() USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME setup_celery_logger(celery) - -# TODO: these are temporary PATCHES for new aa's functionality -# logger.add("app/worker/worker_log.log", level="DEBUG") -logger.remove = lambda x: print(f"logger.remove({x})") +AA_LOGGER_ID = None # TODO: after release, as it requires updating past entries with sheet_id where tag @@ -41,14 +38,19 @@ logger.remove = lambda x: print(f"logger.remove({x})") retry_kwargs={"max_retries": 1}, ) def create_archive_task(self, archive_json: str): + global AA_LOGGER_ID archive = schemas.ArchiveCreate.model_validate_json(archive_json) # call auto-archiver args = get_orchestrator_args(archive.group_id, False, [archive.url]) + result = None try: orchestrator = ArchivingOrchestrator() + orchestrator.logger_id = AA_LOGGER_ID # ensure single logger orchestrator.setup(args) - result = next(orchestrator.feed()) + AA_LOGGER_ID = orchestrator.logger_id + for orch_res in orchestrator.feed(): + result = orch_res except SystemExit as e: log_error(e, "create_archive_task: SystemExit from AA") except Exception as e: @@ -68,6 +70,7 @@ def create_archive_task(self, archive_json: str): @celery.task(name="create_sheet_task", bind=True) def create_sheet_task(self, sheet_json: str): + global AA_LOGGER_ID sheet = schemas.SubmitSheet.model_validate_json(sheet_json) queue_name = (create_sheet_task.request.delivery_info or {}).get( "routing_key", "unknown" @@ -75,10 +78,12 @@ def create_sheet_task(self, sheet_json: str): logger.info(f"[queue={queue_name}] SHEET START {sheet=}") args = get_orchestrator_args( - sheet.group_id, True, ["--gsheet_feeder.sheet_id", sheet.sheet_id] + sheet.group_id, True, ["--gsheet_feeder_db.sheet_id", sheet.sheet_id] ) orchestrator = ArchivingOrchestrator() + orchestrator.logger_id = AA_LOGGER_ID # ensure single logger orchestrator.setup(args) + AA_LOGGER_ID = orchestrator.logger_id stats = {"archived": 0, "failed": 0, "errors": []} try: @@ -128,8 +133,7 @@ def create_sheet_task(self, sheet_json: str): def get_orchestrator_args( group_id: str, orchestrator_for_sheet: bool, cli_args: list = None ) -> list: - if cli_args is None: - cli_args = [] + cli_args.append("--logging.enabled=false") aa_configs = [] with get_db() as session: