mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-12 05:28:34 +03:00
introduces group/global usage & permissions, validates in endpoints and tests endpoints
This commit is contained in:
@@ -19,6 +19,7 @@ from core.logging import log_error
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
|
||||
celery = Celery(__name__)
|
||||
celery.conf.broker_url = settings.CELERY_BROKER_URL
|
||||
celery.conf.result_backend = settings.CELERY_RESULT_BACKEND
|
||||
@@ -48,6 +49,7 @@ def create_archive_task(self, archive_json: str):
|
||||
return Metadata.choose_most_complete([a.result for a in archives])
|
||||
|
||||
orchestrator = choose_orchestrator(archive.group_id, archive.author_id)
|
||||
logger.info(f"Using orchestrator {orchestrator=}")
|
||||
result = orchestrator.feed_item(Metadata().set_url(url))
|
||||
|
||||
try:
|
||||
@@ -59,7 +61,7 @@ def create_archive_task(self, archive_json: str):
|
||||
raise e
|
||||
return result.to_dict()
|
||||
|
||||
|
||||
#TODO: refactor how user-groups are loaded and orchestrators chosen
|
||||
@celery.task(name="create_sheet_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0})
|
||||
def create_sheet_task(self, sheet_json: str):
|
||||
sheet = schemas.SubmitSheet.model_validate_json(sheet_json)
|
||||
@@ -79,7 +81,8 @@ def create_sheet_task(self, sheet_json: str):
|
||||
continue
|
||||
try:
|
||||
#TODO: remove public from sheet in new refactor
|
||||
insert_result_into_db(result, sheet.tags, sheet.public, sheet.group_id, sheet.author_id, models.generate_uuid())
|
||||
#TODO: update the sheets table with the current date if any new archive was done
|
||||
insert_result_into_db(result, sheet.tags, sheet.public, sheet.group_id, sheet.author_id, models.generate_uuid(), sheet.sheet_id)
|
||||
stats["archived"] += 1
|
||||
except exc.IntegrityError as e:
|
||||
logger.warning(f"cached result detected: {e}")
|
||||
@@ -89,6 +92,10 @@ def create_sheet_task(self, sheet_json: str):
|
||||
stats["failed"] += 1
|
||||
stats["errors"].append(str(e))
|
||||
|
||||
if stats["archived"] > 0:
|
||||
with get_db() as session:
|
||||
crud.update_sheet_last_url_archived_at(session, sheet.sheet_id)
|
||||
|
||||
logger.info(f"SHEET DONE {sheet=}")
|
||||
return {"success": True, "sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "time": datetime.datetime.now().isoformat(), **stats}
|
||||
|
||||
@@ -165,7 +172,7 @@ def is_group_invalid_for_user(public: bool, group_id: str, author_id: str):
|
||||
return False
|
||||
|
||||
|
||||
def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_id: str, author_id: str, task_id: str) -> str:
|
||||
def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_id: str, author_id: str, task_id: str, sheet_id:str="") -> str:
|
||||
logger.info(f"INSERTING {public=} {group_id=} {author_id=} {tags=} into {task_id}")
|
||||
assert result, f"UNABLE TO archive: {result.get_url() if result else result}"
|
||||
with get_db() as session:
|
||||
@@ -175,7 +182,7 @@ def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_
|
||||
# create DB TAGs if needed
|
||||
db_tags = [crud.create_tag(session, tag) for tag in tags]
|
||||
# insert archive
|
||||
db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=task_id, url=result.get_url(), result=json.loads(result.to_json()), public=public, author_id=author_id, group_id=group_id), tags=db_tags, urls=get_all_urls(result))
|
||||
db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=task_id, url=result.get_url(), result=json.loads(result.to_json()), public=public, author_id=author_id, group_id=group_id, sheet_id=sheet_id), tags=db_tags, urls=get_all_urls(result))
|
||||
logger.debug(f"Added {db_task.id=} to database on {db_task.created_at} ({db_task.author_id})")
|
||||
return db_task.id
|
||||
|
||||
|
||||
Reference in New Issue
Block a user