implments cronjob to do the sheet archiving

This commit is contained in:
msramalho
2025-02-07 16:53:33 +00:00
parent fed8543c30
commit eccd71d168
7 changed files with 93 additions and 16 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import datetime
import logging
import alembic.config
from fastapi import FastAPI
@@ -6,10 +7,11 @@ from contextlib import asynccontextmanager
from fastapi_utils.tasks import repeat_every
from loguru import logger
from db import crud, models
from db.database import get_db, make_engine
from db import crud, models, schemas
from db.database import get_db, get_db_async, make_engine
from shared.settings import get_settings
from utils.metrics import measure_regular_metrics, redis_subscribe_worker_exceptions
from worker.main import create_sheet_task
@asynccontextmanager
@@ -20,13 +22,19 @@ async def lifespan(app: FastAPI):
engine = make_engine(get_settings().DATABASE_PATH)
models.Base.metadata.create_all(bind=engine)
alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])
# disabling uvicorn logger since we use loguru in logging_middleware
logging.getLogger("uvicorn.access").disabled = True
logging.getLogger("uvicorn.access").disabled = True # loguru
asyncio.create_task(redis_subscribe_worker_exceptions(get_settings().REDIS_EXCEPTIONS_CHANNEL, get_settings().CELERY_BROKER_URL))
asyncio.create_task(repeat_measure_regular_metrics())
with get_db() as db:
crud.upsert_user_groups(db)
# setup archive cronjobs
if get_settings().CRON_ARCHIVE_SHEETS:
asyncio.create_task(archive_hourly_sheets_cronjob())
asyncio.create_task(archive_daily_sheets_cronjob())
else:
logger.warning("[CRON] Sheet archive cronjobs are disabled.")
yield # separates startup from shutdown instructions
# SHUTDOWN
@@ -34,8 +42,27 @@ async def lifespan(app: FastAPI):
# CRON JOBS
@repeat_every(seconds=get_settings().REPEAT_COUNT_METRICS_SECONDS)
@repeat_every(seconds=get_settings().REPEAT_COUNT_METRICS_SECONDS, on_exception=logger.error)
async def repeat_measure_regular_metrics():
await measure_regular_metrics(get_settings().DATABASE_PATH, get_settings().REPEAT_COUNT_METRICS_SECONDS)
@repeat_every(seconds=60, wait_first=120, on_exception=logger.error)
async def archive_hourly_sheets_cronjob():
await archive_sheets_cronjob("hourly", 60, datetime.datetime.now().minute)
@repeat_every(seconds=3600, wait_first=120, on_exception=logger.error)
async def archive_daily_sheets_cronjob():
await archive_sheets_cronjob("daily", 24, datetime.datetime.now().hour)
async def archive_sheets_cronjob(frequency: str, interval: int, current_time_unit: int):
triggered_jobs = []
async with get_db_async() as db:
sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit)
for s in sheets:
task = create_sheet_task.apply_async(args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group=s.group_id).model_dump_json()])
triggered_jobs.append({"sheet_id": s.id, "task_id": task.id})
logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}")

View File

@@ -1,16 +1,17 @@
from collections import defaultdict
from functools import lru_cache
from sqlalchemy.orm import Session, load_only
from sqlalchemy import Column, or_, func
from sqlalchemy import Column, or_, func, select
from loguru import logger
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from core.config import ALLOW_ANY_EMAIL
from db.database import get_db
from shared.settings import get_settings
from shared.user_groups import UserGroups
from utils.misc import fnv1a_hash_mod
from . import models, schemas
import yaml
from sqlalchemy.ext.asyncio import AsyncSession
DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT
@@ -248,6 +249,18 @@ def get_user_sheet(db: Session, email: str, sheet_id: str) -> models.Sheet:
def get_user_sheets(db: Session, email: str) -> list[models.Sheet]:
return db.query(models.Sheet).filter(models.Sheet.author_id == email).order_by(models.Sheet.last_url_archived_at.desc()).all()
async def get_sheets_by_id_hash(db: AsyncSession, frequency: str, modulo: str, id_hash: str) -> list[models.Sheet]:
result = await db.execute(
select(models.Sheet).filter(models.Sheet.frequency == frequency)
)
filtered = []
for sheet in result.scalars():
if fnv1a_hash_mod(sheet.id, modulo) == id_hash:
filtered.append(sheet)
return filtered
def update_sheet_last_url_archived_at(db: Session, sheet_id: str):
db_sheet = db.query(models.Sheet).filter(models.Sheet.id == sheet_id).first()
if db_sheet:
@@ -256,9 +269,10 @@ def update_sheet_last_url_archived_at(db: Session, sheet_id: str):
return True
return False
def delete_sheet(db: Session, sheet_id: str, email: str) -> bool:
db_sheet = db.query(models.Sheet).filter(models.Sheet.id == sheet_id, models.Sheet.author_id == email).first()
if db_sheet:
db.delete(db_sheet)
db.commit()
return db_sheet is not None
return db_sheet is not None

