isolating api methods, introducing first tests

This commit is contained in:
msramalho
2024-10-17 11:27:35 +01:00
parent e5486898ad
commit 3432d35af6
26 changed files with 2340 additions and 1223 deletions

3
.gitignore vendored
View File

@@ -13,4 +13,5 @@ redis/data/*
#temp
tests
src/user-groups.yaml
wit*
wit*
src/crawls

View File

@@ -1,6 +1,6 @@
# Auto Archiver API
An api that uses celery workers to process URL archive requests via [bellingcat/auto-archiver](https://github.com/bellingcat/auto-archiver), it allows authentication via Google OAuth Apps an d enables CORS, everything runs on docker but development can be done without docker (except for redis).
An api that uses celery workers to process URL archive requests via [bellingcat/auto-archiver](https://github.com/bellingcat/auto-archiver), it allows authentication via Google OAuth Apps and enables CORS, everything runs on docker but development can be done without docker (except for redis).
## Development

View File

@@ -1,5 +1,3 @@
version: '3.8'
services:
web:
restart: "no"

View File

@@ -9,8 +9,6 @@ x-base-setup: &base-setup
CELERY_BROKER_URL: *broker-url
CELERY_RESULT_BACKEND: *broker-url
version: '3.8'
volumes:
crawls:
@@ -34,6 +32,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
- crawls:/crawls # BROWSERTRIX_HOME_HOST:BROWSERTRIX_HOME_CONTAINER, do not change /crawls
environment:
# celery broker-url needs to be duplicated here, do not remove
CELERY_BROKER_URL: *broker-url
CELERY_RESULT_BACKEND: *broker-url
WACZ_ENABLE_DOCKER: 1 # Enable calling docker from this container

View File

@@ -7,8 +7,8 @@ WORKDIR /app
RUN curl -fsSL https://get.docker.com -o get-docker.sh && \
sh get-docker.sh
# set environment variables
ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# install dependencies
RUN pip install --upgrade pip && \

View File

@@ -23,7 +23,8 @@ auto-archiver = "*"
[dev-packages]
watchdog = "*"
pytest = "==6.2.4"
pytest = "*"
httpx = "*"
[requires]
python_version = "3.10"

2920
src/Pipfile.lock generated

File diff suppressed because it is too large Load Diff

0
src/core/__init__.py Normal file
View File

27
src/core/config.py Normal file
View File

@@ -0,0 +1,27 @@
import os
from dotenv import load_dotenv
load_dotenv()
VERSION = "0.7.0"
API_DESCRIPTION = """
#### API for the Auto-Archiver project, a tool to archive web pages and Google Sheets.
**Usage notes:**
- The API requires a Bearer token for most operations, which you can obtain by logging in with your Google account.
- You can use this API to archive single URLs or entire Google Sheets.
- Once you submit a URL or Sheet for archiving, the API will return a task_id that you can use to check the status of the archiving process. It works asynchronously.
"""
ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "chrome-extension://ondkcheoicfckabcnkdgbepofpjmjcmb,chrome-extension://ojcimmjndnlmmlgnjaeojoebaceokpdp").split(",")
BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."}
SERVE_LOCAL_ARCHIVE = os.environ.get("SERVE_LOCAL_ARCHIVE", "")
SQLALCHEMY_DATABASE_URL = os.environ.get("DATABASE_PATH")
REPEAT_COUNT_METRICS_SECONDS = 15
CHROME_APP_IDS = set([app_id.strip() for app_id in os.environ.get("CHROME_APP_IDS", "").split(",")])
BLOCKED_EMAILS = set([e.strip().lower() for e in os.environ.get("BLOCKED_EMAILS", "").split(",")])

44
src/core/events.py Normal file
View File

@@ -0,0 +1,44 @@
import asyncio
import logging
import alembic.config
from fastapi import FastAPI
from sqlalchemy.orm import Session
from contextlib import asynccontextmanager
from fastapi_utils.tasks import repeat_every
from loguru import logger
from db import crud, models
from db.database import get_db, engine
from utils.metrics import measure_regular_metrics, redis_subscribe_worker_exceptions
from core.config import REPEAT_COUNT_METRICS_SECONDS
@asynccontextmanager
async def lifespan(app: FastAPI):
# see https://fastapi.tiangolo.com/advanced/events/#lifespan
# STARTUP
models.Base.metadata.create_all(bind=engine)
alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])
# disabling uvicorn logger since we use loguru in logging_middleware
logging.getLogger("uvicorn.access").disabled = True
asyncio.create_task(redis_subscribe_worker_exceptions())
asyncio.create_task(refresh_user_groups())
asyncio.create_task(measure_regular_metrics())
yield # separates startup from shutdown instructions
# SHUTDOWN
logger.info("shutting down")
# CRON JOBS
@repeat_every(seconds=60 * 60) # 1 hour
async def refresh_user_groups():
db: Session = next(get_db())
crud.upsert_user_groups(db)
@repeat_every(seconds=REPEAT_COUNT_METRICS_SECONDS)
async def repeat_measure_regular_metrics():
measure_regular_metrics()

15
src/core/logging.py Normal file
View File

@@ -0,0 +1,15 @@
from loguru import logger
from fastapi import Request
from utils.metrics import EXCEPTION_COUNTER
# logging configurations
logger.add("logs/api_logs.log", retention="30 days", rotation="3 days")
async def logging_middleware(request: Request, call_next):
try:
response = await call_next(request)
logger.info(f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - HTTP {response.status_code}")
return response
except Exception as e:
EXCEPTION_COUNTER.labels(type(e).__name__).inc()
raise e

View File

@@ -15,7 +15,7 @@ MAX_LIMIT = 100
# --------------- TASK = Archive
def get_task(db: Session, task_id: str, email: str):
def get_archive(db: Session, task_id: str, email: str):
email = email.lower()
query = base_query(db).filter(models.Archive.id == task_id)
if email != ALLOW_ANY_EMAIL:
@@ -24,7 +24,7 @@ def get_task(db: Session, task_id: str, email: str):
return query.first()
def search_tasks_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False):
def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False):
# searches for partial URLs, if email is * no ownership filtering happens
query = base_query(db)
if email != ALLOW_ANY_EMAIL:
@@ -42,7 +42,7 @@ def search_tasks_by_url(db: Session, url: str, email: str, skip: int = 0, limit:
return query.order_by(models.Archive.created_at.desc()).offset(skip).limit(min(limit, MAX_LIMIT)).all()
def search_tasks_by_email(db: Session, email: str, skip: int = 0, limit: int = 100):
def search_archives_by_email(db: Session, email: str, skip: int = 0, limit: int = 100):
email = email.lower()
return base_query(db).filter(models.Archive.author.has(email=email)).offset(skip).limit(min(limit, MAX_LIMIT)).all()

View File

@@ -1,10 +1,7 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
from sqlalchemy.orm import sessionmaker, declarative_base
from core.config import SQLALCHEMY_DATABASE_URL
SQLALCHEMY_DATABASE_URL = os.environ.get("DATABASE_PATH")#"sqlite:///./auto-archiver.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
@@ -13,3 +10,8 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
session = SessionLocal()
try: yield session
finally: session.close()

View File

@@ -19,9 +19,7 @@ class Archive(ArchiveCreate):
updated_at: datetime | None
deleted: bool
class Config:
orm_mode = True
model_config = { "from_attributes": True }
class SubmitSheet(BaseModel):
sheet_name: str | None = None
@@ -39,3 +37,13 @@ class SubmitManual(BaseModel):
author_id: str | None = None
group_id: str | None = None
tags: set | None = set()
class Task(BaseModel):
id: str
class TaskResult(Task):
status: str
result: str
class TaskDelete(Task):
deleted: bool

View File

@@ -0,0 +1,5 @@
from endpoints.default import default_router
from endpoints.url import url_router
from endpoints.task import task_router
from endpoints.interoperability import interoperability_router
from endpoints.sheet import sheet_router

36
src/endpoints/default.py Normal file
View File

@@ -0,0 +1,36 @@
from fastapi import APIRouter, Depends, Request, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from loguru import logger
from sqlalchemy.orm import Session
from core.config import VERSION, BREAKING_CHANGES
from db import crud
from db.database import get_db
from security import get_user_auth, bearer_security
default_router = APIRouter()
@default_router.get("/")
async def home(request: Request):
# TODO: maybe split into 2 routes: one non authenticated and one authenticated for the groups info only
status = {"version": VERSION, "breakingChanges": BREAKING_CHANGES}
try:
email = await get_user_auth(await bearer_security(request))
db: Session = next(get_db())
status["groups"] = crud.get_user_groups(db, email)
except HTTPException: pass # not authenticated is fine
except Exception as e: logger.error(e)
return JSONResponse(status)
@default_router.get("/groups", response_model=list[str])
def get_user_groups(db: Session = Depends(get_db), email=Depends(get_user_auth)):
return crud.get_user_groups(db, email)
@default_router.get('/favicon.ico', include_in_schema=False)
async def favicon():
return FileResponse("static/favicon.ico")

View File

@@ -0,0 +1,26 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from auto_archiver import Metadata
from loguru import logger
import sqlalchemy
from security import token_api_key_auth
from db import models, schemas
from worker import insert_result_into_db
interoperability_router = APIRouter(prefix="/interop", tags=["Interoperability endpoints."])
# ----- endpoint to submit data archived elsewhere
@interoperability_router.post("/submit-archive", status_code=201, summary="Submit a manual archive entry, for data that was archived elsewhere.")
def submit_manual_archive(manual: schemas.SubmitManual, auth=Depends(token_api_key_auth)):
result = Metadata.from_json(manual.result)
logger.info(f"MANUAL SUBMIT {result.get_url()} {manual.author_id}")
manual.tags.add("manual")
try:
archive_id = insert_result_into_db(result, manual.tags, manual.public, manual.group_id, manual.author_id, models.generate_uuid())
except sqlalchemy.exc.IntegrityError as e:
logger.error(e)
raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error")
return JSONResponse({"id": archive_id})

23
src/endpoints/sheet.py Normal file
View File

@@ -0,0 +1,23 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from loguru import logger
from security import ALLOW_ANY_EMAIL, get_token_or_user_auth
from db import schemas
from worker import create_sheet_task
sheet_router = APIRouter(prefix="/sheet", tags=["Google Spreadsheet operations"])
@sheet_router.post("/archive", status_code=201, summary="Submit a Google Sheet archive request, starts a sheet archiving task.", response_model=schemas.Task, response_description="task_id for the archiving task.")
def archive_sheet(sheet:schemas.SubmitSheet, email = Depends(get_token_or_user_auth)):
logger.info(f"SHEET TASK for {sheet=}")
if email == ALLOW_ANY_EMAIL:
email = sheet.author_id or "api-endpoint"
sheet.author_id = email
if not sheet.sheet_name and not sheet.sheet_id:
raise HTTPException(status_code=422, detail=f"sheet name or id is required")
task = create_sheet_task.delay(sheet.model_dump_json())
return JSONResponse({"id": task.id})

47
src/endpoints/task.py Normal file
View File

@@ -0,0 +1,47 @@
import traceback
from celery.result import AsyncResult
from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from loguru import logger
from security import get_token_or_user_auth
from db import schemas
from worker import celery
task_router = APIRouter(prefix="/task", tags=["Async task operations"])
@task_router.get("/{task_id}", response_model=schemas.TaskResult, summary="Check the status of an async task by its id, works for URLs and Sheet tasks.")
def get_status(task_id, email=Depends(get_token_or_user_auth)):
logger.info(f"status check for user {email} task {task_id}")
task = AsyncResult(task_id, app=celery)
try:
if task.status == "FAILURE":
# *FAILURE* The task raised an exception, or has exceeded the retry limit.
# The :attr:`result` attribute then contains the exception raised by the task.
# https://docs.celeryq.dev/en/stable/_modules/celery/result.html#AsyncResult
raise task.result
# TODO: refactor to use schema?
# response = schemas.TaskResult(id=task_id, status=task.status, result=task.result)
response = {
"id": task_id,
"status": task.status,
"result": task.result
}
return JSONResponse(jsonable_encoder(response, exclude_unset=True))
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
# TODO: refactor to use schema?
# response = schemas.TaskResult(id=task_id, status="FAILURE", result={"error": str(e)})
return JSONResponse({
"id": task_id,
"status": "FAILURE",
"result": {"error": str(e)}
})

57
src/endpoints/url.py Normal file
View File

@@ -0,0 +1,57 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from datetime import datetime
from loguru import logger
from security import get_user_auth, get_token_or_user_auth
from sqlalchemy.orm import Session
from db import crud, schemas
from db.database import get_db
from worker import create_archive_task
url_router = APIRouter(prefix="/url", tags=["Single URL operations"])
@url_router.post("/archive", status_code=201, summary="Submit a single URL archive request, starts an archiving task.", response_model=schemas.Task, response_description="task_id for the archiving task, will match the archive id.")
def archive_url(archive: schemas.ArchiveCreate, email=Depends(get_token_or_user_auth)):
archive.author_id = email
url = archive.url
logger.info(f"new {archive.public=} task for {email=} and {archive.group_id=}: {url}")
if type(url) != str or len(url) <= 5:
raise HTTPException(status_code=422, detail=f"Invalid URL received: {url}")
logger.info("creating task")
task = create_archive_task.delay(archive.model_dump_json())
task_response = schemas.Task(id=task.id)
return JSONResponse(task_response.model_dump())
@url_router.get("/search", response_model=list[schemas.Archive], summary="Search for archive entries by URL.")
def search_by_url(
url: str, skip: int = 0, limit: int = 25,
archived_after: datetime = None, archived_before: datetime = None,
db: Session = Depends(get_db),
email=Depends(get_token_or_user_auth)):
return crud.search_archives_by_url(db, url.strip(), email, skip=skip, limit=limit, archived_after=archived_after, archived_before=archived_before)
@url_router.get("/latest", response_model=list[schemas.Archive], summary="Fetch latest URL archives for the authenticated user.")
def latest(skip: int = 0, limit: int = 25, db: Session = Depends(get_db), email=Depends(get_user_auth)):
return crud.search_archives_by_email(db, email, skip=skip, limit=limit)
@url_router.get("/{id}", response_model=schemas.Archive, summary="Fetch a single URL archive by the associated id.")
def lookup(id, db: Session = Depends(get_db), email=Depends(get_token_or_user_auth)):
return crud.get_archive(db, id, email)
@url_router.delete("/{id}", response_model=schemas.TaskDelete, summary="Delete a single URL archive by id.")
def delete_task(id, db: Session = Depends(get_db), email=Depends(get_user_auth)):
logger.info(f"deleting url archive task {id} request by {email}")
#TODO: use response model?
return JSONResponse({
"id": id,
"deleted": crud.soft_delete_task(db, id, email)
})

View File

@@ -1,63 +1,41 @@
import traceback, os
from celery.result import AsyncResult
from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi import FastAPI, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi_utils.tasks import repeat_every
import alembic.config
from dotenv import load_dotenv
import traceback, os, logging
from loguru import logger
from prometheus_fastapi_instrumentator import Instrumentator
from datetime import datetime
import sqlalchemy
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Counter, Gauge
from contextlib import asynccontextmanager
import asyncio, json
import shutil
from sqlalchemy.orm import Session
from loguru import logger
from worker import REDIS_EXCEPTIONS_CHANNEL, create_archive_task, create_sheet_task, celery, insert_result_into_db, Rdis
from core.logging import logging_middleware
from worker import create_archive_task, create_sheet_task, celery, insert_result_into_db
from db import crud, models, schemas
from db.database import engine, SessionLocal, SQLALCHEMY_DATABASE_URL
from sqlalchemy.orm import Session
from security import get_user_auth, token_api_key_auth, bearer_security, get_token_or_user_auth
from security import get_user_auth, token_api_key_auth, get_token_or_user_auth
from core.config import ALLOWED_ORIGINS, VERSION, SERVE_LOCAL_ARCHIVE, API_DESCRIPTION
from db.database import get_db
from core.events import lifespan
from auto_archiver import Metadata
from endpoints import default_router, url_router, sheet_router, task_router, interoperability_router
load_dotenv()
# Configuration
ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "chrome-extension://ondkcheoicfckabcnkdgbepofpjmjcmb,chrome-extension://ojcimmjndnlmmlgnjaeojoebaceokpdp").split(",")
VERSION = "0.6.3"
# min-version refers to the version of auto-archiver-extension on the webstore
BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."}
@repeat_every(seconds=60 * 60) # 1 hour
async def refresh_user_groups():
db: Session = next(get_db())
crud.upsert_user_groups(db)
app = FastAPI(
title="Auto-Archiver API",
description=API_DESCRIPTION,
version=VERSION,
contact={"name": "GitHub", "url": "https://github.com/bellingcat/auto-archiver-api"},
lifespan=lifespan
)
@asynccontextmanager
async def lifespan(app: FastAPI):
# see https://fastapi.tiangolo.com/advanced/events/#lifespan
# STARTUP
models.Base.metadata.create_all(bind=engine)
alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])
# disabling uvicorn logger since we use loguru in logging_middleware
logging.getLogger("uvicorn.access").disabled = True
asyncio.create_task(redis_subscribe_worker_exceptions())
asyncio.create_task(refresh_user_groups())
asyncio.create_task(measure_regular_metrics())
yield # separates startup from shutdown instructions
# SHUTDOWN
logger.info("shutting down")
app = FastAPI(title="Auto-Archiver API", version=VERSION, contact={"name":"Bellingcat", "url":"https://github.com/bellingcat/auto-archiver-api"}, lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
@@ -66,83 +44,56 @@ app.add_middleware(
allow_headers=["*"],
)
EXCEPTION_COUNTER = Counter(
"exceptions",
"Number of times a certain exception has occurred.",
labelnames=("types",)
)
app.include_router(default_router)
app.include_router(url_router)
app.include_router(sheet_router)
app.include_router(task_router)
app.include_router(interoperability_router)
# prometheus exposed in /metrics with authentication
Instrumentator(should_group_status_codes=False, excluded_handlers=["/metrics"]).instrument(app).expose(app, dependencies=[Depends(token_api_key_auth)])
app.mount("/static", StaticFiles(directory="static"), name="static")
# used mostly for development in combination with local_archive
SERVE_LOCAL_ARCHIVE = os.environ.get("SERVE_LOCAL_ARCHIVE", "")
if len(SERVE_LOCAL_ARCHIVE) > 1 and os.path.isdir(SERVE_LOCAL_ARCHIVE):
logger.info(f"mounting local archive {SERVE_LOCAL_ARCHIVE}")
app.mount(SERVE_LOCAL_ARCHIVE, StaticFiles(directory=SERVE_LOCAL_ARCHIVE), name=SERVE_LOCAL_ARCHIVE)
def get_db():
session = SessionLocal()
try: yield session
finally: session.close()
app.middleware("http")(logging_middleware)
# -----Submit URL and manipulate tasks. Bearer protected below
# logging configurations
logger.add("logs/api_logs.log", retention="30 days", rotation="3 days")
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
try:
response = await call_next(request)
logger.info(f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - HTTP {response.status_code}")
return response
except Exception as e:
EXCEPTION_COUNTER.labels(type(e).__name__).inc()
raise e
@app.get("/")
async def home(request: Request):
status = {"version": VERSION, "breakingChanges": BREAKING_CHANGES}
try:
# if authenticated will load available groups
email = await get_user_auth(await bearer_security(request))
db: Session = next(get_db())
status["groups"] = crud.get_user_groups(db, email)
except HTTPException: pass
except Exception as e: logger.error(e)
return JSONResponse(status)
@app.get("/tasks/search-url", response_model=list[schemas.Archive], deprecated=True) # DEPRECATED
def search_by_url(url: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, db: Session = Depends(get_db), email=Depends(get_token_or_user_auth)):
return crud.search_archives_by_url(db, url.strip(), email, skip=skip, limit=limit, archived_after=archived_after, archived_before=archived_before)
#-----Submit URL and manipulate tasks. Bearer protected below
@app.get("/tasks/sync", response_model=list[schemas.Archive], deprecated=True) # DEPRECATED
def search(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), email=Depends(get_user_auth)):
return crud.search_archives_by_email(db, email, skip=skip, limit=limit)
@app.get("/groups", response_model=list[str])
def get_user_groups(db: Session = Depends(get_db), email = Depends(get_user_auth)):
return crud.get_user_groups(db, email)
@app.get("/tasks/search-url", response_model=list[schemas.Archive])
def search_by_url(url:str, skip: int = 0, limit: int = 100, archived_after:datetime=None, archived_before:datetime=None, db: Session = Depends(get_db), email = Depends(get_token_or_user_auth)):
return crud.search_tasks_by_url(db, url.strip(), email, skip=skip, limit=limit, archived_after=archived_after, archived_before=archived_before)
@app.get("/tasks/sync", response_model=list[schemas.Archive])
def search(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), email = Depends(get_user_auth)):
return crud.search_tasks_by_email(db, email, skip=skip, limit=limit)
@app.post("/tasks", status_code=201)
def archive_tasks(archive:schemas.ArchiveCreate, email = Depends(get_token_or_user_auth)):
@app.post("/tasks", status_code=201, deprecated=True) # DEPRECATED
def archive_tasks(archive: schemas.ArchiveCreate, email=Depends(get_token_or_user_auth)):
archive.author_id = email
url = archive.url
logger.info(f"new {archive.public=} task for {email=} and {archive.group_id=}: {url}")
if type(url)!=str or len(url)<=5:
if type(url) != str or len(url) <= 5:
raise HTTPException(status_code=422, detail=f"Invalid URL received: {url}")
logger.info("creating task")
task = create_archive_task.delay(archive.json())
task = create_archive_task.delay(archive.model_dump_json())
return JSONResponse({"id": task.id})
@app.get("/archive/{task_id}")
def lookup(task_id, db: Session = Depends(get_db), email = Depends(get_token_or_user_auth)):
return crud.get_task(db, task_id, email)
@app.get("/tasks/{task_id}")
def get_status(task_id, email = Depends(get_token_or_user_auth)):
@app.get("/archive/{task_id}", deprecated=True) # DEPRECATED
def lookup(task_id, db: Session = Depends(get_db), email=Depends(get_token_or_user_auth)):
return crud.get_archive(db, task_id, email)
@app.get("/tasks/{task_id}", deprecated=True) # DEPRECATED
def get_status(task_id, email=Depends(get_token_or_user_auth)):
logger.info(f"status check for user {email} task {task_id}")
task = AsyncResult(task_id, app=celery)
try:
@@ -168,36 +119,42 @@ def get_status(task_id, email = Depends(get_token_or_user_auth)):
"result": {"error": str(e)}
})
@app.delete("/tasks/{task_id}")
def delete_task(task_id, db: Session = Depends(get_db), email = Depends(get_user_auth)):
@app.delete("/tasks/{task_id}", deprecated=True) # DEPRECATED
def delete_task(task_id, db: Session = Depends(get_db), email=Depends(get_user_auth)):
logger.info(f"deleting task {task_id} request by {email}")
return JSONResponse({
"id": task_id,
"deleted": crud.soft_delete_task(db, task_id, email)
})
#----- Google Sheets Logic
@app.post("/sheet", status_code=201)
def archive_sheet(sheet:schemas.SubmitSheet, email = Depends(get_user_auth)):
# ----- Google Sheets Logic
@app.post("/sheet", status_code=201, deprecated=True) # DEPRECATED
def archive_sheet(sheet: schemas.SubmitSheet, email=Depends(get_user_auth)):
logger.info(f"SHEET TASK for {sheet=}")
sheet.author_id = email
if not sheet.sheet_name and not sheet.sheet_id:
raise HTTPException(status_code=422, detail=f"sheet name or id is required")
task = create_sheet_task.delay(sheet.json())
task = create_sheet_task.delay(sheet.model_dump_json())
return JSONResponse({"id": task.id})
@app.post("/sheet_service", status_code=201)
def archive_sheet_service(sheet:schemas.SubmitSheet, auth = Depends(token_api_key_auth)):
@app.post("/sheet_service", status_code=201, deprecated=True) # DEPRECATED
def archive_sheet_service(sheet: schemas.SubmitSheet, auth=Depends(token_api_key_auth)):
logger.info(f"SHEET TASK for {sheet=}")
sheet.author_id = sheet.author_id or "api-endpoint"
if not sheet.sheet_name and not sheet.sheet_id:
raise HTTPException(status_code=422, detail=f"sheet name or id is required")
task = create_sheet_task.delay(sheet.json())
task = create_sheet_task.delay(sheet.model_dump_json())
return JSONResponse({"id": task.id})
#----- endpoint to submit data archived elsewhere
@app.post("/submit-archive", status_code=201)
def submit_manual_archive(manual:schemas.SubmitManual, auth = Depends(token_api_key_auth)):
# ----- endpoint to submit data archived elsewhere
@app.post("/submit-archive", status_code=201, deprecated=True) # DEPRECATED
def submit_manual_archive(manual: schemas.SubmitManual, auth=Depends(token_api_key_auth)):
result = Metadata.from_json(manual.result)
logger.info(f"MANUAL SUBMIT {result.get_url()} {manual.author_id}")
manual.tags.add("manual")
@@ -207,51 +164,3 @@ def submit_manual_archive(manual:schemas.SubmitManual, auth = Depends(token_api_
logger.error(e)
raise HTTPException(status_code=422, detail=f"Cannot insert into DB due to integrity error")
return JSONResponse({"id": archive_id})
# --------- Prometheus metrics
WORKER_EXCEPTION = Counter(
"worker_exceptions_total",
"Number of times a certain exception has occurred on the worker.",
labelnames=("exception", "task",)
)
async def redis_subscribe_worker_exceptions():
PubSubExceptions = Rdis.pubsub()
PubSubExceptions.subscribe(REDIS_EXCEPTIONS_CHANNEL)
while True:
message = PubSubExceptions.get_message()
if message and message["type"] == "message":
data = json.loads(message["data"].decode("utf-8"))
WORKER_EXCEPTION.labels(exception=data["exception"], task=data["task"]).inc()
await asyncio.sleep(1)
DISK_UTILIZATION = Gauge(
"disk_utilization",
"Disk utilization in GB",
labelnames=("type",)
)
DATABASE_METRICS = Gauge(
"database_metrics",
"Useful database metrics from queries",
labelnames=("query", "user")
)
REPEAT_COUNT_METRICS_SECONDS = 15
@repeat_every(seconds=REPEAT_COUNT_METRICS_SECONDS)
async def measure_regular_metrics():
_total, used, free = shutil.disk_usage("/")
DISK_UTILIZATION.labels(type="used").set(used / (2**30))
DISK_UTILIZATION.labels(type="free").set(free / (2**30))
try:
fs = os.stat(SQLALCHEMY_DATABASE_URL.replace("sqlite:///", ""))
DISK_UTILIZATION.labels(type="database").set(fs.st_size / (2**30))
except Exception as e: logger.error(e)
session: Session = next(get_db())
count_archives = crud.count_archives(session)
count_archive_urls = crud.count_archive_urls(session)
DATABASE_METRICS.labels(query="count_archives", user="-").set(count_archives)
DATABASE_METRICS.labels(query="count_archive_urls", user="-").set(count_archive_urls)
for user in crud.count_by_user_since(session, REPEAT_COUNT_METRICS_SECONDS):
DATABASE_METRICS.labels(query="count_by_user", user=user.author_id).set(user.total)

View File

@@ -2,18 +2,16 @@ from loguru import logger
import requests, os, secrets
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from core.config import CHROME_APP_IDS, BLOCKED_EMAILS
# Configuration
CHROME_APP_IDS = set([app_id.strip() for app_id in os.environ.get("CHROME_APP_IDS", "").split(",")])
# Configuration checks
assert len(CHROME_APP_IDS) > 0, "CHROME_APP_IDS env variable not properly set, it's a csv"
for app_id in CHROME_APP_IDS:
assert len(app_id) > 10, f"CHROME_APP_IDS got invalid id: {app_id} env variable not set"
logger.info(f"{CHROME_APP_IDS=}")
BLOCKED_EMAILS = set([e.strip().lower() for e in os.environ.get("BLOCKED_EMAILS", "").split(",")])
logger.info(f"{len(BLOCKED_EMAILS)=}")
# Auth logic
bearer_security = HTTPBearer()
ALLOW_ANY_EMAIL = "*"

BIN
src/static/favicon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

0
src/utils/__init__.py Normal file
View File

66
src/utils/metrics.py Normal file
View File

@@ -0,0 +1,66 @@
import asyncio
import json
import os
import shutil
from loguru import logger
from prometheus_client import Counter, Gauge
from sqlalchemy.orm import Session
from core.config import REPEAT_COUNT_METRICS_SECONDS
from db import crud
from db.database import get_db
from core.config import SQLALCHEMY_DATABASE_URL
from worker import REDIS_EXCEPTIONS_CHANNEL, Rdis
# Custom metrics
EXCEPTION_COUNTER = Counter(
"exceptions",
"Number of times a certain exception has occurred.",
labelnames=("types",)
)
WORKER_EXCEPTION = Counter(
"worker_exceptions_total",
"Number of times a certain exception has occurred on the worker.",
labelnames=("exception", "task",)
)
DISK_UTILIZATION = Gauge(
"disk_utilization",
"Disk utilization in GB",
labelnames=("type",)
)
DATABASE_METRICS = Gauge(
"database_metrics",
"Useful database metrics from queries",
labelnames=("query", "user")
)
async def redis_subscribe_worker_exceptions():
# Subscribe to Redis channel and increment the counter for each exception with info on the exception and task
PubSubExceptions = Rdis.pubsub()
PubSubExceptions.subscribe(REDIS_EXCEPTIONS_CHANNEL)
while True:
message = PubSubExceptions.get_message()
if message and message["type"] == "message":
data = json.loads(message["data"].decode("utf-8"))
WORKER_EXCEPTION.labels(exception=data["exception"], task=data["task"]).inc()
await asyncio.sleep(1)
async def measure_regular_metrics():
_total, used, free = shutil.disk_usage("/")
DISK_UTILIZATION.labels(type="used").set(used / (2**30))
DISK_UTILIZATION.labels(type="free").set(free / (2**30))
try:
fs = os.stat(SQLALCHEMY_DATABASE_URL.replace("sqlite:///", ""))
DISK_UTILIZATION.labels(type="database").set(fs.st_size / (2**30))
except Exception as e: logger.error(e)
session: Session = next(get_db())
count_archives = crud.count_archives(session)
count_archive_urls = crud.count_archive_urls(session)
DATABASE_METRICS.labels(query="count_archives", user="-").set(count_archives)
DATABASE_METRICS.labels(query="count_archive_urls", user="-").set(count_archive_urls)
for user in crud.count_by_user_since(session, REPEAT_COUNT_METRICS_SECONDS):
DATABASE_METRICS.labels(query="count_by_user", user=user.author_id).set(user.total)

View File

@@ -1,9 +1,9 @@
import os, traceback, yaml, datetime
import os, traceback, yaml, datetime, sys
from typing import List, Set
from celery import Celery
from celery.signals import task_failure
from celery.signals import task_failure, worker_init
from auto_archiver import Config, ArchivingOrchestrator, Metadata
from auto_archiver.core import Media
from loguru import logger
@@ -42,7 +42,7 @@ def create_archive_task(self, archive_json: str):
if not archive.rearchive:
with get_db() as session:
archives = crud.search_tasks_by_url(session, url, archive.author_id, absolute_search=True)
archives = crud.search_archives_by_url(session, url, archive.author_id, absolute_search=True)
if len(archives):
logger.info(f"Skipping {url=} as it was already archived")
return Metadata.choose_most_complete([a.result for a in archives])
@@ -212,6 +212,9 @@ def redis_publish_exception(exception, task_name):
logger.error(f"Could not publish to {REDIS_EXCEPTIONS_CHANNEL}")
# INIT
ORCHESTRATORS = {}
load_orchestrators()
@worker_init.connect
def at_start(sender, **kwargs):
global ORCHESTRATORS
ORCHESTRATORS = {}
load_orchestrators()
logger.info("Orchestrators loaded successfully.")