refactoring logging, metrics and new tests

This commit is contained in:
msramalho
2024-10-22 12:55:08 +01:00
parent e58193e47a
commit b013e2a173
14 changed files with 128 additions and 71 deletions

View File

@@ -22,7 +22,7 @@ async def lifespan(app: FastAPI):
alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])
# disabling uvicorn logger since we use loguru in logging_middleware
logging.getLogger("uvicorn.access").disabled = True
asyncio.create_task(redis_subscribe_worker_exceptions())
asyncio.create_task(redis_subscribe_worker_exceptions(get_settings().REDIS_EXCEPTIONS_CHANNEL, get_settings().CELERY_BROKER_URL))
asyncio.create_task(refresh_user_groups())
asyncio.create_task(repeat_measure_regular_metrics())

View File

@@ -1,15 +1,27 @@
import traceback
from loguru import logger
from fastapi import Request
from utils.metrics import EXCEPTION_COUNTER
# logging configurations
def _is_error_record(record):
    """Return True for records emitted through ``error_logger``."""
    return bool(record["extra"].get("error_log"))


# General log file: everything except the verbose records meant for the error file.
logger.add("logs/api_logs.log", retention="30 days", rotation="3 days", filter=lambda record: not _is_error_record(record))
# logger.add() returns an integer handler id, not a logger object, so its
# return value cannot be used to emit messages. The error file is fed through
# a filter on a bound "error_log" flag instead.
logger.add("logs/error_logs.log", retention="30 days", filter=_is_error_record)
error_logger = logger.bind(error_log=True)


def log_error(e: Exception, traceback_str: str = None, extra: str = ""):
    """Log an exception: a one-line summary to the general log, full details to the error log.

    Args:
        e: the exception to report.
        traceback_str: pre-formatted traceback text; when empty or None the
            traceback of the exception currently being handled is used.
        extra: optional context, written on its own line before the summary.
    """
    if not traceback_str:
        traceback_str = traceback.format_exc()
    if extra:
        extra = f"{extra}\n"
    summary = f"{extra}{e.__class__.__name__}: {e}"
    logger.error(summary)
    error_logger.error(f"{summary}\n{traceback_str}")
async def logging_middleware(request: Request, call_next):
    """HTTP middleware that logs each request and records unhandled exceptions.

    Args:
        request: the incoming request.
        call_next: callable that forwards the request and returns the response.

    Returns:
        The response produced by ``call_next``.

    Raises:
        Whatever ``call_next`` raises; the exception is counted in
        ``EXCEPTION_COUNTER`` and logged before being re-raised.
    """
    try:
        response = await call_next(request)
    except Exception as e:
        # Imported here rather than at module top, presumably to avoid a
        # circular import (utils.metrics imports log_error from core.logging).
        from utils.metrics import EXCEPTION_COUNTER
        EXCEPTION_COUNTER.labels(type(e).__name__).inc()
        log_error(e)
        raise

    # The access-log line sits outside the try so that a logging problem is
    # never counted as a request exception. request.client can be None.
    client = request.client
    origin = f"{client.host}:{client.port}" if client else "unknown"
    logger.info(f"{origin} {request.method} {request.url} - HTTP {response.status_code}")
    return response

View File

@@ -74,13 +74,16 @@ def count_archives(db: Session):
def count_archive_urls(db: Session):
    """Return the scalar result of counting ``ArchiveUrl.url`` values."""
    url_count = func.count(models.ArchiveUrl.url)
    return db.query(url_count).scalar()
def count_users(db: Session):
    """Return the scalar result of counting ``User.email`` values."""
    email_count = func.count(models.User.email)
    return db.query(email_count).scalar()
