mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-12 21:48:35 +03:00
adds new metrics
This commit is contained in:
@@ -1,8 +1,8 @@
|
|||||||
from functools import cache
|
from functools import cache
|
||||||
from sqlalchemy.orm import Session, load_only
|
from sqlalchemy.orm import Session, load_only
|
||||||
from sqlalchemy import Column, or_
|
from sqlalchemy import Column, or_, func
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from security import ALLOW_ANY_EMAIL
|
from security import ALLOW_ANY_EMAIL
|
||||||
from . import models, schemas
|
from . import models, schemas
|
||||||
@@ -65,6 +65,18 @@ def soft_delete_task(db: Session, task_id: str, email: str) -> bool:
|
|||||||
db.commit()
|
db.commit()
|
||||||
return db_task is not None
|
return db_task is not None
|
||||||
|
|
||||||
|
def count_archives(db: Session):
    """Return the scalar result of COUNT over ``models.Archive.id``."""
    archive_total = func.count(models.Archive.id)
    return db.query(archive_total).scalar()
|
||||||
|
|
||||||
|
def count_archive_urls(db: Session):
    """Return the scalar result of COUNT over ``models.ArchiveUrl.url``."""
    url_total = func.count(models.ArchiveUrl.url)
    return db.query(url_total).scalar()
|
||||||
|
|
||||||
|
def count_by_user_since(db: Session, time_delta: timedelta = timedelta(seconds=30)):
    """Count recently created archives per author.

    Only archives whose ``created_at`` is at or after
    ``datetime.now() - time_delta`` are considered. Rows are grouped by
    ``author_id``, ordered by descending count and capped at
    ``5 * MAX_LIMIT`` rows.

    Returns the list of result rows, each exposing ``author_id`` and ``total``.
    """
    cutoff = datetime.now() - time_delta
    per_author = db.query(
        models.Archive.author_id,
        func.count().label('total'),
    )
    recent = per_author.filter(models.Archive.created_at >= cutoff)
    ranked = recent.group_by(models.Archive.author_id).order_by(func.count().desc())
    return ranked.limit(5 * MAX_LIMIT).all()
