diff --git a/src/db/crud.py b/src/db/crud.py index 35d7abe..266de09 100644 --- a/src/db/crud.py +++ b/src/db/crud.py @@ -44,7 +44,6 @@ def base_query(db:Session): .filter(models.Archive.deleted == False) ## --------------- TAG - def create_tag(db: Session, tag: str): db_tag = db.query(models.Tag).filter(models.Tag.id==tag).first() if not db_tag: @@ -68,6 +67,16 @@ def get_user_groups(db: Session, email:str): ## --------------- INIT User-Groups + +def get_user(db: Session, author_id: str): + db_user = db.query(models.User).filter(models.User.email==author_id).first() + if not db_user: + db_user = models.User(email=author_id) + db.add(db_user) + db.commit() + db.refresh(db_user) + return db_user + @cache def get_group(db:Session, group_name:str)->models.Group: db_group = db.query(models.Group).filter(models.Group.id==group_name).first() diff --git a/src/main.py b/src/main.py index 2f9a29a..074b4fe 100644 --- a/src/main.py +++ b/src/main.py @@ -9,6 +9,7 @@ import alembic.config from dotenv import load_dotenv import traceback, os, logging from loguru import logger +import sqlalchemy from worker import create_archive_task, create_sheet_task, celery, insert_result_into_db @@ -137,12 +138,14 @@ def archive_sheet(sheet:schemas.SubmitSheet, email = Depends(get_bearer_auth)): #----- endpoint to submit data archived elsewhere @app.post("/submit-archive", status_code=201) def submit_manual_archive(manual:schemas.SubmitManual, basic_auth = Depends(get_basic_auth)): - logger.info(f"Submit {manual=}") result = Metadata.from_json(manual.result) - logger.info(f"{result=}") + logger.info(f"MANUAL SUBMIT {result.get_url()} {manual.author_id}") manual.tags.add("manual") - - archive_id = insert_result_into_db(result, manual.tags, manual.public, manual.group_id, manual.author_id, models.generate_uuid()) + try: + archive_id = insert_result_into_db(result, manual.tags, manual.public, manual.group_id, manual.author_id, models.generate_uuid()) + except sqlalchemy.exc.IntegrityError as e: + logger.error(e) + raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error") return JSONResponse({"id": archive_id}) diff --git a/src/worker.py b/src/worker.py index 317deed..a9f6441 100644 --- a/src/worker.py +++ b/src/worker.py @@ -5,10 +5,11 @@ from typing import List, Set from celery import Celery from celery.signals import task_failure from auto_archiver import Config, ArchivingOrchestrator, Metadata +from auto_archiver.core import Media from loguru import logger from db import crud, schemas, models -from db.database import SessionLocal +from db.database import SessionLocal from contextlib import contextmanager import json @@ -50,11 +51,11 @@ def create_sheet_task(self, sheet_json: str): sheet = schemas.SubmitSheet.parse_raw(sheet_json) sheet.tags.add("gsheet") logger.info(f"SHEET START {sheet=}") - + if (em := is_group_invalid_for_user(sheet.public, sheet.group_id, sheet.author_id)): return {"error": em} config = Config() - #TODO: use choose_orchestrator and overwrite the feeder + # TODO: use choose_orchestrator and overwrite the feeder config.parse(use_cli=False, yaml_config_filename="secrets/orchestration-sheet.yaml", overwrite_configs={"configurations": {"gsheet_feeder": {"sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "header": sheet.header}}}) orchestrator = ArchivingOrchestrator(config) @@ -62,13 +63,13 @@ def create_sheet_task(self, sheet_json: str): for result in orchestrator.feed(): try: insert_result_into_db(result, sheet.tags, sheet.public, sheet.group_id, sheet.author_id, models.generate_uuid()) - stats["archived"]+=1 + stats["archived"] += 1 except Exception as e: logger.error(e) logger.error(traceback.format_exc()) - stats["failed"]+=1 + stats["failed"] += 1 stats["errors"].append(e) - + logger.info(f"SHEET DONE {sheet=}") return {"success": True, "sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "time": datetime.datetime.now().isoformat(), **stats} @@ -143,20 +144,43 @@ def is_group_invalid_for_user(public: bool, group_id: str, author_id: str): return False -def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_id: str, author_id: str, task_id:str) -> str: - logger.info(f"INSERTING {public=} {result} into {task_id}") +def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_id: str, author_id: str, task_id: str) -> str: + logger.info(f"INSERTING {public=} {group_id=} {author_id=} {tags=} into {task_id}") assert result, f"UNABLE TO archive: {result.get_url()}" with get_db() as session: - # create DB URLs - db_urls = [models.ArchiveUrl(url=url, key=m.get("id", f"media_{i}")) for i, m in enumerate(result.media) for url in m.urls] + # urls are created by get_all_urls + # create author_id if needed + db_user = crud.get_user(session, author_id) # create DB TAGs if needed db_tags = [crud.create_tag(session, tag) for tag in tags] # insert archive - db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=task_id, url=result.get_url(), result=json.loads(result.to_json()), public=public, author_id=author_id, group_id=group_id), tags=db_tags, urls=db_urls) - logger.debug(f"Added {db_task.id=} to database on {db_task.created_at}") + db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=task_id, url=result.get_url(), result=json.loads(result.to_json()), public=public, author_id=author_id, group_id=group_id), tags=db_tags, urls=get_all_urls(result)) + logger.debug(f"Added {db_task.id=} to database on {db_task.created_at} ({db_task.author_id})") return db_task.id +def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]: + db_urls = [] + for m in result.media: + for i, url in enumerate(m.urls): db_urls.append(models.ArchiveUrl(url=url, key=m.get("id", f"media_{i}"))) + for k, prop in m.properties.items(): + if prop_converted := convert_if_media(prop): + for i, url in enumerate(prop_converted.urls): db_urls.append(models.ArchiveUrl(url=url, key=m.get("id", f"{k}_{i}"))) + if isinstance(prop, list): + for i, prop_media in enumerate(prop): + if prop_media := convert_if_media(prop_media): + for j, url in enumerate(prop_media.urls): db_urls.append(models.ArchiveUrl(url=url, key=m.get("id", f"{k}{prop_media.key}_{i}.{j}"))) + return db_urls + +def convert_if_media(media): + if isinstance(media, Media): return media + elif isinstance(media, dict): + try: return Media.from_dict(media) + except Exception as e: + logger.debug(e) + return False + + # INIT ORCHESTRATORS = {} load_orchestrators()