def count_by_user_since(db: Session, seconds_delta: int = 15):
    """Count archives per author created within the last ``seconds_delta`` seconds.

    Args:
        db: database session to query.
        seconds_delta: size of the look-back window, in seconds.

    Returns:
        Up to 500 rows of ``(author_id, total)``, ordered by ``total`` descending.
    """
    # NOTE(review): datetime.now() is naive local time; confirm that
    # Archive.created_at is stored on the same clock.
    time_threshold = datetime.now() - timedelta(seconds=seconds_delta)
    return (
        db.query(models.Archive.author_id, func.count().label('total'))
        .filter(models.Archive.created_at >= time_threshold)
        .group_by(models.Archive.author_id)
        .order_by(func.count().desc())
        .limit(500)
        .all()
    )
def base_query(db: Session):

View File

@@ -1,10 +1,10 @@
from fastapi import APIRouter, Depends, Request, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from loguru import logger
from sqlalchemy.orm import Session
from core.config import VERSION, BREAKING_CHANGES
from core.logging import log_error
from db import crud
from db.database import get_db_dependency, get_db
from web.security import get_user_auth, bearer_security
@@ -21,7 +21,7 @@ async def home(request: Request):
with get_db() as db:
status["groups"] = crud.get_user_groups(db, email)
except HTTPException: pass # not authenticated is fine
except Exception as e: logger.error(e)
except Exception as e: log_error(e)
return JSONResponse(status)

View File