|
||||||
|
|
||||||
def base_query(db: Session):
|
def base_query(db: Session):
|
||||||
# allow only some fields to be returned, for example author should remain hidden
|
# allow only some fields to be returned, for example author should remain hidden
|
||||||
|
|||||||
39
src/main.py
39
src/main.py
@@ -12,14 +12,15 @@ from loguru import logger
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from prometheus_fastapi_instrumentator import Instrumentator
|
from prometheus_fastapi_instrumentator import Instrumentator
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter, Gauge
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
import asyncio, json
|
import asyncio, json
|
||||||
|
import shutil
|
||||||
|
|
||||||
from worker import REDIS_EXCEPTIONS_CHANNEL, create_archive_task, create_sheet_task, celery, insert_result_into_db, Rdis
|
from worker import REDIS_EXCEPTIONS_CHANNEL, create_archive_task, create_sheet_task, celery, insert_result_into_db, Rdis
|
||||||
|
|
||||||
from db import crud, models, schemas
|
from db import crud, models, schemas
|
||||||
from db.database import engine, SessionLocal
|
from db.database import engine, SessionLocal, SQLALCHEMY_DATABASE_URL
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from security import get_user_auth, token_api_key_auth, bearer_security, get_token_or_user_auth
|
from security import get_user_auth, token_api_key_auth, bearer_security, get_token_or_user_auth
|
||||||
from auto_archiver import Metadata
|
from auto_archiver import Metadata
|
||||||
@@ -48,6 +49,7 @@ async def lifespan(app: FastAPI):
|
|||||||
logging.getLogger("uvicorn.access").disabled = True
|
logging.getLogger("uvicorn.access").disabled = True
|
||||||
asyncio.create_task(redis_subscribe_worker_exceptions())
|
asyncio.create_task(redis_subscribe_worker_exceptions())
|
||||||
asyncio.create_task(refresh_user_groups())
|
asyncio.create_task(refresh_user_groups())
|
||||||
|
asyncio.create_task(measure_disk_utilization())
|
||||||
|
|
||||||
yield # separates startup from shutdown instructions
|
yield # separates startup from shutdown instructions
|
||||||
|
|
||||||
@@ -206,6 +208,7 @@ def submit_manual_archive(manual:schemas.SubmitManual, auth = Depends(token_api_
|
|||||||
raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error")
|
raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error")
|
||||||
return JSONResponse({"id": archive_id})
|
return JSONResponse({"id": archive_id})
|
||||||
|
|
||||||
|
# --------- Prometheus metrics
|
||||||
|
|
||||||
WORKER_EXCEPTION = Counter(
|
WORKER_EXCEPTION = Counter(
|
||||||
"worker_exceptions_total",
|
"worker_exceptions_total",
|
||||||
@@ -220,4 +223,34 @@ async def redis_subscribe_worker_exceptions():
|
|||||||
if message and message["type"] == "message":
|
if message and message["type"] == "message":
|
||||||
data = json.loads(message["data"].decode("utf-8"))
|
data = json.loads(message["data"].decode("utf-8"))
|
||||||
WORKER_EXCEPTION.labels(exception=data["exception"], task=data["task"]).inc()
|
WORKER_EXCEPTION.labels(exception=data["exception"], task=data["task"]).inc()
|
||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# Gauge for disk figures; each series is distinguished by its "type" label.
DISK_UTILIZATION = Gauge(name="disk_utilization", documentation="Disk utilization in GB", labelnames=("type",))
|
||||||
|
# Gauge for values derived from database queries; series are keyed by the
# "query" and "user" labels.
DATABASE_METRICS = Gauge(name="database_metrics", documentation="Useful database metrics from queries", labelnames=("query", "user"))
|
||||||
|
|
||||||
|
@repeat_every(seconds=15)
async def measure_disk_utilization():
    """Refresh the disk-usage and database Prometheus gauges.

    Sets ``DISK_UTILIZATION`` for the used and free space of "/" and, when the
    database file can be stat'ed, for the database file size (all divided by
    2**30). Then sets ``DATABASE_METRICS`` with the total archive count, the
    total archive URL count and one per-author count taken from
    ``crud.count_by_user_since``.
    """
    gib = 2**30

    _total, used, free = shutil.disk_usage("/")
    DISK_UTILIZATION.labels(type="used").set(used / gib)
    DISK_UTILIZATION.labels(type="free").set(free / gib)

    # Best effort: the path is derived by stripping the "sqlite:///" prefix, so
    # a missing file or a non-file URL must not abort the rest of the
    # measurements; the failure is only logged.
    try:
        fs = os.stat(SQLALCHEMY_DATABASE_URL.replace("sqlite:///", ""))
        DISK_UTILIZATION.labels(type="database").set(fs.st_size / gib)
    except Exception as e:
        logger.info(e)

    # Keep a reference to the generator so its cleanup runs deterministically
    # after the queries, instead of whenever the generator/session happens to
    # be garbage collected.
    # NOTE(review): assumes get_db() is a generator that releases the session
    # in its cleanup code - it is defined outside this view; confirm.
    db_gen = get_db()
    session: Session = next(db_gen)
    try:
        count_archives = crud.count_archives(session)
        count_archive_urls = crud.count_archive_urls(session)
        DATABASE_METRICS.labels(query="count_archives", user="-").set(count_archives)
        DATABASE_METRICS.labels(query="count_archive_urls", user="-").set(count_archive_urls)

        # NOTE(review): a series set here is never reset in this function, so an
        # author who leaves the time window presumably keeps the last value.
        for user in crud.count_by_user_since(session):
            DATABASE_METRICS.labels(query="count_by_user", user=user.author_id).set(user.total)
    finally:
        db_gen.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user