diff --git a/src/core/events.py b/src/core/events.py index 8336f7b..3a700af 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -73,7 +73,7 @@ async def archive_sheets_cronjob(frequency: str, interval: int, current_time_uni sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit) for s in sheets: - task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group=s.group_id).model_dump_json()]).apply_async() + task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async() triggered_jobs.append({"sheet_id": s.id, "task_id": task.id}) logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}") diff --git a/src/core/logging.py b/src/core/logging.py index fe9f905..4929232 100644 --- a/src/core/logging.py +++ b/src/core/logging.py @@ -21,5 +21,6 @@ async def logging_middleware(request: Request, call_next): except Exception as e: from utils.metrics import EXCEPTION_COUNTER EXCEPTION_COUNTER.labels(type=e.__class__.__name__).inc() + logger.info(f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - {e.__class__.__name__} {e}") log_error(e) raise e \ No newline at end of file diff --git a/src/db/crud.py b/src/db/crud.py index 68989b3..d984a4f 100644 --- a/src/db/crud.py +++ b/src/db/crud.py @@ -50,8 +50,8 @@ def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, lim def search_archives_by_email(db: Session, email: str, skip: int = 0, limit: int = 100): return base_query(db).filter(models.Archive.author_id == email).order_by(models.Archive.created_at.desc()).offset(skip).limit(get_limit(limit)).all() - -def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]): +#TODO: rename task to archive +def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]) -> models.Archive: db_task = models.Archive(id=task.id, url=task.url, result=task.result, public=task.public, author_id=task.author_id, group_id=task.group_id, sheet_id=task.sheet_id) db_task.tags = tags db_task.urls = urls @@ -234,8 +234,8 @@ def upsert_user_groups(db: Session): # --------------- SHEET -def create_sheet(db: Session, sheet_id: str, sheet_name: str, email: str, group_id: str, frequency: str): - db_sheet = models.Sheet(id=sheet_id, name=sheet_name, author_id=email, group_id=group_id, frequency=frequency) +def create_sheet(db: Session, sheet_id: str, name: str, email: str, group_id: str, frequency: str): + db_sheet = models.Sheet(id=sheet_id, name=name, author_id=email, group_id=group_id, frequency=frequency) db.add(db_sheet) db.commit() db.refresh(db_sheet) @@ -250,7 +250,6 @@ def get_user_sheets(db: Session, email: str) -> list[models.Sheet]: return db.query(models.Sheet).filter(models.Sheet.author_id == email).order_by(models.Sheet.last_url_archived_at.desc()).all() - async def get_sheets_by_id_hash(db: AsyncSession, frequency: str, modulo: str, id_hash: str) -> list[models.Sheet]: result = await db.execute( select(models.Sheet).filter(models.Sheet.frequency == frequency) @@ -288,3 +287,15 @@ def delete_sheet(db: Session, sheet_id: str, email: str) -> bool: db.delete(db_sheet) db.commit() return db_sheet is not None + + +#--- Celery worker tasks + + +def insert_result_into_db(db: Session, archive: schemas.ArchiveCreate) -> models.Archive: + # create and load user, tags, if needed + create_or_get_user(db, archive.author_id) + db_tags = [create_tag(db, tag) for tag in archive.tags] + # insert everything + db_task = create_task(db, task=archive, tags=db_tags, urls=archive.urls) + return db_task \ No newline at end of file diff --git a/src/db/schemas.py b/src/db/schemas.py index 6600c63..850fa3a 100644 --- a/src/db/schemas.py +++ b/src/db/schemas.py @@ -4,22 +4,12 @@ from pydantic import BaseModel from datetime import datetime -class Tag(BaseModel): - id: str - created_at: datetime - - model_config = {"from_attributes": True} - __hash__ = object.__hash__ - class SubmitSheet(BaseModel): - sheet_name: str | None = None - sheet_id: str | None = None - header: int = 1 - public: bool = False + sheet_id: str | None author_id: str | None = None group_id: str | None tags: set[str] | None = set() - columns: dict | None = {} # TODO: implement + columns: dict | None = {} # TODO: implement/remove class SubmitManual(BaseModel): @@ -85,7 +75,7 @@ class ArchiveTrigger(BaseModel): url: Annotated[str, Len(min_length=5)] public: bool = False group_id: Annotated[str, Len(min_length=1)] = "default" - tags: set[Tag] | None = None + tags: set[str] | None = None class ArchiveCreate(ArchiveTrigger): id: str | None = None @@ -107,4 +97,10 @@ class Usage(BaseModel): total_sheets: int = 0 class UsageResponse(Usage): - groups: dict[str, Usage] \ No newline at end of file + groups: dict[str, Usage] + +class CelerySheetTask(BaseModel): + success: bool + sheet_id: str + time: datetime + stats: dict \ No newline at end of file diff --git a/src/endpoints/task.py b/src/endpoints/task.py index 0c7f1e3..bacdb31 100644 --- a/src/endpoints/task.py +++ b/src/endpoints/task.py @@ -18,7 +18,6 @@ celery = get_celery() @task_router.get("/{task_id}", summary="Check the status of an async task by its id, works for URLs and Sheet tasks.") def get_status(task_id, email=Depends(get_token_or_user_auth)) -> schemas.TaskResult: - logger.info(f"status check for user {email} task {task_id}") task = AsyncResult(task_id, app=celery) try: if task.status == "FAILURE": diff --git a/src/shared/__init__.py b/src/shared/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tests/worker/test_worker_main.py b/src/tests/worker/test_worker_main.py index 3550173..f82a80c 100644 --- a/src/tests/worker/test_worker_main.py +++ b/src/tests/worker/test_worker_main.py @@ -64,7 +64,7 @@ class Test_create_archive_task(): class Test_create_sheet_task(): URL = "https://example-live.com" - sheet = schemas.SubmitSheet(sheet_name="Sheet", sheet_id="123", author_id="rick@example.com", group_id=None) + sheet = schemas.SubmitSheet(sheet_id="123", author_id="rick@example.com", group_id=None) # @patch("worker.main.insert_result_into_db") @patch("worker.main.models.generate_uuid", return_value="constant-uuid") @@ -82,7 +82,6 @@ class Test_create_sheet_task(): m_orch_generator.return_value = m_orch res = create_sheet_task(self.sheet.model_dump_json()) - print(res) assert res["archived"] == 1 assert res["failed"] == 0 assert len(res["errors"]) == 0 diff --git a/src/web/main.py b/src/web/main.py index 8b721f9..ee6d192 100644 --- a/src/web/main.py +++ b/src/web/main.py @@ -142,8 +142,6 @@ def app_factory(settings = get_settings()): def archive_sheet(sheet: schemas.SubmitSheet, email=Depends(get_user_auth), db: Session = Depends(get_db_dependency)): logger.info(f"SHEET TASK for {sheet=}") sheet.author_id = email - if not sheet.sheet_name and not sheet.sheet_id: - raise HTTPException(status_code=422, detail=f"sheet name or id is required") if not crud.is_user_in_group(db, email, sheet.group_id): raise HTTPException(status_code=403, detail="User does not have access to this group.") task = celery.signature("create_sheet_task", args=[sheet.model_dump_json()]).delay() @@ -151,11 +149,9 @@ def app_factory(settings = get_settings()): @app.post("/sheet_service", status_code=201, deprecated=True) # DEPRECATED - def archive_sheet_service(sheet: schemas.SubmitSheet, auth=Depends(token_api_key_auth), db: Session = Depends(get_db_dependency)): + def archive_sheet_service(sheet: schemas.SubmitSheet, auth=Depends(token_api_key_auth)): logger.info(f"SHEET TASK for {sheet=}") sheet.author_id = sheet.author_id or "api-endpoint" - if not sheet.sheet_name and not sheet.sheet_id: - raise HTTPException(status_code=422, detail=f"sheet name or id is required") task = celery.signature("create_sheet_task", args=[sheet.model_dump_json()]).delay() return JSONResponse({"id": task.id}) diff --git a/src/worker/main.py b/src/worker/main.py index 27b261a..940158b 100644 --- a/src/worker/main.py +++ b/src/worker/main.py @@ -1,6 +1,6 @@ -import traceback, yaml, datetime -from typing import List, Set +import traceback, datetime +from typing import List from celery.signals import task_failure from auto_archiver import Config, ArchivingOrchestrator, Metadata @@ -23,6 +23,8 @@ Redis = get_redis() USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME +# TODO: after release, as it requires updating past entries with sheet_id where tag is used, drop tags + @celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0}) def create_archive_task(self, archive_json: str): @@ -32,35 +34,39 @@ def create_archive_task(self, archive_json: str): # call auto-archiver orchestrator = load_orchestrator(archive.group_id) result = orchestrator.feed_item(Metadata().set_url(archive.url)) - - # prepare for DB assert result, f"UNABLE TO archive: {archive.url}" + + # prepare and insert in DB archive.id = self.request.id archive.urls = get_all_urls(result) archive.result = json.loads(result.to_json()) - insert_result_into_db(archive) - return archive.result.to_dict() # TODO: is return used? + + return archive.result @celery.task(name="create_sheet_task", bind=True) def create_sheet_task(self, sheet_json: str): sheet = schemas.SubmitSheet.model_validate_json(sheet_json) - sheet.tags.add("gsheet") logger.info(f"SHEET START {sheet=}") - # TODO: drop sheet_name and use only sheet_id (new endpoints/models) - orchestrator = load_orchestrator(sheet.group_id, {"configurations": {"gsheet_feeder": {"sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "header": sheet.header}}}) + orchestrator = load_orchestrator(sheet.group_id, True, {"configurations": {"gsheet_feeder": {"sheet_id": sheet.sheet_id}}}) stats = {"archived": 0, "failed": 0, "errors": []} for result in orchestrator.feed(): - if not result: - logger.error("Got empty result from feeder, an internal error must have occurred.") - continue try: - # TODO: remove public from sheet in new refactor - #TODO: use new insert_result_into_db - insert_result_into_db(result, sheet.tags, sheet.public, sheet.group_id, sheet.author_id, models.generate_uuid(), sheet.sheet_id) + assert result, f"UNABLE TO archive: {result.get_url()}" + archive = schemas.ArchiveCreate( + author_id=sheet.author_id, + url=result.get_url(), + group_id=sheet.group_id, + tags=sheet.tags, + id=models.generate_uuid(), + result=json.loads(result.to_json()), + sheet_id=sheet.sheet_id, + urls=get_all_urls(result) + ) + insert_result_into_db(archive) stats["archived"] += 1 except exc.IntegrityError as e: logger.warning(f"cached result detected: {e}") @@ -75,33 +81,17 @@ def create_sheet_task(self, sheet_json: str): crud.update_sheet_last_url_archived_at(session, sheet.sheet_id) logger.info(f"SHEET DONE {sheet=}") - # TODO: use data model - return {"success": True, "sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "time": datetime.datetime.now().isoformat(), **stats} + # TODO: is this used anywhere? maybe drop it + return schemas.CelerySheetTask(success=True, sheet_id=sheet.sheet_id, time=datetime.datetime.now().isoformat(), stats=stats).model_dump() -@task_failure.connect(sender=create_sheet_task) -@task_failure.connect(sender=create_archive_task) -def task_failure_notifier(sender, **kwargs): - # automatically capture exceptions in the worker tasks - logger.warning(f"⚠️ worker task failed: {sender.name}") - traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback']))) - log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}") - redis_publish_exception(kwargs['exception'], sender.name, traceback_msg) - - -def read_user_groups(): - # read yaml safely - with open(USER_GROUPS_FILENAME) as inf: - try: - return yaml.safe_load(inf) - except yaml.YAMLError as e: - logger.error(f"could not open user groups filename {USER_GROUPS_FILENAME}: {e}") - raise e - - -def load_orchestrator(group_id: str, overwrite_configs: dict = {}) -> ArchivingOrchestrator: +def load_orchestrator(group_id: str, orchestrator_for_sheet: bool = False, overwrite_configs: dict = {}) -> ArchivingOrchestrator: with get_db() as session: - orchestrator_fn = crud.get_group(session, group_id).orchestrator + group = crud.get_group(session, group_id) + if orchestrator_for_sheet: + orchestrator_fn = group.orchestrator_sheet + else: + orchestrator_fn = crud.get_group(session, group_id).orchestrator assert orchestrator_fn, f"no orchestrator found for {group_id}" config = Config() @@ -111,29 +101,11 @@ def load_orchestrator(group_id: str, overwrite_configs: dict = {}) -> ArchivingO def insert_result_into_db(archive: schemas.ArchiveCreate) -> str: with get_db() as session: - # create and load user, tags, if needed - crud.create_or_get_user(session, archive.author_id) - db_tags = [crud.create_tag(session, tag) for tag in archive.tags] - # insert everything - db_task = crud.create_task(session, task=archive, tags=db_tags, urls=archive.urls) - logger.debug(f"Added {db_task.id=} to database on {db_task.created_at} ({db_task.author_id})") + db_task = crud.insert_result_into_db(session, archive) + logger.debug(f"[ARCHIVE STORED] {db_task.author_id} {db_task.url}") return db_task.id -def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_id: str, author_id: str, task_id: str, sheet_id: str = "") -> str: - logger.info(f"INSERTING {public=} {group_id=} {author_id=} {tags=} into {task_id}") - assert result, f"UNABLE TO archive: {result.get_url() if result else result}" - with get_db() as session: - # urls are created by get_all_urls - # create author_id if needed - crud.create_or_get_user(session, author_id) - # create DB TAGs if needed - db_tags = [crud.create_tag(session, tag) for tag in tags] - # insert archive - db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=task_id, url=result.get_url(), result=json.loads(result.to_json()), public=public, author_id=author_id, group_id=group_id, sheet_id=sheet_id), tags=db_tags, urls=get_all_urls(result)) - logger.debug(f"Added {db_task.id=} to database on {db_task.created_at} ({db_task.author_id})") - return db_task.id - # TODO: this should live within the auto-archiver def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]: db_urls = [] @@ -166,4 +138,14 @@ def redis_publish_exception(exception, task_name, traceback: str = ""): exception_data = {"task": task_name, "type": exception.__class__.__name__, "exception": exception, "traceback": traceback} Redis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps(exception_data, default=str)) except Exception as e: - log_error(e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}") \ No newline at end of file + log_error(e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}") + + +@task_failure.connect(sender=create_sheet_task) +@task_failure.connect(sender=create_archive_task) +def task_failure_notifier(sender, **kwargs): + # automatically capture exceptions in the worker tasks + logger.warning(f"⚠️ worker task failed: {sender.name}") + traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback']))) + log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}") + redis_publish_exception(kwargs['exception'], sender.name, traceback_msg)