View File

@@ -2,7 +2,8 @@ from functools import lru_cache
from sqlalchemy import Engine, create_engine, event
from sqlalchemy.orm import sessionmaker
from shared.settings import get_settings
from contextlib import contextmanager
from contextlib import asynccontextmanager, contextmanager
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, async_sessionmaker
@lru_cache
@@ -34,3 +35,23 @@ def get_db_dependency():
# to use with Depends and ensure proper session closing
with get_db() as db:
yield db
# ASYNC connections
async def make_async_engine(database_url: str) -> AsyncEngine:
engine = create_async_engine(database_url, connect_args={"check_same_thread": False})
return engine
async def make_async_session_local(engine: AsyncEngine) -> AsyncSession:
return async_sessionmaker(engine, expire_on_commit=False)
@asynccontextmanager
async def get_db_async():
engine = await make_async_engine(get_settings().ASYNC_DATABASE_PATH)
async_session = await make_async_session_local(engine)
async with async_session() as session:
try: yield session
finally: await engine.dispose()

View File

@@ -17,7 +17,7 @@ default_router = APIRouter()
@default_router.get("/")
async def home(request: Request):
# TODO: maybe split into 2 routes: one non authenticated and one authenticated for the groups info only
# TODO: maybe split into 2 routes: one non authenticated and one authenticated for the groups info only, necessary only for the extension
status = {"version": VERSION, "breakingChanges": BREAKING_CHANGES}
try:
email = await get_user_auth(await bearer_security(request))

View File

@@ -67,7 +67,6 @@ def archive_user_sheet(
if not sheet:
raise HTTPException(status_code=403, detail="No access to this sheet.")
# TODO: what happens if user is taken out of group after sheet is created? this should be checked in a cronjob that notifies the user
if not user.in_group(sheet.group_id):
raise HTTPException(status_code=403, detail="User does not have access to this group.")

View File

@@ -14,9 +14,15 @@ class Settings(BaseSettings):
USER_GROUPS_FILENAME: str = "user-groups.yaml"
SHEET_ORCHESTRATION_YAML : str = "secrets/orchestration-sheet.yaml"
# cronjobs
CRON_ARCHIVE_SHEETS: bool = False
# database
DATABASE_PATH: str
DATABASE_QUERY_LIMIT: int = 100
@property
def ASYNC_DATABASE_PATH(self) -> str:
return self.DATABASE_PATH.replace("sqlite://", "sqlite+aiosqlite://")
# redis
CELERY_BROKER_URL: str = "redis://localhost:6379"
@@ -30,7 +36,7 @@ class Settings(BaseSettings):
API_BEARER_TOKEN: Annotated[str, Len(min_length=20)]
ALLOWED_ORIGINS: Annotated[set[str], Len(min_length=1)]
CHROME_APP_IDS: Annotated[set[Annotated[str, Len(min_length=10)]], Len(min_length=1)]
#TODO: deprecate blocklist
#TODO: deprecate blocklist?
BLOCKED_EMAILS: Annotated[Set[str], Len(min_length=0)] = set()
@lru_cache

View File

@@ -4,4 +4,14 @@ from fastapi.encoders import jsonable_encoder
def custom_jsonable_encoder(obj):
if isinstance(obj, bytes):
return base64.b64encode(obj).decode('utf-8')
return jsonable_encoder(obj)
return jsonable_encoder(obj)
def fnv1a_hash_mod(s: str, modulo:int) -> int:
# receives a string and returns a number in [0:modulo-1], ensures an even distribution over the modulo range
hash = 0x811c9dc5 # FNV offset basis
fnv_prime = 0x01000193 # FNV prime
for char in s:
hash ^= ord(char)
hash *= fnv_prime
hash &= 0xFFFFFFFF # Keep it 32-bit
return (hash if hash < 0x80000000 else hash - 0x100000000) % modulo