@@ -7,6 +7,7 @@ import sqlalchemy
from web.security import token_api_key_auth
from db import models, schemas
from worker import insert_result_into_db
from core.logging import log_error
interoperability_router = APIRouter(prefix="/interop", tags=["Interoperability endpoints."])
@@ -21,6 +22,6 @@ def submit_manual_archive(manual: schemas.SubmitManual, auth=Depends(token_api_k
try:
archive_id = insert_result_into_db(result, manual.tags, manual.public, manual.group_id, manual.author_id, models.generate_uuid())
except sqlalchemy.exc.IntegrityError as e:
logger.error(e)
log_error(e)
raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error")
return JSONResponse({"id": archive_id}, status_code=201)

View File

@@ -1,4 +1,3 @@
import traceback
from celery.result import AsyncResult
from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
@@ -8,7 +7,7 @@ from loguru import logger
from web.security import get_token_or_user_auth
from db import schemas
from core.logging import log_error
from worker import celery
@@ -25,8 +24,6 @@ def get_status(task_id, email=Depends(get_token_or_user_auth)):
# The :attr:`result` attribute then contains the exception raised by the task.
# https://docs.celeryq.dev/en/stable/_modules/celery/result.html#AsyncResult
raise task.result
# TODO: refactor to use schema?
# response = schemas.TaskResult(id=task_id, status=task.status, result=task.result)
response = {
"id": task_id,
@@ -36,10 +33,7 @@ def get_status(task_id, email=Depends(get_token_or_user_auth)):
return JSONResponse(jsonable_encoder(response, exclude_unset=True))
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
# TODO: refactor to use schema?
# response = schemas.TaskResult(id=task_id, status="FAILURE", result={"error": str(e)})
log_error(e)
return JSONResponse({
"id": task_id,
"status": "FAILURE",

View File

@@ -22,7 +22,7 @@ class Settings(BaseSettings):
REDIS_EXCEPTIONS_CHANNEL: str = "exceptions-channel"
# observability
REPEAT_COUNT_METRICS_SECONDS: int = 15
REPEAT_COUNT_METRICS_SECONDS: int = 30
# security
API_BEARER_TOKEN: Annotated[str, Len(min_length=20)]

View File

@@ -252,6 +252,13 @@ def test_count_archive_urls(test_data, db_session):
assert crud.count_archives(db_session) == 99
assert crud.count_archive_urls(db_session) == 999
def test_count_users(test_data, db_session):
    """count_users reports 4 users, then 3 once one of them is deleted."""
    from db import crud

    assert crud.count_users(db_session) == 4

    rick = db_session.query(models.User).filter(models.User.email == "rick@example.com")
    rick.delete()
    db_session.commit()

    assert crud.count_users(db_session) == 3
def test_count_by_users_since(test_data, db_session):
from db import crud

View File

@@ -101,8 +101,20 @@ async def test_prometheus_metrics(test_data, client_with_auth, get_settings):
assert 'disk_utilization{type="used"}' in r2.text
assert 'disk_utilization{type="free"}' in r2.text
assert 'disk_utilization{type="database"}' in r2.text
assert 'database_metrics{query="count_archives",user="-"} 100.0' in r2.text
assert 'database_metrics{query="count_archive_urls",user="-"} 1000.0' in r2.text
assert 'database_metrics{query="count_by_user",user="rick@example.com"} 34.0' in r2.text
assert 'database_metrics{query="count_by_user",user="morty@example.com"} 33.0' in r2.text
assert 'database_metrics{query="count_by_user",user="jerry@example.com"} 33.0' in r2.text
assert 'database_metrics{query="count_archives"} 100.0' in r2.text
assert 'database_metrics{query="count_archive_urls"} 1000.0' in r2.text
assert 'database_metrics{query="count_users"} 4.0' in r2.text
assert 'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0' in r2.text
assert 'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0' in r2.text
assert 'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0' in r2.text
# 30s window, should not change the gauges nor the total in the counters
from utils.metrics import measure_regular_metrics
await measure_regular_metrics(get_settings.DATABASE_PATH, 30)
r3 = client_with_auth.get("/metrics")
assert 'database_metrics{query="count_archives"} 100.0' in r3.text
assert 'database_metrics{query="count_archive_urls"} 1000.0' in r3.text
assert 'database_metrics{query="count_users"} 4.0' in r3.text
assert 'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0' in r3.text
assert 'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0' in r3.text
assert 'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0' in r3.text

View File

@@ -1,10 +1,32 @@
import os
from unittest.mock import patch
from fastapi.testclient import TestClient
from shared.settings import get_settings
import shutil
import pytest
def test_lifespan(app):
    """The app starts inside a TestClient context and answers /health."""
    with TestClient(app) as client:
        health = client.get("/health")
        assert health.status_code == 200
        assert health.json() == {"status": "ok"}
def test_alembic(db_session):
    """Run the alembic migrations up to head and then back down to base."""
    import alembic.config

    migrations = (
        ['--raiseerr', 'upgrade', 'head'],
        ['--raiseerr', 'downgrade', 'base'],
    )
    for argv in migrations:
        alembic.config.main(argv=argv)
@patch("endpoints.default.crud.get_user_groups", side_effect=Exception('mocked error'))
def test_logging_middleware(m1, client_with_auth):
    """An exception raised while serving /groups adds samples to EXCEPTION_COUNTER."""
    from utils.metrics import EXCEPTION_COUNTER

    def sample_count():
        # number of samples currently exposed by the counter's first metric family
        return len(EXCEPTION_COUNTER.collect()[0].samples)

    assert sample_count() == 0

    with pytest.raises(Exception, match="mocked error"):
        client_with_auth.get("/groups")

    # creates one empty and one from above
    assert sample_count() == 2
def test_serve_local_archive_logic(get_settings):
# create a test file first
os.makedirs("local_archive_test", exist_ok=True)

View File

@@ -25,10 +25,11 @@ async def test_get_token_or_user_auth_with_api():
async def test_get_token_or_user_auth_with_user():
    """Invalid bearer credentials are rejected with a 401 HTTPException."""
    from web.security import get_token_or_user_auth
    bad_user = HTTPAuthorizationCredentials(scheme="ipsum", credentials="invalid")
    with pytest.raises(HTTPException) as e:
        await get_token_or_user_auth(bad_user)
    # pytest.raises yields an ExceptionInfo; the raised exception is e.value.
    # The asserts sit after the with-block so they actually run: statements
    # following the raising call inside the block are never reached.
    assert e.value.status_code == 401
    assert e.value.detail == "invalid access_token"
@patch("web.security.authenticate_user", return_value=(True, "summer@example.com"))
@@ -44,10 +45,11 @@ async def test_get_user_auth(m1):
async def test_token_api_key_auth_exception(m1):
    """With auto_error=True, wrong credentials raise a 401 HTTPException."""
    from web.security import token_api_key_auth
    with pytest.raises(HTTPException) as e:
        await token_api_key_auth(HTTPAuthorizationCredentials(scheme="ipsum", credentials="does-not-matter"), auto_error=True)
    # pytest.raises yields an ExceptionInfo; the raised exception is e.value.
    # The asserts sit after the with-block so they actually run.
    assert e.value.status_code == 401
    assert e.value.detail == "Wrong auth credentials"
@pytest.mark.asyncio

View File

@@ -2,62 +2,68 @@ import asyncio
import json
import os
import shutil
from loguru import logger
from prometheus_client import Counter, Gauge
import redis
from db import crud
from db.database import get_db
from worker import REDIS_EXCEPTIONS_CHANNEL, Rdis
from core.logging import log_error
# Custom metrics
# Exceptions raised while serving requests, labelled by exception class name.
EXCEPTION_COUNTER = Counter(
    "exceptions",
    "Number of times a certain exception has occurred.",
    labelnames=["types"],
)

# Exceptions reported by the worker over the redis exceptions channel.
# NOTE(review): "exception" and "traceback" carry free-form text, so every
# distinct message/traceback creates a new time series; confirm this label
# cardinality is acceptable.
WORKER_EXCEPTION = Counter(
    "worker_exceptions_total",
    "Number of times a certain exception has occurred on the worker.",
    labelnames=["types", "exception", "task", "traceback"],
)

# Disk usage readings; "type" is one of used / free / database.
DISK_UTILIZATION = Gauge(
    "disk_utilization",
    "Disk utilization in GB",
    labelnames=["type"],
)

# Point-in-time totals read from the database, one series per query name.
DATABASE_METRICS = Gauge(
    "database_metrics",
    "Database metric readings at a certain point in time",
    labelnames=["query"],
)

# Monotonically increasing per-user totals.
DATABASE_METRICS_COUNTER = Counter(
    "database_metrics_counter",
    "Database metrics that increase over time",
    labelnames=["query", "user"],
)
async def redis_subscribe_worker_exceptions(REDIS_EXCEPTIONS_CHANNEL, CELERY_BROKER_URL):
    """Poll a redis pub/sub channel forever and count worker exceptions.

    Each published message is expected to be a JSON object with "exception"
    and "task" keys, and optionally "type" and "traceback".

    Args:
        REDIS_EXCEPTIONS_CHANNEL: name of the channel to subscribe to.
        CELERY_BROKER_URL: redis URL used to open the connection.
    """
    # Subscribe to Redis channel and increment the counter for each exception with info on the exception and task
    Rdis = redis.Redis.from_url(CELERY_BROKER_URL)
    PubSubExceptions = Rdis.pubsub()
    PubSubExceptions.subscribe(REDIS_EXCEPTIONS_CHANNEL)
    while True:
        message = PubSubExceptions.get_message()
        if message and message["type"] == "message":
            try:
                data = json.loads(message["data"].decode("utf-8"))
                WORKER_EXCEPTION.labels(
                    # After JSON decoding the exception is plain text, so
                    # type(...) would always be "str"; use the class name
                    # when the message carries one under "type".
                    types=data.get("type") or "unknown",
                    exception=data["exception"],
                    task=data["task"],
                    traceback=data.get("traceback", ""),
                ).inc()
            except Exception as e:
                # A malformed message must not terminate this background loop.
                log_error(e, extra=f"Could not process message from {REDIS_EXCEPTIONS_CHANNEL}")
        await asyncio.sleep(1)
async def measure_regular_metrics(sqlite_db_url: str, repeat_in_seconds: int):
    """Refresh the disk and database metrics once.

    Args:
        sqlite_db_url: database URL; the "sqlite:///" prefix is stripped to
            obtain the file whose size is reported.
        repeat_in_seconds: look-back window passed to
            ``crud.count_by_user_since`` for the per-user counter.
    """
    gib = 2**30
    _total, used, free = shutil.disk_usage("/")
    DISK_UTILIZATION.labels(type="used").set(used / gib)
    DISK_UTILIZATION.labels(type="free").set(free / gib)

    # Best effort: a missing or unreadable database file is logged and must
    # not stop the remaining metrics from being updated.
    try:
        fs = os.stat(sqlite_db_url.replace("sqlite:///", ""))
        DISK_UTILIZATION.labels(type="database").set(fs.st_size / gib)
    except Exception as e:
        log_error(e)

    with get_db() as db:
        # DATABASE_METRICS is declared with the single label "query".
        DATABASE_METRICS.labels(query="count_archives").set(crud.count_archives(db))
        DATABASE_METRICS.labels(query="count_archive_urls").set(crud.count_archive_urls(db))
        DATABASE_METRICS.labels(query="count_users").set(crud.count_users(db))

        # Add the archives created in the last window to each user's running total.
        for user in crud.count_by_user_since(db, repeat_in_seconds):
            DATABASE_METRICS_COUNTER.labels(query="count_by_user", user=user.author_id).inc(user.total)

View File

@@ -1,4 +1,4 @@
import traceback, os
import os
from celery.result import AsyncResult
from fastapi import FastAPI, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
@@ -11,7 +11,7 @@ import sqlalchemy
from sqlalchemy.orm import Session
from loguru import logger
from core.logging import logging_middleware
from core.logging import logging_middleware, log_error
from worker import create_archive_task, create_sheet_task, celery, insert_result_into_db
from db import crud, models, schemas
@@ -42,6 +42,7 @@ def app_factory(settings = get_settings()):
allow_methods=["*"],
allow_headers=["*"],
)
app.middleware("http")(logging_middleware)
app.include_router(default_router)
app.include_router(url_router)
@@ -60,7 +61,6 @@ def app_factory(settings = get_settings()):
app.mount(settings.SERVE_LOCAL_ARCHIVE, StaticFiles(directory=local_dir), name=settings.SERVE_LOCAL_ARCHIVE)
app.middleware("http")(logging_middleware)
# -----Submit URL and manipulate tasks. Bearer protected below
@@ -111,8 +111,7 @@ def app_factory(settings = get_settings()):
return JSONResponse(jsonable_encoder(response, exclude_unset=True))
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
log_error(e)
return JSONResponse({
"id": task_id,
"status": "FAILURE",
@@ -161,7 +160,7 @@ def app_factory(settings = get_settings()):
try:
archive_id = insert_result_into_db(result, manual.tags, manual.public, manual.group_id, manual.author_id, models.generate_uuid())
except sqlalchemy.exc.IntegrityError as e:
logger.error(e)
log_error(e)
raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error")
return JSONResponse({"id": archive_id})

View File

@@ -14,6 +14,7 @@ from shared.settings import get_settings
import json
import redis
from sqlalchemy import exc
from core.logging import log_error
settings = get_settings()
@@ -21,17 +22,17 @@ celery = Celery(__name__)
celery.conf.broker_url = settings.CELERY_BROKER_URL
celery.conf.result_backend = settings.CELERY_RESULT_BACKEND
USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME
REDIS_EXCEPTIONS_CHANNEL = settings.REDIS_EXCEPTIONS_CHANNEL
Rdis = redis.Redis.from_url(celery.conf.broker_url)
@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 3})
def create_archive_task(self, archive_json: str):
archive = schemas.ArchiveCreate.model_validate_json(archive_json)
logger.info(f"Archiving {archive.url=} {archive.tags=} {archive.public=} {archive.group_id=} {archive.author_id=}")
invalid = is_group_invalid_for_user(archive.public, archive.group_id, archive.author_id)
if invalid:
raise Exception(invalid) # marks task FAILED, saves the Exception as result
raise Exception(invalid) # marks task FAILED, saves the Exception as result
url = archive.url
logger.info(f"{url=} {archive=}")
@@ -45,14 +46,13 @@ def create_archive_task(self, archive_json: str):
orchestrator = choose_orchestrator(archive.group_id, archive.author_id)
result = orchestrator.feed_item(Metadata().set_url(url))
try:
insert_result_into_db(result, archive.tags, archive.public, archive.group_id, archive.author_id, self.request.id)
except Exception as e:
# Log it, then raise again to store the error as the task result
logger.error(e)
logger.error(traceback.format_exc())
redis_publish_exception(e, self.name)
log_error(e)
redis_publish_exception(e, self.name, traceback.format_exc())
raise e
return result.to_dict()
@@ -72,7 +72,7 @@ def create_sheet_task(self, sheet_json: str):
stats = {"archived": 0, "failed": 0, "errors": []}
for result in orchestrator.feed():
if not result:
if not result:
logger.error("Got empty result from feeder, an internal error must have occurred.")
continue
try:
@@ -82,10 +82,8 @@ def create_sheet_task(self, sheet_json: str):
logger.warning(f"cached result detected: {e}")
stats["archived"] += 1
except Exception as e:
logger.error(type(e))
logger.error(e)
logger.error(traceback.format_exc())
redis_publish_exception(e, self.name)
log_error(e, extra=f"{self.name}: {sheet_json}")
redis_publish_exception(e, self.name, traceback.format_exc())
stats["failed"] += 1
stats["errors"].append(str(e))
@@ -96,11 +94,11 @@ def create_sheet_task(self, sheet_json: str):
@task_failure.connect(sender=create_sheet_task)
@task_failure.connect(sender=create_archive_task)
def task_failure_notifier(sender, **kwargs):
    """Log a failed task and publish its exception once.

    Connected to ``task_failure`` for ``create_sheet_task`` and
    ``create_archive_task``.

    Args:
        sender: the task that failed; its ``name`` identifies it in the logs
            and in the published message.
        **kwargs: signal payload; ``exception`` and ``traceback`` are read.
    """
    # kwargs['traceback'] is a traceback object, so format it into text.
    traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback'])))
    logger.warning("😅 From task_failure_notifier ==> Task failed successfully!")
    log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}")
    # NOTE(review): create_archive_task also publishes before re-raising, so
    # its failures may be published twice; confirm whether that is intended.
    redis_publish_exception(kwargs['exception'], sender.name, traceback_msg)
def choose_orchestrator(group, email):
global ORCHESTRATORS
@@ -187,10 +185,11 @@ def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]:
if isinstance(prop, list):
for i, prop_media in enumerate(prop):
if prop_media := convert_if_media(prop_media):
for j, url in enumerate(prop_media.urls):
for j, url in enumerate(prop_media.urls):
db_urls.append(models.ArchiveUrl(url=url, key=prop_media.get("id", f"{k}{prop_media.key}_{i}.{j}")))
return db_urls
def convert_if_media(media):
if isinstance(media, Media): return media
elif isinstance(media, dict):
@@ -199,13 +198,13 @@ def convert_if_media(media):
logger.debug(f"error parsing {media} : {e}")
return False
def redis_publish_exception(exception, task_name, traceback: str = ""):
    """Publish an exception report on the redis exceptions channel.

    Failures to publish are logged and swallowed so that reporting an error
    never raises a second one.

    Args:
        exception: the exception to report.
        task_name: name of the task in which it occurred.
        traceback: formatted traceback text. This parameter shadows the
            ``traceback`` module inside this function.
    """
    REDIS_EXCEPTIONS_CHANNEL = settings.REDIS_EXCEPTIONS_CHANNEL
    try:
        payload = json.dumps(
            {
                "exception": exception,
                # default=str flattens the exception to its message, so the
                # class name is sent explicitly for consumers that need it.
                "type": type(exception).__name__,
                "task": task_name,
                "traceback": traceback,
            },
            default=str,
        )
        Rdis.publish(REDIS_EXCEPTIONS_CHANNEL, payload)
    except Exception as e:
        # Pass the context as `extra`: the second positional parameter of
        # log_error is the traceback text, and filling it with this message
        # would discard the real traceback.
        log_error(e, extra=f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}")
@worker_init.connect