From b013e2a17316a9f8479180eb6f0f83c8676768d1 Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:55:08 +0100 Subject: [PATCH] refactoring logging, metrics and new tests --- src/core/events.py | 2 +- src/core/logging.py | 14 +++++++++- src/db/crud.py | 5 +++- src/endpoints/default.py | 4 +-- src/endpoints/interoperability.py | 3 +- src/endpoints/task.py | 10 ++----- src/shared/settings.py | 2 +- src/tests/db/test_crud.py | 7 +++++ src/tests/endpoints/test_default.py | 22 +++++++++++---- src/tests/web/test_main.py | 26 +++++++++++++++-- src/tests/web/test_security.py | 10 ++++--- src/utils/metrics.py | 40 +++++++++++++++------------ src/web/main.py | 11 ++++---- src/worker.py | 43 ++++++++++++++--------------- 14 files changed, 128 insertions(+), 71 deletions(-) diff --git a/src/core/events.py b/src/core/events.py index 2fb649e..445d027 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -22,7 +22,7 @@ async def lifespan(app: FastAPI): alembic.config.main(argv=['--raiseerr', 'upgrade', 'head']) # disabling uvicorn logger since we use loguru in logging_middleware logging.getLogger("uvicorn.access").disabled = True - asyncio.create_task(redis_subscribe_worker_exceptions()) + asyncio.create_task(redis_subscribe_worker_exceptions(get_settings().REDIS_EXCEPTIONS_CHANNEL, get_settings().CELERY_BROKER_URL)) asyncio.create_task(refresh_user_groups()) asyncio.create_task(repeat_measure_regular_metrics()) diff --git a/src/core/logging.py b/src/core/logging.py index 1798f0c..fbaed29 100644 --- a/src/core/logging.py +++ b/src/core/logging.py @@ -1,15 +1,27 @@ +import traceback from loguru import logger from fastapi import Request -from utils.metrics import EXCEPTION_COUNTER # logging configurations logger.add("logs/api_logs.log", retention="30 days", rotation="3 days") +error_logger = logger.add("logs/error_logs.log", retention="30 days") + + +def log_error(e: Exception, traceback_str: str = None, extra:str = ""): + # EXCEPTION_COUNTER.labels(type(e).__name__).inc() + if not traceback_str: traceback_str = traceback.format_exc() + if extra: extra = f"{extra}\n" + logger.error(f"{extra}{e.__class__.__name__}: {e}") + error_logger.error(f"{extra}{e.__class__.__name__}: {e}\n{traceback_str}") + async def logging_middleware(request: Request, call_next): try: response = await call_next(request) logger.info(f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - HTTP {response.status_code}") return response except Exception as e: + from utils.metrics import EXCEPTION_COUNTER EXCEPTION_COUNTER.labels(type(e).__name__).inc() + log_error(e) raise e \ No newline at end of file diff --git a/src/db/crud.py b/src/db/crud.py index 1bcfc44..97dacf3 100644 --- a/src/db/crud.py +++ b/src/db/crud.py @@ -74,13 +74,16 @@ def count_archives(db: Session): def count_archive_urls(db: Session): return db.query(func.count(models.ArchiveUrl.url)).scalar() +def count_users(db: Session): + return db.query(func.count(models.User.email)).scalar() def count_by_user_since(db: Session, seconds_delta: int = 15): time_threshold = datetime.now() - timedelta(seconds=seconds_delta) return db.query(models.Archive.author_id, func.count().label('total'))\ .filter(models.Archive.created_at >= time_threshold)\ .group_by(models.Archive.author_id)\ - .order_by(func.count().desc()).limit(5 * DATABASE_QUERY_LIMIT).all() + .order_by(func.count().desc())\ + .limit(500).all() def base_query(db: Session): diff --git a/src/endpoints/default.py b/src/endpoints/default.py index da2fc97..269fa3f 100644 --- a/src/endpoints/default.py +++ b/src/endpoints/default.py @@ -1,10 +1,10 @@ from fastapi import APIRouter, Depends, Request, HTTPException from fastapi.responses import FileResponse, JSONResponse -from loguru import logger from sqlalchemy.orm import Session from core.config import VERSION, BREAKING_CHANGES +from core.logging import log_error from db import crud from db.database import get_db_dependency, get_db from web.security import get_user_auth, bearer_security @@ -21,7 +21,7 @@ async def home(request: Request): with get_db() as db: status["groups"] = crud.get_user_groups(db, email) except HTTPException: pass # not authenticated is fine - except Exception as e: logger.error(e) + except Exception as e: log_error(e) return JSONResponse(status) diff --git a/src/endpoints/interoperability.py b/src/endpoints/interoperability.py index d1bcfa4..b784e03 100644 --- a/src/endpoints/interoperability.py +++ b/src/endpoints/interoperability.py @@ -7,6 +7,7 @@ import sqlalchemy from web.security import token_api_key_auth from db import models, schemas from worker import insert_result_into_db +from core.logging import log_error interoperability_router = APIRouter(prefix="/interop", tags=["Interoperability endpoints."]) @@ -21,6 +22,6 @@ def submit_manual_archive(manual: schemas.SubmitManual, auth=Depends(token_api_k try: archive_id = insert_result_into_db(result, manual.tags, manual.public, manual.group_id, manual.author_id, models.generate_uuid()) except sqlalchemy.exc.IntegrityError as e: - logger.error(e) + log_error(e) raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error") return JSONResponse({"id": archive_id}, status_code=201) diff --git a/src/endpoints/task.py b/src/endpoints/task.py index 7544aca..532bad6 100644 --- a/src/endpoints/task.py +++ b/src/endpoints/task.py @@ -1,4 +1,3 @@ -import traceback from celery.result import AsyncResult from fastapi import APIRouter, Depends from fastapi.encoders import jsonable_encoder @@ -8,7 +7,7 @@ from loguru import logger from web.security import get_token_or_user_auth from db import schemas - +from core.logging import log_error from worker import celery @@ -25,8 +24,6 @@ def get_status(task_id, email=Depends(get_token_or_user_auth)): # The :attr:`result` attribute then contains the exception raised by the task. # https://docs.celeryq.dev/en/stable/_modules/celery/result.html#AsyncResult raise task.result - # TODO: refactor to use schema? - # response = schemas.TaskResult(id=task_id, status=task.status, result=task.result) response = { "id": task_id, @@ -36,10 +33,7 @@ def get_status(task_id, email=Depends(get_token_or_user_auth)): return JSONResponse(jsonable_encoder(response, exclude_unset=True)) except Exception as e: - logger.error(e) - logger.error(traceback.format_exc()) - # TODO: refactor to use schema? - # response = schemas.TaskResult(id=task_id, status="FAILURE", result={"error": str(e)}) + log_error(e) return JSONResponse({ "id": task_id, "status": "FAILURE", diff --git a/src/shared/settings.py b/src/shared/settings.py index 56e138f..39e2598 100644 --- a/src/shared/settings.py +++ b/src/shared/settings.py @@ -22,7 +22,7 @@ class Settings(BaseSettings): REDIS_EXCEPTIONS_CHANNEL: str = "exceptions-channel" # observability - REPEAT_COUNT_METRICS_SECONDS: int = 15 + REPEAT_COUNT_METRICS_SECONDS: int = 30 # security API_BEARER_TOKEN: Annotated[str, Len(min_length=20)] diff --git a/src/tests/db/test_crud.py b/src/tests/db/test_crud.py index 6dbfaf0..daad766 100644 --- a/src/tests/db/test_crud.py +++ b/src/tests/db/test_crud.py @@ -252,6 +252,13 @@ def test_count_archive_urls(test_data, db_session): assert crud.count_archives(db_session) == 99 assert crud.count_archive_urls(db_session) == 999 +def test_count_users(test_data, db_session): + from db import crud + + assert crud.count_users(db_session) == 4 + db_session.query(models.User).filter(models.User.email == "rick@example.com").delete() + db_session.commit() + assert crud.count_users(db_session) == 3 def test_count_by_users_since(test_data, db_session): from db import crud diff --git a/src/tests/endpoints/test_default.py b/src/tests/endpoints/test_default.py index dbd9f4f..b840260 100644 --- a/src/tests/endpoints/test_default.py +++ b/src/tests/endpoints/test_default.py @@ -101,8 +101,20 @@ async def test_prometheus_metrics(test_data, client_with_auth, get_settings): assert 'disk_utilization{type="used"}' in r2.text assert 'disk_utilization{type="free"}' in r2.text assert 'disk_utilization{type="database"}' in r2.text - assert 'database_metrics{query="count_archives",user="-"} 100.0' in r2.text - assert 'database_metrics{query="count_archive_urls",user="-"} 1000.0' in r2.text - assert 'database_metrics{query="count_by_user",user="rick@example.com"} 34.0' in r2.text - assert 'database_metrics{query="count_by_user",user="morty@example.com"} 33.0' in r2.text - assert 'database_metrics{query="count_by_user",user="jerry@example.com"} 33.0' in r2.text + assert 'database_metrics{query="count_archives"} 100.0' in r2.text + assert 'database_metrics{query="count_archive_urls"} 1000.0' in r2.text + assert 'database_metrics{query="count_users"} 4.0' in r2.text + assert 'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0' in r2.text + assert 'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0' in r2.text + assert 'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0' in r2.text + + # 30s window, should not change the gauges nor the total in the counters + from utils.metrics import measure_regular_metrics + await measure_regular_metrics(get_settings.DATABASE_PATH, 30) + r3 = client_with_auth.get("/metrics") + assert 'database_metrics{query="count_archives"} 100.0' in r3.text + assert 'database_metrics{query="count_archive_urls"} 1000.0' in r3.text + assert 'database_metrics{query="count_users"} 4.0' in r3.text + assert 'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0' in r3.text + assert 'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0' in r3.text + assert 'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0' in r3.text \ No newline at end of file diff --git a/src/tests/web/test_main.py b/src/tests/web/test_main.py index c59b30a..e880311 100644 --- a/src/tests/web/test_main.py +++ b/src/tests/web/test_main.py @@ -1,10 +1,32 @@ import os +from unittest.mock import patch from fastapi.testclient import TestClient -from shared.settings import get_settings - import shutil +import pytest + +def test_lifespan(app): + with TestClient(app) as client: + r = client.get("/health") + assert r.status_code == 200 + assert r.json() == {"status": "ok"} + +def test_alembic(db_session): + import alembic.config + alembic.config.main(argv=['--raiseerr', 'upgrade', 'head']) + alembic.config.main(argv=['--raiseerr', 'downgrade', 'base']) + +@patch("endpoints.default.crud.get_user_groups", side_effect=Exception('mocked error')) +def test_logging_middleware(m1, client_with_auth): + from utils.metrics import EXCEPTION_COUNTER + assert len(EXCEPTION_COUNTER.collect()[0].samples) == 0 + with pytest.raises(Exception, match="mocked error"): + client_with_auth.get("/groups") + # creates one empty and one from above + assert len(EXCEPTION_COUNTER.collect()[0].samples) == 2 + + def test_serve_local_archive_logic(get_settings): # create a test file first os.makedirs("local_archive_test", exist_ok=True) diff --git a/src/tests/web/test_security.py b/src/tests/web/test_security.py index ecacea6..64fe4d4 100644 --- a/src/tests/web/test_security.py +++ b/src/tests/web/test_security.py @@ -25,10 +25,11 @@ async def test_get_token_or_user_auth_with_api(): async def test_get_token_or_user_auth_with_user(): from web.security import get_token_or_user_auth bad_user = HTTPAuthorizationCredentials(scheme="ipsum", credentials="invalid") + e: pytest.ExceptionInfo = None with pytest.raises(HTTPException) as e: await get_token_or_user_auth(bad_user) - assert e.status_code == 401 - assert e.detail == "invalid access_token" + assert e.value.status_code == 401 + assert e.value.detail == "invalid access_token" @patch("web.security.authenticate_user", return_value=(True, "summer@example.com")) @@ -44,10 +45,11 @@ async def test_get_user_auth(m1): async def test_token_api_key_auth_exception(m1): from web.security import token_api_key_auth + e: pytest.ExceptionInfo = None with pytest.raises(HTTPException) as e: await token_api_key_auth(HTTPAuthorizationCredentials(scheme="ipsum", credentials="does-not-matter"), auto_error=True) - assert e.status_code == 401 - assert e.detail == "Wrong auth credentials" + assert e.value.status_code == 401 + assert e.value.detail == "Wrong auth credentials" @pytest.mark.asyncio diff --git a/src/utils/metrics.py b/src/utils/metrics.py index e63297c..8d513e6 100644 --- a/src/utils/metrics.py +++ b/src/utils/metrics.py @@ -2,62 +2,68 @@ import asyncio import json import os import shutil -from loguru import logger from prometheus_client import Counter, Gauge +import redis from db import crud from db.database import get_db -from worker import REDIS_EXCEPTIONS_CHANNEL, Rdis +from core.logging import log_error # Custom metrics EXCEPTION_COUNTER = Counter( "exceptions", "Number of times a certain exception has occurred.", - labelnames=("types",) + labelnames=["types"] ) WORKER_EXCEPTION = Counter( "worker_exceptions_total", "Number of times a certain exception has occurred on the worker.", - labelnames=("exception", "task",) + labelnames=["types", "exception", "task", "traceback"] ) DISK_UTILIZATION = Gauge( "disk_utilization", "Disk utilization in GB", - labelnames=("type",) + labelnames=["type"] ) DATABASE_METRICS = Gauge( "database_metrics", - "Useful database metrics from queries", - labelnames=("query", "user") + "Database metric readings at a certain point in time", + labelnames=["query"] +) +DATABASE_METRICS_COUNTER = Counter( + "database_metrics_counter", + "Database metrics that increase over time", + labelnames=["query", "user"] ) -async def redis_subscribe_worker_exceptions(): +async def redis_subscribe_worker_exceptions(REDIS_EXCEPTIONS_CHANNEL, CELERY_BROKER_URL): # Subscribe to Redis channel and increment the counter for each exception with info on the exception and task + Rdis = redis.Redis.from_url(CELERY_BROKER_URL) PubSubExceptions = Rdis.pubsub() PubSubExceptions.subscribe(REDIS_EXCEPTIONS_CHANNEL) while True: message = PubSubExceptions.get_message() if message and message["type"] == "message": data = json.loads(message["data"].decode("utf-8")) - WORKER_EXCEPTION.labels(exception=data["exception"], task=data["task"]).inc() + WORKER_EXCEPTION.labels(types=type(data["exception"]).__name__, exception=data["exception"], task=data["task"], traceback=data["traceback"]).inc() await asyncio.sleep(1) -async def measure_regular_metrics(sqlite_db_url:str, repeat_in_seconds:int): + +async def measure_regular_metrics(sqlite_db_url: str, repeat_in_seconds: int): _total, used, free = shutil.disk_usage("/") DISK_UTILIZATION.labels(type="used").set(used / (2**30)) DISK_UTILIZATION.labels(type="free").set(free / (2**30)) - try: + try: fs = os.stat(sqlite_db_url.replace("sqlite:///", "")) DISK_UTILIZATION.labels(type="database").set(fs.st_size / (2**30)) - except Exception as e: logger.error(e) + except Exception as e: log_error(e) with get_db() as db: - count_archives = crud.count_archives(db) - count_archive_urls = crud.count_archive_urls(db) - DATABASE_METRICS.labels(query="count_archives", user="-").set(count_archives) - DATABASE_METRICS.labels(query="count_archive_urls", user="-").set(count_archive_urls) + DATABASE_METRICS.labels(query="count_archives").set(crud.count_archives(db)) + DATABASE_METRICS.labels(query="count_archive_urls").set(crud.count_archive_urls(db)) + DATABASE_METRICS.labels(query="count_users").set(crud.count_users(db)) for user in crud.count_by_user_since(db, repeat_in_seconds): - DATABASE_METRICS.labels(query="count_by_user", user=user.author_id).set(user.total) \ No newline at end of file + DATABASE_METRICS_COUNTER.labels(query="count_by_user", user=user.author_id).inc(user.total) diff --git a/src/web/main.py b/src/web/main.py index f70a281..addc3a9 100644 --- a/src/web/main.py +++ b/src/web/main.py @@ -1,4 +1,4 @@ -import traceback, os +import os from celery.result import AsyncResult from fastapi import FastAPI, Depends, HTTPException from fastapi.encoders import jsonable_encoder @@ -11,7 +11,7 @@ import sqlalchemy from sqlalchemy.orm import Session from loguru import logger -from core.logging import logging_middleware +from core.logging import logging_middleware, log_error from worker import create_archive_task, create_sheet_task, celery, insert_result_into_db from db import crud, models, schemas @@ -42,6 +42,7 @@ def app_factory(settings = get_settings()): allow_methods=["*"], allow_headers=["*"], ) + app.middleware("http")(logging_middleware) app.include_router(default_router) app.include_router(url_router) @@ -60,7 +61,6 @@ def app_factory(settings = get_settings()): app.mount(settings.SERVE_LOCAL_ARCHIVE, StaticFiles(directory=local_dir), name=settings.SERVE_LOCAL_ARCHIVE) - app.middleware("http")(logging_middleware) # -----Submit URL and manipulate tasks. Bearer protected below @@ -111,8 +111,7 @@ def app_factory(settings = get_settings()): return JSONResponse(jsonable_encoder(response, exclude_unset=True)) except Exception as e: - logger.error(e) - logger.error(traceback.format_exc()) + log_error(e) return JSONResponse({ "id": task_id, "status": "FAILURE", @@ -161,7 +160,7 @@ def app_factory(settings = get_settings()): try: archive_id = insert_result_into_db(result, manual.tags, manual.public, manual.group_id, manual.author_id, models.generate_uuid()) except sqlalchemy.exc.IntegrityError as e: - logger.error(e) + log_error(e) raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error") return JSONResponse({"id": archive_id}) diff --git a/src/worker.py b/src/worker.py index 94fbb00..9b6c417 100644 --- a/src/worker.py +++ b/src/worker.py @@ -14,6 +14,7 @@ from shared.settings import get_settings import json import redis from sqlalchemy import exc +from core.logging import log_error settings = get_settings() @@ -21,17 +22,17 @@ celery = Celery(__name__) celery.conf.broker_url = settings.CELERY_BROKER_URL celery.conf.result_backend = settings.CELERY_RESULT_BACKEND USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME -REDIS_EXCEPTIONS_CHANNEL = settings.REDIS_EXCEPTIONS_CHANNEL Rdis = redis.Redis.from_url(celery.conf.broker_url) + @celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 3}) def create_archive_task(self, archive_json: str): archive = schemas.ArchiveCreate.model_validate_json(archive_json) logger.info(f"Archiving {archive.url=} {archive.tags=} {archive.public=} {archive.group_id=} {archive.author_id=}") invalid = is_group_invalid_for_user(archive.public, archive.group_id, archive.author_id) if invalid: - raise Exception(invalid) # marks task FAILED, saves the Exception as result + raise Exception(invalid) # marks task FAILED, saves the Exception as result url = archive.url logger.info(f"{url=} {archive=}") @@ -45,14 +46,13 @@ def create_archive_task(self, archive_json: str): orchestrator = choose_orchestrator(archive.group_id, archive.author_id) result = orchestrator.feed_item(Metadata().set_url(url)) - + try: insert_result_into_db(result, archive.tags, archive.public, archive.group_id, archive.author_id, self.request.id) except Exception as e: # Log it, then raise again to store the error as the task result - logger.error(e) - logger.error(traceback.format_exc()) - redis_publish_exception(e, self.name) + log_error(e) + redis_publish_exception(e, self.name, traceback.format_exc()) raise e return result.to_dict() @@ -72,7 +72,7 @@ def create_sheet_task(self, sheet_json: str): stats = {"archived": 0, "failed": 0, "errors": []} for result in orchestrator.feed(): - if not result: + if not result: logger.error("Got empty result from feeder, an internal error must have occurred.") continue try: @@ -82,10 +82,8 @@ def create_sheet_task(self, sheet_json: str): logger.warning(f"cached result detected: {e}") stats["archived"] += 1 except Exception as e: - logger.error(type(e)) - logger.error(e) - logger.error(traceback.format_exc()) - redis_publish_exception(e, self.name) + log_error(e, extra=f"{self.name}: {sheet_json}") + redis_publish_exception(e, self.name, traceback.format_exc()) stats["failed"] += 1 stats["errors"].append(str(e)) @@ -96,11 +94,11 @@ def create_sheet_task(self, sheet_json: str): @task_failure.connect(sender=create_sheet_task) @task_failure.connect(sender=create_archive_task) def task_failure_notifier(sender, **kwargs): - logger.warning("😅 From task_failure_notifier ==> Task failed successfully! ") - logger.error(kwargs['exception']) - logger.error(kwargs['traceback']) - logger.error("\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback'])))) - redis_publish_exception(kwargs['exception'], sender.name) + traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback']))) + logger.warning("😅 From task_failure_notifier ==> Task failed successfully!") + log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}") + redis_publish_exception(kwargs['exception'], sender.name, traceback_msg) + def choose_orchestrator(group, email): global ORCHESTRATORS @@ -187,10 +185,11 @@ def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]: if isinstance(prop, list): for i, prop_media in enumerate(prop): if prop_media := convert_if_media(prop_media): - for j, url in enumerate(prop_media.urls): + for j, url in enumerate(prop_media.urls): db_urls.append(models.ArchiveUrl(url=url, key=prop_media.get("id", f"{k}{prop_media.key}_{i}.{j}"))) return db_urls + def convert_if_media(media): if isinstance(media, Media): return media elif isinstance(media, dict): @@ -199,13 +198,13 @@ def convert_if_media(media): logger.debug(f"error parsing {media} : {e}") return False -def redis_publish_exception(exception, task_name): + +def redis_publish_exception(exception, task_name, traceback: str = ""): + REDIS_EXCEPTIONS_CHANNEL = settings.REDIS_EXCEPTIONS_CHANNEL try: - Rdis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps({"exception": exception, "task": task_name}, default=str)) + Rdis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps({"exception": exception, "task": task_name, "traceback": traceback}, default=str)) except Exception as e: - logger.error(e) - logger.error(traceback.format_exc()) - logger.error(f"Could not publish to {REDIS_EXCEPTIONS_CHANNEL}") + log_error(e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}") @worker_init.connect