mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-08 03:28:35 +03:00
Format and lint workers directory (#60)
This commit is contained in:
@@ -30,8 +30,16 @@ setup_celery_logger(celery)
|
||||
# logger.add("app/worker/worker_log.log", level="DEBUG")
|
||||
logger.remove = lambda x: print(f"logger.remove({x})")
|
||||
|
||||
# TODO: after release, as it requires updating past entries with sheet_id where tag is used, drop tags
|
||||
@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 1})
|
||||
|
||||
# TODO: after release, as it requires updating past entries with sheet_id where tag
|
||||
# is used, drop tags
|
||||
@celery.task(
|
||||
name="create_archive_task",
|
||||
bind=True,
|
||||
autoretry_for=(Exception,),
|
||||
retry_backoff=True,
|
||||
retry_kwargs={"max_retries": 1},
|
||||
)
|
||||
def create_archive_task(self, archive_json: str):
|
||||
archive = schemas.ArchiveCreate.model_validate_json(archive_json)
|
||||
|
||||
@@ -42,9 +50,9 @@ def create_archive_task(self, archive_json: str):
|
||||
orchestrator.setup(args)
|
||||
result = next(orchestrator.feed())
|
||||
except SystemExit as e:
|
||||
log_error(e, f"create_archive_task: SystemExit from AA")
|
||||
log_error(e, "create_archive_task: SystemExit from AA")
|
||||
except Exception as e:
|
||||
log_error(e, f"create_archive_task")
|
||||
log_error(e, "create_archive_task")
|
||||
raise e
|
||||
assert result, f"UNABLE TO archive: {archive.url}"
|
||||
|
||||
@@ -61,10 +69,14 @@ def create_archive_task(self, archive_json: str):
|
||||
@celery.task(name="create_sheet_task", bind=True)
|
||||
def create_sheet_task(self, sheet_json: str):
|
||||
sheet = schemas.SubmitSheet.model_validate_json(sheet_json)
|
||||
queue_name = (create_sheet_task.request.delivery_info or {}).get('routing_key', 'unknown')
|
||||
queue_name = (create_sheet_task.request.delivery_info or {}).get(
|
||||
"routing_key", "unknown"
|
||||
)
|
||||
logger.info(f"[queue={queue_name}] SHEET START {sheet=}")
|
||||
|
||||
args = get_orchestrator_args(sheet.group_id, True, ["--gsheet_feeder.sheet_id", sheet.sheet_id])
|
||||
args = get_orchestrator_args(
|
||||
sheet.group_id, True, ["--gsheet_feeder.sheet_id", sheet.sheet_id]
|
||||
)
|
||||
orchestrator = ArchivingOrchestrator()
|
||||
orchestrator.setup(args)
|
||||
|
||||
@@ -82,7 +94,7 @@ def create_sheet_task(self, sheet_json: str):
|
||||
result=json.loads(result.to_json()),
|
||||
sheet_id=sheet.sheet_id,
|
||||
urls=get_all_urls(result),
|
||||
store_until=get_store_until(sheet.group_id)
|
||||
store_until=get_store_until(sheet.group_id),
|
||||
)
|
||||
insert_result_into_db(archive)
|
||||
stats["archived"] += 1
|
||||
@@ -95,25 +107,39 @@ def create_sheet_task(self, sheet_json: str):
|
||||
stats["errors"].append(str(e))
|
||||
|
||||
except SystemExit as e:
|
||||
log_error(e, f"create_sheet_task: SystemExit from AA")
|
||||
log_error(e, "create_sheet_task: SystemExit from AA")
|
||||
|
||||
if stats["archived"] > 0:
|
||||
with get_db() as session:
|
||||
worker_crud.update_sheet_last_url_archived_at(session, sheet.sheet_id)
|
||||
worker_crud.update_sheet_last_url_archived_at(
|
||||
session, sheet.sheet_id
|
||||
)
|
||||
|
||||
logger.info(f"SHEET DONE {sheet=}")
|
||||
# TODO: is this used anywhere? maybe drop it
|
||||
return schemas.CelerySheetTask(success=True, sheet_id=sheet.sheet_id, time=datetime.datetime.now().isoformat(), stats=stats).model_dump()
|
||||
return schemas.CelerySheetTask(
|
||||
success=True,
|
||||
sheet_id=sheet.sheet_id,
|
||||
time=datetime.datetime.now().isoformat(),
|
||||
stats=stats,
|
||||
).model_dump()
|
||||
|
||||
|
||||
def get_orchestrator_args(group_id: str, orchestrator_for_sheet: bool, cli_args: list = []) -> list:
|
||||
def get_orchestrator_args(
|
||||
group_id: str, orchestrator_for_sheet: bool, cli_args: list = None
|
||||
) -> list:
|
||||
if cli_args is None:
|
||||
cli_args = []
|
||||
|
||||
aa_configs = []
|
||||
with get_db() as session:
|
||||
group = worker_crud.get_group(session, group_id)
|
||||
if orchestrator_for_sheet:
|
||||
orchestrator_fn = group.orchestrator_sheet
|
||||
else:
|
||||
orchestrator_fn = worker_crud.get_group(session, group_id).orchestrator
|
||||
orchestrator_fn = worker_crud.get_group(
|
||||
session, group_id
|
||||
).orchestrator
|
||||
assert orchestrator_fn, f"no orchestrator found for {group_id}"
|
||||
aa_configs.extend(["--config", orchestrator_fn])
|
||||
aa_configs.extend(cli_args)
|
||||
@@ -123,7 +149,9 @@ def get_orchestrator_args(group_id: str, orchestrator_for_sheet: bool, cli_args:
|
||||
def insert_result_into_db(archive: schemas.ArchiveCreate) -> str:
|
||||
with get_db() as session:
|
||||
db_archive = worker_crud.store_archived_url(session, archive)
|
||||
logger.debug(f"[ARCHIVE STORED] {db_archive.author_id} {db_archive.url}")
|
||||
logger.debug(
|
||||
f"[ARCHIVE STORED] {db_archive.author_id} {db_archive.url}"
|
||||
)
|
||||
return db_archive.id
|
||||
|
||||
|
||||
@@ -132,13 +160,22 @@ def get_store_until(group_id: str) -> datetime.datetime:
|
||||
return business_logic.get_store_archive_until(session, group_id)
|
||||
|
||||
|
||||
def redis_publish_exception(exception, task_name, traceback: str = ""):
|
||||
def redis_publish_exception(exception, task_name, trace_back: str = ""):
|
||||
REDIS_EXCEPTIONS_CHANNEL = settings.REDIS_EXCEPTIONS_CHANNEL
|
||||
try:
|
||||
exception_data = {"task": task_name, "type": exception.__class__.__name__, "exception": exception, "traceback": traceback}
|
||||
Redis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps(exception_data, default=str))
|
||||
exception_data = {
|
||||
"task": task_name,
|
||||
"type": exception.__class__.__name__,
|
||||
"exception": exception,
|
||||
"traceback": trace_back,
|
||||
}
|
||||
Redis.publish(
|
||||
REDIS_EXCEPTIONS_CHANNEL, json.dumps(exception_data, default=str)
|
||||
)
|
||||
except Exception as e:
|
||||
log_error(e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}")
|
||||
log_error(
|
||||
e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}"
|
||||
)
|
||||
|
||||
|
||||
@task_failure.connect(sender=create_sheet_task)
|
||||
@@ -146,6 +183,10 @@ def redis_publish_exception(exception, task_name, traceback: str = ""):
|
||||
def task_failure_notifier(sender, **kwargs):
|
||||
# automatically capture exceptions in the worker tasks
|
||||
logger.warning(f"⚠️ worker task failed: {sender.name}")
|
||||
traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback'])))
|
||||
log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}")
|
||||
redis_publish_exception(kwargs['exception'], sender.name, traceback_msg)
|
||||
traceback_msg = "\n".join(
|
||||
traceback.format_list(traceback.extract_tb(kwargs["traceback"]))
|
||||
)
|
||||
log_error(
|
||||
kwargs["exception"], traceback_msg, f"task_failure: {sender.name}"
|
||||
)
|
||||
redis_publish_exception(kwargs["exception"], sender.name, traceback_msg)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import sys
|
||||
|
||||
from celery import Celery
|
||||
from loguru import logger
|
||||
|
||||
from app.shared.task_messaging import get_celery
|
||||
@@ -8,24 +7,32 @@ from app.shared.task_messaging import get_celery
|
||||
|
||||
celery = get_celery("worker")
|
||||
|
||||
def setup_celery_logger(celery):
|
||||
# Remove Celery's default handlers to prevent duplicate logs
|
||||
celery_logger = celery.log.get_default_logger()
|
||||
for handler in celery_logger.handlers[:]:
|
||||
celery_logger.removeHandler(handler)
|
||||
|
||||
# Set up Loguru logging
|
||||
logger.add("logs/celery_logs.log", retention="30 days", level="DEBUG")
|
||||
logger.add("logs/celery_error_logs.log", retention="30 days", level="ERROR")
|
||||
def setup_celery_logger(c):
|
||||
# Remove Celery's default handlers to prevent duplicate logs
|
||||
celery_logger = c.log.get_default_logger()
|
||||
for handler in celery_logger.handlers[:]:
|
||||
celery_logger.removeHandler(handler)
|
||||
|
||||
# Redirect Celery logs to Loguru
|
||||
class InterceptHandler:
|
||||
def write(self, message):
|
||||
if message.strip():
|
||||
logger.info(message.strip())
|
||||
# Required to prevent issues with buffered output
|
||||
def flush(self): pass
|
||||
def isatty(self): return False
|
||||
# Set up Loguru logging
|
||||
logger.add("logs/celery_logs.log", retention="30 days", level="DEBUG")
|
||||
logger.add("logs/celery_error_logs.log", retention="30 days", level="ERROR")
|
||||
|
||||
sys.stdout = InterceptHandler()
|
||||
sys.stderr = InterceptHandler()
|
||||
# Redirect Celery logs to Loguru
|
||||
class InterceptHandler:
|
||||
@staticmethod
|
||||
def write(message):
|
||||
if message.strip():
|
||||
logger.info(message.strip())
|
||||
|
||||
# Required to prevent issues with buffered output
|
||||
@staticmethod
|
||||
def flush():
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def isatty():
|
||||
return False
|
||||
|
||||
sys.stdout = InterceptHandler()
|
||||
sys.stderr = InterceptHandler()
|
||||
|
||||
Reference in New Issue
Block a user