updates AA adds missing dependencies

This commit is contained in:
msramalho
2025-02-15 23:14:53 +00:00
parent 00cdec92f9
commit 1497485612
6 changed files with 383 additions and 1139 deletions

View File

@@ -4,8 +4,8 @@ import traceback, datetime
from celery.signals import task_failure
from loguru import logger
from sqlalchemy import exc
from auto_archiver import Config, ArchivingOrchestrator, Metadata
import auto_archiver
from auto_archiver.core.orchestrator import ArchivingOrchestrator
from app.shared.db import models
from app.shared.db.database import get_db
@@ -16,7 +16,6 @@ from app.shared.log import log_error
from app.shared.aa_utils import get_all_urls
from app.shared.db import worker_crud
settings = get_settings()
celery = get_celery("worker")
@@ -24,19 +23,30 @@ Redis = get_redis()
USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME
# PATCHES for new aa's functionality
# logger.add("app/worker/worker_log.log", level="DEBUG")
logger.remove = lambda x: print(f"logger.remove({x})")
# TODO: after release, as it requires updating past entries with sheet_id where tag is used, drop tags
@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0})
def create_archive_task(self, archive_json: str):
archive = schemas.ArchiveCreate.model_validate_json(archive_json)
# call auto-archiver
orchestrator = load_orchestrator(archive.group_id)
result = orchestrator.feed_item(Metadata().set_url(archive.url))
args = get_orchestrator_args(archive.group_id, False, [archive.url])
# args = get_orchestrator_args(archive.group_id, False, [archive.url, "--extractors", "generic_extractor"])
logger.error(args)
try:
result = next(ArchivingOrchestrator().run(args), None)
except SystemExit as e:
log_error(e, f"create_archive_task: SystemExit from AA")
except Exception as e:
log_error(e, f"create_archive_task")
raise e
assert result, f"UNABLE TO archive: {archive.url}"
# prepare and insert in DB
store_until = get_store_until(archive.group_id)
archive.store_until = store_until
archive.store_until = get_store_until(archive.group_id)
archive.id = self.request.id
archive.urls = get_all_urls(result)
archive.result = json.loads(result.to_json())
@@ -51,32 +61,36 @@ def create_sheet_task(self, sheet_json: str):
queue_name = (create_sheet_task.request.delivery_info or {}).get('routing_key', 'unknown')
logger.info(f"[queue={queue_name}] SHEET START {sheet=}")
orchestrator = load_orchestrator(sheet.group_id, True, {"configurations": {"gsheet_feeder": {"sheet_id": sheet.sheet_id}}})
args = get_orchestrator_args(sheet.group_id, True, ["--gsheet_feeder.sheet_id", sheet.sheet_id])
stats = {"archived": 0, "failed": 0, "errors": []}
for result in orchestrator.feed():
try:
assert result, f"UNABLE TO archive: {result.get_url()}"
archive = schemas.ArchiveCreate(
author_id=sheet.author_id,
url=result.get_url(),
group_id=sheet.group_id,
tags=sheet.tags,
id=models.generate_uuid(),
result=json.loads(result.to_json()),
sheet_id=sheet.sheet_id,
urls=get_all_urls(result),
store_until = get_store_until(sheet.group_id)
)
insert_result_into_db(archive)
stats["archived"] += 1
except exc.IntegrityError as e:
logger.warning(f"cached result detected: {e}")
except Exception as e:
log_error(e, extra=f"{self.name}: {sheet_json}")
redis_publish_exception(e, self.name, traceback.format_exc())
stats["failed"] += 1
stats["errors"].append(str(e))
try:
for result in ArchivingOrchestrator().run(args):
try:
assert result, f"ERROR archiving URL for sheet {sheet.sheet_id}"
archive = schemas.ArchiveCreate(
author_id=sheet.author_id,
url=result.get_url(),
group_id=sheet.group_id,
tags=sheet.tags,
id=models.generate_uuid(),
result=json.loads(result.to_json()),
sheet_id=sheet.sheet_id,
urls=get_all_urls(result),
store_until=get_store_until(sheet.group_id)
)
insert_result_into_db(archive)
stats["archived"] += 1
except exc.IntegrityError as e:
logger.warning(f"cached result detected: {e}")
except Exception as e:
log_error(e, extra=f"{self.name}: {sheet_json}")
redis_publish_exception(e, self.name, traceback.format_exc())
stats["failed"] += 1
stats["errors"].append(str(e))
except SystemExit as e:
log_error(e, f"create_sheet_task: SystemExit from AA")
if stats["archived"] > 0:
with get_db() as session:
@@ -87,7 +101,8 @@ def create_sheet_task(self, sheet_json: str):
return schemas.CelerySheetTask(success=True, sheet_id=sheet.sheet_id, time=datetime.datetime.now().isoformat(), stats=stats).model_dump()
def load_orchestrator(group_id: str, orchestrator_for_sheet: bool = False, overwrite_configs: dict = {}) -> ArchivingOrchestrator:
def get_orchestrator_args(group_id: str, orchestrator_for_sheet: bool, cli_args: list = []) -> list:
aa_configs = []
with get_db() as session:
group = worker_crud.get_group(session, group_id)
if orchestrator_for_sheet:
@@ -95,11 +110,9 @@ def load_orchestrator(group_id: str, orchestrator_for_sheet: bool = False, overw
else:
orchestrator_fn = worker_crud.get_group(session, group_id).orchestrator
assert orchestrator_fn, f"no orchestrator found for {group_id}"
config = Config()
config.parse(use_cli=False, yaml_config_filename=orchestrator_fn, overwrite_configs=overwrite_configs)
return ArchivingOrchestrator(config)
aa_configs.extend(["--config", orchestrator_fn])
aa_configs.extend(cli_args)
return aa_configs
def insert_result_into_db(archive: schemas.ArchiveCreate) -> str:
@@ -108,10 +121,12 @@ def insert_result_into_db(archive: schemas.ArchiveCreate) -> str:
logger.debug(f"[ARCHIVE STORED] {db_task.author_id} {db_task.url}")
return db_task.id
def get_store_until(group_id: str) -> datetime.datetime:
with get_db() as session:
return business_logic.get_store_archive_until(session, group_id)
def redis_publish_exception(exception, task_name, traceback: str = ""):
REDIS_EXCEPTIONS_CHANNEL = settings.REDIS_EXCEPTIONS_CHANNEL
try: