import asyncio import datetime import logging from collections import defaultdict from contextlib import asynccontextmanager import alembic.config from fastapi import FastAPI from fastapi_mail import FastMail, MessageSchema, MessageType from fastapi_utils.tasks import repeat_every from app.shared import schemas from app.shared.db import models from app.shared.db.database import ( get_db, get_db_async, make_engine, wal_checkpoint, ) from app.shared.log import logger from app.shared.settings import get_settings from app.shared.task_messaging import get_celery from app.shared.utils.sheets import get_sheet_access_error from app.web.db import crud from app.web.middleware import increase_exceptions_counter from app.web.utils.metrics import ( measure_regular_metrics, redis_subscribe_worker_exceptions, ) celery = get_celery() # Throttle cache: track when each sheet was last notified about missing # permissions so users are not spammed on every cron cycle. _sheet_no_access_notified: dict[str, datetime.datetime] = {} _NOTIFY_COOLDOWN = datetime.timedelta(hours=24) @asynccontextmanager async def lifespan(app: FastAPI): # see https://fastapi.tiangolo.com/advanced/events/#lifespan # STARTUP engine = make_engine(get_settings().DATABASE_PATH) models.Base.metadata.create_all(bind=engine) alembic.config.main( prog="alembic", argv=[ "-c", "./app/migrations/alembic.ini", "--raiseerr", "upgrade", "head", ], ) logging.getLogger("uvicorn.access").disabled = True # loguru asyncio.create_task( redis_subscribe_worker_exceptions( get_settings().REDIS_EXCEPTIONS_CHANNEL ) ) asyncio.create_task(repeat_measure_regular_metrics()) with get_db() as db: crud.upsert_user_groups(db) # setup archive cronjobs if get_settings().CRON_ARCHIVE_SHEETS: asyncio.create_task(archive_hourly_sheets_cronjob()) asyncio.create_task(archive_daily_sheets_cronjob()) else: logger.warning("[CRON] Sheet archive cronjobs are disabled.") if get_settings().CRON_DELETE_STALE_SHEETS: asyncio.create_task(delete_stale_sheets()) else: logger.warning("[CRON] Delete stale sheets cronjob is disabled.") if get_settings().CRON_DELETE_SCHEDULED_ARCHIVES: asyncio.create_task(notify_about_expired_archives()) else: logger.warning("[CRON] Delete scheduled archives cronjob is disabled.") wal_checkpoint() yield # separates startup from shutdown instructions # SHUTDOWN logger.info("shutting down") # CRON JOBS @repeat_every( seconds=get_settings().REPEAT_COUNT_METRICS_SECONDS, on_exception=increase_exceptions_counter, ) async def repeat_measure_regular_metrics(): await measure_regular_metrics( get_settings().DATABASE_PATH, get_settings().REPEAT_COUNT_METRICS_SECONDS, ) @repeat_every( seconds=60, wait_first=120, on_exception=increase_exceptions_counter ) async def archive_hourly_sheets_cronjob(): await archive_sheets_cronjob("hourly", 60, datetime.datetime.now().minute) @repeat_every( seconds=3600, wait_first=120, on_exception=increase_exceptions_counter ) async def archive_daily_sheets_cronjob(): await archive_sheets_cronjob("daily", 24, datetime.datetime.now().hour) async def archive_sheets_cronjob( frequency: str, interval: int, current_time_unit: int ): triggered_jobs = [] no_access_sheets: dict[str, list[tuple]] = defaultdict(list) async with get_db_async() as db: sheets = await crud.get_sheets_by_id_hash( db, frequency, str(interval), current_time_unit ) for s in sheets: # Check if service account has write access to the sheet group = await db.get(models.Group, s.group_id) if group and group.orchestrator_sheet: access_error = get_sheet_access_error( group.orchestrator_sheet, group.service_account_email, s.id, ) if access_error: no_access_sheets[s.author_id].append( (s, group.service_account_email or "") ) logger.warning( f"[CRON] Skipping sheet {s.id}: not shared with " f"service account {group.service_account_email}" ) continue group_queue = await crud.get_group_priority_async(db, s.group_id) task = celery.signature( "create_sheet_task", args=[ schemas.SubmitSheet( sheet_id=s.id, author_id=s.author_id, group_id=s.group_id, ).model_dump_json() ], ).apply_async(**group_queue) triggered_jobs.append({"sheet_id": s.id, "task_id": task.id}) if no_access_sheets: await _notify_sheet_permission_issues(no_access_sheets) logger.debug( f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}" ) async def _notify_sheet_permission_issues( no_access_sheets: dict[str, list[tuple]], ): """ Send email notifications to users whose sheets are not shared with the Auto Archiver service account. Throttled to at most one email per sheet per 24 hours to avoid spamming. """ now = datetime.datetime.now() fastmail = FastMail(get_settings().mail_config) for email, sheet_infos in no_access_sheets.items(): # Filter to sheets that haven't been notified recently sheets_to_notify = [] for s, sa_email in sheet_infos: last = _sheet_no_access_notified.get(s.id) if not last or (now - last) >= _NOTIFY_COOLDOWN: sheets_to_notify.append((s, sa_email)) _sheet_no_access_notified[s.id] = now if not sheets_to_notify: continue list_of_sheets = "\n".join( [ f'
{sa_email}Hi {email},
The following sheets could not be archived because they have not been shared with the Auto Archiver service account with Editor permissions:
Please open each sheet, click Share, and add the service account email shown above as an Editor. The sheets will be archived automatically on the next scheduled run once access is granted.
Best,
The Auto Archiver team
Hi {email},
Some of your archives will be deleted in the next {get_settings().DELETE_SCHEDULED_ARCHIVES_CHECK_EVERY_N_DAYS} days, as they are reaching their expiration date according to our retention policy for their groups.
If you want to preserve any, make sure to download them now.
Here is a CSV list of URLs:
url,archive_id,time_of_deletion
{list_of_archives}
Best,
The Auto Archiver team
Hi {email},
Your stale sheets have been removed from our system as no new URL was archived in the past {STALE_DAYS} days:
You can always re-add them at https://auto-archiver.bellingcat.com/.
Best,
The Auto Archiver team