updates worker to use AA 1.0.0

This commit is contained in:
msramalho
2025-04-03 15:14:43 +01:00
parent 6c42d7b447
commit c49cec1b6c
2 changed files with 13 additions and 9 deletions

View File

@@ -119,7 +119,7 @@ class TestCreateSheetTask:
res = create_sheet_task(self.sheet.model_dump_json()) res = create_sheet_task(self.sheet.model_dump_json())
m_args.assert_called_once_with( m_args.assert_called_once_with(
"interstellar", True, ["--gsheet_feeder.sheet_id", "123"] "interstellar", True, ["--gsheet_feeder_db.sheet_id", "123"]
) )
m_orchestrator.return_value.setup.assert_called_once() m_orchestrator.return_value.setup.assert_called_once()
m_orchestrator.return_value.feed.assert_called_once() m_orchestrator.return_value.feed.assert_called_once()

View File

@@ -25,10 +25,7 @@ Redis = get_redis()
USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME
setup_celery_logger(celery) setup_celery_logger(celery)
AA_LOGGER_ID = None
# TODO: these are temporary PATCHES for new aa's functionality
# logger.add("app/worker/worker_log.log", level="DEBUG")
logger.remove = lambda x: print(f"logger.remove({x})")
# TODO: after release, as it requires updating past entries with sheet_id where tag # TODO: after release, as it requires updating past entries with sheet_id where tag
@@ -41,14 +38,19 @@ logger.remove = lambda x: print(f"logger.remove({x})")
retry_kwargs={"max_retries": 1}, retry_kwargs={"max_retries": 1},
) )
def create_archive_task(self, archive_json: str): def create_archive_task(self, archive_json: str):
global AA_LOGGER_ID
archive = schemas.ArchiveCreate.model_validate_json(archive_json) archive = schemas.ArchiveCreate.model_validate_json(archive_json)
# call auto-archiver # call auto-archiver
args = get_orchestrator_args(archive.group_id, False, [archive.url]) args = get_orchestrator_args(archive.group_id, False, [archive.url])
result = None
try: try:
orchestrator = ArchivingOrchestrator() orchestrator = ArchivingOrchestrator()
orchestrator.logger_id = AA_LOGGER_ID # ensure single logger
orchestrator.setup(args) orchestrator.setup(args)
result = next(orchestrator.feed()) AA_LOGGER_ID = orchestrator.logger_id
for orch_res in orchestrator.feed():
result = orch_res
except SystemExit as e: except SystemExit as e:
log_error(e, "create_archive_task: SystemExit from AA") log_error(e, "create_archive_task: SystemExit from AA")
except Exception as e: except Exception as e:
@@ -68,6 +70,7 @@ def create_archive_task(self, archive_json: str):
@celery.task(name="create_sheet_task", bind=True) @celery.task(name="create_sheet_task", bind=True)
def create_sheet_task(self, sheet_json: str): def create_sheet_task(self, sheet_json: str):
global AA_LOGGER_ID
sheet = schemas.SubmitSheet.model_validate_json(sheet_json) sheet = schemas.SubmitSheet.model_validate_json(sheet_json)
queue_name = (create_sheet_task.request.delivery_info or {}).get( queue_name = (create_sheet_task.request.delivery_info or {}).get(
"routing_key", "unknown" "routing_key", "unknown"
@@ -75,10 +78,12 @@ def create_sheet_task(self, sheet_json: str):
logger.info(f"[queue={queue_name}] SHEET START {sheet=}") logger.info(f"[queue={queue_name}] SHEET START {sheet=}")
args = get_orchestrator_args( args = get_orchestrator_args(
sheet.group_id, True, ["--gsheet_feeder.sheet_id", sheet.sheet_id] sheet.group_id, True, ["--gsheet_feeder_db.sheet_id", sheet.sheet_id]
) )
orchestrator = ArchivingOrchestrator() orchestrator = ArchivingOrchestrator()
orchestrator.logger_id = AA_LOGGER_ID # ensure single logger
orchestrator.setup(args) orchestrator.setup(args)
AA_LOGGER_ID = orchestrator.logger_id
stats = {"archived": 0, "failed": 0, "errors": []} stats = {"archived": 0, "failed": 0, "errors": []}
try: try:
@@ -128,8 +133,7 @@ def create_sheet_task(self, sheet_json: str):
def get_orchestrator_args( def get_orchestrator_args(
group_id: str, orchestrator_for_sheet: bool, cli_args: list = None group_id: str, orchestrator_for_sheet: bool, cli_args: list = None
) -> list: ) -> list:
if cli_args is None: cli_args.append("--logging.enabled=false")
cli_args = []
aa_configs = [] aa_configs = []
with get_db() as session: with get_db() as session: