diff --git a/app/shared/aa_utils.py b/app/shared/aa_utils.py deleted file mode 100644 index 1021f8e..0000000 --- a/app/shared/aa_utils.py +++ /dev/null @@ -1,49 +0,0 @@ -# TODO: code in this file should eventually be moved to the auto-archiver code base - -from typing import List - -from auto_archiver.core import Media, Metadata -from loguru import logger - -from app.shared.db import models - - -def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]: - db_urls = [] - for m in result.media: - for i, url in enumerate(m.urls): - db_urls.append( - models.ArchiveUrl(url=url, key=m.get("id", f"media_{i}")) - ) - for k, prop in m.properties.items(): - if prop_converted := convert_if_media(prop): - for i, url in enumerate(prop_converted.urls): - db_urls.append( - models.ArchiveUrl( - url=url, key=prop_converted.get("id", f"{k}_{i}") - ) - ) - if isinstance(prop, list): - for i, prop_media in enumerate(prop): - if prop_media := convert_if_media(prop_media): - for j, url in enumerate(prop_media.urls): - db_urls.append( - models.ArchiveUrl( - url=url, - key=prop_media.get( - "id", f"{k}{prop_media.key}_{i}.{j}" - ), - ) - ) - return db_urls - - -def convert_if_media(media): - if isinstance(media, Media): - return media - elif isinstance(media, dict): - try: - return Media.from_dict(media) - except Exception as e: - logger.debug(f"error parsing {media} : {e}") - return False diff --git a/app/tests/worker/test_worker_main.py b/app/tests/worker/test_worker_main.py index e67aa88..865d039 100644 --- a/app/tests/worker/test_worker_main.py +++ b/app/tests/worker/test_worker_main.py @@ -6,7 +6,8 @@ from auto_archiver.core import Media, Metadata from app.shared import schemas from app.shared.db import models -from app.worker.main import create_archive_task, create_sheet_task, get_all_urls +from app.web.utils.misc import get_all_urls +from app.worker.main import create_archive_task, create_sheet_task class TestCreateArchiveTask: diff --git a/app/web/endpoints/interoperability.py b/app/web/endpoints/interoperability.py index 86ea037..d08bf50 100644 --- a/app/web/endpoints/interoperability.py +++ b/app/web/endpoints/interoperability.py @@ -9,12 +9,12 @@ from loguru import logger from sqlalchemy.orm import Session from app.shared import business_logic, schemas -from app.shared.aa_utils import get_all_urls from app.shared.db import models, worker_crud from app.shared.db.database import get_db_dependency from app.shared.log import log_error from app.web.config import ALLOW_ANY_EMAIL from app.web.security import token_api_key_auth +from app.web.utils.misc import get_all_urls interoperability_router = APIRouter( diff --git a/app/web/endpoints/task.py b/app/web/endpoints/task.py index e9da444..3581ab2 100644 --- a/app/web/endpoints/task.py +++ b/app/web/endpoints/task.py @@ -26,7 +26,8 @@ def get_status( try: if task.status == "FAILURE": # *FAILURE* The task raised an exception, or has exceeded the retry limit. - # The :attr:`result` attribute then contains the exception raised by the task. + # The :attr:`result` attribute then contains the exception raised by + # the task. # https://docs.celeryq.dev/en/stable/_modules/celery/result.html#AsyncResult raise task.result diff --git a/app/web/middleware.py b/app/web/middleware.py index 47c07b3..b39ae01 100644 --- a/app/web/middleware.py +++ b/app/web/middleware.py @@ -10,7 +10,8 @@ from app.web.utils.metrics import EXCEPTION_COUNTER async def logging_middleware(request: Request, call_next): try: response = await call_next(request) - # TODO: use Origin to have summary prometheus metrics on where requests come from + # TODO: use Origin to have summary prometheus metrics on where + # requests come from # origin = request.headers.get("origin") logger.info( f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - HTTP {response.status_code}" @@ -25,7 +26,9 @@ async def logging_middleware(request: Request, call_next): raise e -async def increase_exceptions_counter(e: Exception, location: str = "cronjob"): +async def increase_exceptions_counter( + e: Exception, location: str = "cronjob" +) -> None: if location == "cronjob": try: last_trace = traceback.extract_tb(e.__traceback__)[-1] diff --git a/app/web/security.py b/app/web/security.py index 5850ad3..ecb0111 100644 --- a/app/web/security.py +++ b/app/web/security.py @@ -59,7 +59,7 @@ async def get_token_or_user_auth( async def get_user_auth( credentials: HTTPAuthorizationCredentials = Depends(bearer_security), ): - # validates the Bearer token in the case that it requires it + # Validates the Bearer token in the case that it requires it valid_user, info = authenticate_user(credentials.credentials) if valid_user: return info.lower() diff --git a/app/web/utils/metrics.py b/app/web/utils/metrics.py index 04b496f..55f17e8 100644 --- a/app/web/utils/metrics.py +++ b/app/web/utils/metrics.py @@ -37,11 +37,12 @@ DATABASE_METRICS_COUNTER = Counter( ) -async def redis_subscribe_worker_exceptions(REDIS_EXCEPTIONS_CHANNEL: str): - # Subscribe to Redis channel and increment the counter for each exception with info on the exception and task +async def redis_subscribe_worker_exceptions(redis_exceptions_channel: str): + # Subscribe to Redis channel and increment the counter for each exception + # with info on the exception and task Redis = get_redis() PubSubExceptions = Redis.pubsub() - PubSubExceptions.subscribe(REDIS_EXCEPTIONS_CHANNEL) + PubSubExceptions.subscribe(redis_exceptions_channel) while True: message = PubSubExceptions.get_message() if message and message["type"] == "message": diff --git a/app/web/utils/misc.py b/app/web/utils/misc.py index f78ae1e..b561975 100644 --- a/app/web/utils/misc.py +++ b/app/web/utils/misc.py @@ -1,6 +1,11 @@ import base64 +from typing import List +from auto_archiver.core import Media, Metadata from fastapi.encoders import jsonable_encoder +from loguru import logger + +from app.shared.db import models def custom_jsonable_encoder(obj): @@ -14,3 +19,44 @@ def convert_priority_to_queue_dict(priority: str) -> dict: "priority": 0 if priority == "high" else 10, "queue": f"{priority}_priority", } + + +def convert_if_media(media): + if isinstance(media, Media): + return media + elif isinstance(media, dict): + try: + return Media.from_dict(media) + except Exception as e: + logger.debug(f"error parsing {media} : {e}") + return False + + +def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]: + db_urls = [] + for m in result.media: + for i, url in enumerate(m.urls): + db_urls.append( + models.ArchiveUrl(url=url, key=m.get("id", f"media_{i}")) + ) + for k, prop in m.properties.items(): + if prop_converted := convert_if_media(prop): + for i, url in enumerate(prop_converted.urls): + db_urls.append( + models.ArchiveUrl( + url=url, key=prop_converted.get("id", f"{k}_{i}") + ) + ) + if isinstance(prop, list): + for i, prop_media in enumerate(prop): + if prop_media := convert_if_media(prop_media): + for j, url in enumerate(prop_media.urls): + db_urls.append( + models.ArchiveUrl( + url=url, + key=prop_media.get( + "id", f"{k}{prop_media.key}_{i}.{j}" + ), + ) + ) + return db_urls diff --git a/app/worker/main.py b/app/worker/main.py index f7b2915..7716b6b 100644 --- a/app/worker/main.py +++ b/app/worker/main.py @@ -8,12 +8,12 @@ from loguru import logger from sqlalchemy import exc from app.shared import business_logic, schemas -from app.shared.aa_utils import get_all_urls from app.shared.db import models, worker_crud from app.shared.db.database import get_db from app.shared.log import log_error from app.shared.settings import get_settings from app.shared.task_messaging import get_celery, get_redis +from app.web.utils.misc import get_all_urls from app.worker.worker_log import setup_celery_logger