diff --git a/src/core/events.py b/src/core/events.py index e9bbfbc..03865d9 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import alembic.config from fastapi import FastAPI @@ -6,10 +7,11 @@ from contextlib import asynccontextmanager from fastapi_utils.tasks import repeat_every from loguru import logger -from db import crud, models -from db.database import get_db, make_engine +from db import crud, models, schemas +from db.database import get_db, get_db_async, make_engine from shared.settings import get_settings from utils.metrics import measure_regular_metrics, redis_subscribe_worker_exceptions +from worker.main import create_sheet_task @asynccontextmanager @@ -20,13 +22,19 @@ async def lifespan(app: FastAPI): engine = make_engine(get_settings().DATABASE_PATH) models.Base.metadata.create_all(bind=engine) alembic.config.main(argv=['--raiseerr', 'upgrade', 'head']) - # disabling uvicorn logger since we use loguru in logging_middleware - logging.getLogger("uvicorn.access").disabled = True + logging.getLogger("uvicorn.access").disabled = True # loguru asyncio.create_task(redis_subscribe_worker_exceptions(get_settings().REDIS_EXCEPTIONS_CHANNEL, get_settings().CELERY_BROKER_URL)) asyncio.create_task(repeat_measure_regular_metrics()) with get_db() as db: crud.upsert_user_groups(db) + # setup archive cronjobs + if get_settings().CRON_ARCHIVE_SHEETS: + asyncio.create_task(archive_hourly_sheets_cronjob()) + asyncio.create_task(archive_daily_sheets_cronjob()) + else: + logger.warning("[CRON] Sheet archive cronjobs are disabled.") + yield # separates startup from shutdown instructions # SHUTDOWN @@ -34,8 +42,27 @@ async def lifespan(app: FastAPI): # CRON JOBS - - -@repeat_every(seconds=get_settings().REPEAT_COUNT_METRICS_SECONDS) +@repeat_every(seconds=get_settings().REPEAT_COUNT_METRICS_SECONDS, on_exception=logger.error) async def repeat_measure_regular_metrics(): await measure_regular_metrics(get_settings().DATABASE_PATH, get_settings().REPEAT_COUNT_METRICS_SECONDS) + + +@repeat_every(seconds=60, wait_first=120, on_exception=logger.error) +async def archive_hourly_sheets_cronjob(): + await archive_sheets_cronjob("hourly", 60, datetime.datetime.now().minute) + + +@repeat_every(seconds=3600, wait_first=120, on_exception=logger.error) +async def archive_daily_sheets_cronjob(): + await archive_sheets_cronjob("daily", 24, datetime.datetime.now().hour) + + +async def archive_sheets_cronjob(frequency: str, interval: int, current_time_unit: int): + triggered_jobs = [] + + async with get_db_async() as db: + sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit) + for s in sheets: + task = create_sheet_task.apply_async(args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group=s.group_id).model_dump_json()]) + triggered_jobs.append({"sheet_id": s.id, "task_id": task.id}) + logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}") diff --git a/src/db/crud.py b/src/db/crud.py index 0ede4f0..069277d 100644 --- a/src/db/crud.py +++ b/src/db/crud.py @@ -1,16 +1,17 @@ from collections import defaultdict from functools import lru_cache from sqlalchemy.orm import Session, load_only -from sqlalchemy import Column, or_, func +from sqlalchemy import Column, or_, func, select from loguru import logger -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from core.config import ALLOW_ANY_EMAIL from db.database import get_db from shared.settings import get_settings from shared.user_groups import UserGroups +from utils.misc import fnv1a_hash_mod from . import models, schemas -import yaml +from sqlalchemy.ext.asyncio import AsyncSession DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT @@ -248,6 +249,18 @@ def get_user_sheet(db: Session, email: str, sheet_id: str) -> models.Sheet: def get_user_sheets(db: Session, email: str) -> list[models.Sheet]: return db.query(models.Sheet).filter(models.Sheet.author_id == email).order_by(models.Sheet.last_url_archived_at.desc()).all() + + +async def get_sheets_by_id_hash(db: AsyncSession, frequency: str, modulo: str, id_hash: str) -> list[models.Sheet]: + result = await db.execute( + select(models.Sheet).filter(models.Sheet.frequency == frequency) + ) + filtered = [] + for sheet in result.scalars(): + if fnv1a_hash_mod(sheet.id, modulo) == id_hash: + filtered.append(sheet) + return filtered + def update_sheet_last_url_archived_at(db: Session, sheet_id: str): db_sheet = db.query(models.Sheet).filter(models.Sheet.id == sheet_id).first() if db_sheet: @@ -256,9 +269,10 @@ def update_sheet_last_url_archived_at(db: Session, sheet_id: str): return True return False + def delete_sheet(db: Session, sheet_id: str, email: str) -> bool: db_sheet = db.query(models.Sheet).filter(models.Sheet.id == sheet_id, models.Sheet.author_id == email).first() if db_sheet: db.delete(db_sheet) db.commit() - return db_sheet is not None \ No newline at end of file + return db_sheet is not None diff --git a/src/db/database.py b/src/db/database.py index 1166099..e72dc15 100644 --- a/src/db/database.py +++ b/src/db/database.py @@ -2,7 +2,8 @@ from functools import lru_cache from sqlalchemy import Engine, create_engine, event from sqlalchemy.orm import sessionmaker from shared.settings import get_settings -from contextlib import contextmanager +from contextlib import asynccontextmanager, contextmanager +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, async_sessionmaker @lru_cache @@ -34,3 +35,23 @@ def get_db_dependency(): # to use with Depends and ensure proper session closing with get_db() as db: yield db + +# ASYNC connections + + +async def make_async_engine(database_url: str) -> AsyncEngine: + engine = create_async_engine(database_url, connect_args={"check_same_thread": False}) + return engine + + +async def make_async_session_local(engine: AsyncEngine) -> AsyncSession: + return async_sessionmaker(engine, expire_on_commit=False) + + +@asynccontextmanager +async def get_db_async(): + engine = await make_async_engine(get_settings().ASYNC_DATABASE_PATH) + async_session = await make_async_session_local(engine) + async with async_session() as session: + try: yield session + finally: await engine.dispose() diff --git a/src/endpoints/default.py b/src/endpoints/default.py index 8838d2a..ceef172 100644 --- a/src/endpoints/default.py +++ b/src/endpoints/default.py @@ -17,7 +17,7 @@ default_router = APIRouter() @default_router.get("/") async def home(request: Request): - # TODO: maybe split into 2 routes: one non authenticated and one authenticated for the groups info only + # TODO: maybe split into 2 routes: one non authenticated and one authenticated for the groups info only, necessary only for the extension status = {"version": VERSION, "breakingChanges": BREAKING_CHANGES} try: email = await get_user_auth(await bearer_security(request)) diff --git a/src/endpoints/sheet.py b/src/endpoints/sheet.py index 72f37a5..b1a4230 100644 --- a/src/endpoints/sheet.py +++ b/src/endpoints/sheet.py @@ -67,7 +67,6 @@ def archive_user_sheet( if not sheet: raise HTTPException(status_code=403, detail="No access to this sheet.") - # TODO: what happens if user is taken out of group after sheet is created? this should be checked in a cronjob that notifies the user if not user.in_group(sheet.group_id): raise HTTPException(status_code=403, detail="User does not have access to this group.") diff --git a/src/shared/settings.py b/src/shared/settings.py index 8fa5ae5..0a2aec5 100644 --- a/src/shared/settings.py +++ b/src/shared/settings.py @@ -14,9 +14,15 @@ class Settings(BaseSettings): USER_GROUPS_FILENAME: str = "user-groups.yaml" SHEET_ORCHESTRATION_YAML : str = "secrets/orchestration-sheet.yaml" + # cronjobs + CRON_ARCHIVE_SHEETS: bool = False + # database DATABASE_PATH: str DATABASE_QUERY_LIMIT: int = 100 + @property + def ASYNC_DATABASE_PATH(self) -> str: + return self.DATABASE_PATH.replace("sqlite://", "sqlite+aiosqlite://") # redis CELERY_BROKER_URL: str = "redis://localhost:6379" @@ -30,7 +36,7 @@ class Settings(BaseSettings): API_BEARER_TOKEN: Annotated[str, Len(min_length=20)] ALLOWED_ORIGINS: Annotated[set[str], Len(min_length=1)] CHROME_APP_IDS: Annotated[set[Annotated[str, Len(min_length=10)]], Len(min_length=1)] - #TODO: deprecate blocklist + #TODO: deprecate blocklist? BLOCKED_EMAILS: Annotated[Set[str], Len(min_length=0)] = set() @lru_cache diff --git a/src/utils/misc.py b/src/utils/misc.py index f3a9803..4f94a63 100644 --- a/src/utils/misc.py +++ b/src/utils/misc.py @@ -4,4 +4,14 @@ from fastapi.encoders import jsonable_encoder def custom_jsonable_encoder(obj): if isinstance(obj, bytes): return base64.b64encode(obj).decode('utf-8') - return jsonable_encoder(obj) \ No newline at end of file + return jsonable_encoder(obj) + +def fnv1a_hash_mod(s: str, modulo:int) -> int: + # receives a string and returns a number in [0:modulo-1], ensures an even distribution over the modulo range + hash = 0x811c9dc5 # FNV offset basis + fnv_prime = 0x01000193 # FNV prime + for char in s: + hash ^= ord(char) + hash *= fnv_prime + hash &= 0xFFFFFFFF # Keep it 32-bit + return (hash if hash < 0x80000000 else hash - 0x100000000) % modulo \ No newline at end of file