From 0d51e5cd65827aa4120640356ba53f9aabcd9a1e Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Sat, 8 Feb 2025 17:29:42 +0000 Subject: [PATCH] implements expiration policy via store_until --- src/core/events.py | 64 ++++++++++++++++++- src/db/crud.py | 36 +++++++---- src/db/models.py | 1 + src/db/schemas.py | 2 + ...7ed0_create_archives_store_until_column.py | 34 ++++++++++ src/shared/settings.py | 2 + src/worker/main.py | 13 +++- 7 files changed, 137 insertions(+), 15 deletions(-) create mode 100644 src/migrations/versions/02b2f6d17ed0_create_archives_store_until_column.py diff --git a/src/core/events.py b/src/core/events.py index 3a700af..d051e12 100644 --- a/src/core/events.py +++ b/src/core/events.py @@ -1,4 +1,5 @@ import asyncio +from collections import defaultdict import datetime import logging import alembic.config @@ -16,6 +17,7 @@ from fastapi_mail import FastMail, MessageSchema, MessageType celery = get_celery() + @asynccontextmanager async def lifespan(app: FastAPI): # see https://fastapi.tiangolo.com/advanced/events/#lifespan @@ -42,6 +44,11 @@ async def lifespan(app: FastAPI): else: logger.warning("[CRON] Delete stale sheets cronjob is disabled.") + if get_settings().CRON_DELETE_SCHEDULED_ARCHIVES: + asyncio.create_task(notify_about_expired_archives()) + else: + logger.warning("[CRON] Delete scheduled archives cronjob is disabled.") + wal_checkpoint() yield # separates startup from shutdown instructions @@ -72,13 +79,67 @@ async def archive_sheets_cronjob(frequency: str, interval: int, current_time_uni async with get_db_async() as db: sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit) for s in sheets: - task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async() triggered_jobs.append({"sheet_id": s.id, "task_id": task.id}) logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}") +# TODO: on exception should logerror but also prometheus counter +DELETE_WINDOW = get_settings().DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS * 24 * 60 * 60 + +@repeat_every(seconds=DELETE_WINDOW, wait_first=180, on_exception=logger.error) +async def notify_about_expired_archives(): + notify_from = datetime.datetime.now() + datetime.timedelta(days=get_settings().DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS) + async with get_db_async() as db: + scheduled_deletions = await crud.find_by_store_until(db, notify_from) + + user_archives = defaultdict(list) + for archive in scheduled_deletions: + user_archives[archive.author_id].append(archive) + + if user_archives: + fastmail = FastMail(get_settings().MAIL_CONFIG) + # notify users + for email in user_archives: + list_of_archives = "\n".join([f'{a.url},{a.id}
' for a in user_archives[email]]) + #TODO: how can users download them in bulk? + message = MessageSchema( + subject="Auto Archiver: Archives Scheduled for Deletion", + recipients=[email], + body=f""" + + +

Hi {email},

+

Some of your archives will be deleted in the next {get_settings().DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS} days, as they are reaching their expiration date according to our retention policy for their groups.

+

If you want to preserve any, make sure to download them now.

+

Here is a CSV list of URLs:

+ + url,archive_id
+ {list_of_archives} +
+

Best,
The Auto Archiver team

+ + + """, + subtype=MessageType.html + ) + await fastmail.send_message(message) + logger.info(f"[CRON] Email sent to {email} about {len(user_archives[email])} scheduled archives deletion.") + + # now schedule the deletion event + asyncio.create_task(delete_expired_archives()) + + +@repeat_every(max_repetitions=1, wait_first=DELETE_WINDOW - (60 * 60), seconds=0, on_exception=logger.error) +async def delete_expired_archives(): + async with get_db_async() as db: + count_deleted = await crud.soft_delete_expired_archives(db) + if count_deleted: + logger.info(f"[CRON] Deleted {count_deleted} archives.") + + + @repeat_every(seconds=86400, wait_first=150, on_exception=logger.error) async def delete_stale_sheets(): STALE_DAYS = get_settings().DELETE_STALE_SHEETS_DAYS @@ -112,4 +173,3 @@ async def delete_stale_sheets(): ) await fastmail.send_message(message) logger.info(f"[CRON] Email sent to {email} about stale sheets deletion.") - diff --git a/src/db/crud.py b/src/db/crud.py index d984a4f..0742ca7 100644 --- a/src/db/crud.py +++ b/src/db/crud.py @@ -15,12 +15,17 @@ from sqlalchemy.ext.asyncio import AsyncSession DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT -# --------------- TASK = Archive - def get_limit(user_limit: int): return max(1, min(user_limit, DATABASE_QUERY_LIMIT)) +# --------------- TASK = Archive + +def base_query(db: Session): + # NOTE: load_only is for optimization and not obfuscation, use .with_entities() if needed + return db.query(models.Archive)\ + .filter(models.Archive.deleted == False)\ + .options(load_only(models.Archive.id, models.Archive.created_at, models.Archive.url, models.Archive.result, models.Archive.store_until)) def get_archive(db: Session, id: str, email: str): query = base_query(db).filter(models.Archive.id == id) @@ -29,8 +34,7 @@ def get_archive(db: Session, id: str, email: str): query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups))) return query.first() - -def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False): +def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False)-> list[models.Archive]: # searches for partial URLs, if email is * no ownership filtering happens query = base_query(db) if email != ALLOW_ANY_EMAIL: @@ -52,7 +56,7 @@ def search_archives_by_email(db: Session, email: str, skip: int = 0, limit: int #TODO: rename task to archive def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]) -> models.Archive: - db_task = models.Archive(id=task.id, url=task.url, result=task.result, public=task.public, author_id=task.author_id, group_id=task.group_id, sheet_id=task.sheet_id) + db_task = models.Archive(id=task.id, url=task.url, result=task.result, public=task.public, author_id=task.author_id, group_id=task.group_id, sheet_id=task.sheet_id, store_until=task.store_until) db_task.tags = tags db_task.urls = urls db.add(db_task) @@ -90,13 +94,21 @@ def count_by_user_since(db: Session, seconds_delta: int = 15): .order_by(func.count().desc())\ .limit(500).all() +async def find_by_store_until(db: AsyncSession, store_until_is_before:datetime) -> dict: + res = await db.execute( + select(models.Archive) + .filter(models.Archive.deleted ==False, models.Archive.store_until < store_until_is_before) + ) + return res.scalars() -def base_query(db: Session): - # NOTE: load_only is for optimization and not obfuscation, use .with_entities() if needed - return db.query(models.Archive)\ - .filter(models.Archive.deleted == False)\ - .options(load_only(models.Archive.id, models.Archive.created_at, models.Archive.url, models.Archive.result)) - +async def soft_delete_expired_archives(db: AsyncSession) -> dict: + to_delete = await find_by_store_until(db, datetime.now()) + counter = 0 + for archive in to_delete: + archive.deleted = True + counter += 1 + await db.commit() + return counter # --------------- TAG @@ -269,7 +281,7 @@ async def delete_stale_sheets(db: AsyncSession, inactivity_days: int) -> dict: for sheet in result.scalars(): await db.delete(sheet) deleted[sheet.author_id].append(sheet) - await db.commit() + await db.commit() return dict(deleted) def update_sheet_last_url_archived_at(db: Session, sheet_id: str): diff --git a/src/db/models.py b/src/db/models.py index 41d2c3c..5447531 100644 --- a/src/db/models.py +++ b/src/db/models.py @@ -37,6 +37,7 @@ class Archive(Base): deleted = Column(Boolean, default=False) created_at = Column(DateTime(timezone=True), server_default=func.now()) updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + store_until = Column(DateTime(timezone=True), default=None) group_id = Column(String, ForeignKey("groups.id"), default=None) author_id = Column(String, ForeignKey("users.email")) diff --git a/src/db/schemas.py b/src/db/schemas.py index 850fa3a..aff5730 100644 --- a/src/db/schemas.py +++ b/src/db/schemas.py @@ -38,6 +38,7 @@ class ArchiveResult(BaseModel): url: str result: dict created_at: datetime + store_until: datetime | None class Task(BaseModel): @@ -82,6 +83,7 @@ class ArchiveCreate(ArchiveTrigger): result: dict | None = None sheet_id: str | None = None urls: list | None = None + store_until: datetime | None = None class Archive(ArchiveCreate): created_at: datetime diff --git a/src/migrations/versions/02b2f6d17ed0_create_archives_store_until_column.py b/src/migrations/versions/02b2f6d17ed0_create_archives_store_until_column.py new file mode 100644 index 0000000..d00fa2c --- /dev/null +++ b/src/migrations/versions/02b2f6d17ed0_create_archives_store_until_column.py @@ -0,0 +1,34 @@ +"""create archives.store_until column + +Revision ID: 02b2f6d17ed0 +Revises: 1636724ec4b1 +Create Date: 2025-02-08 15:22:20.392522 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '02b2f6d17ed0' +down_revision = '1636724ec4b1' +branch_labels = None +depends_on = None +STORE_UNTIL_COL = "store_until" + + +def upgrade() -> None: + conn = op.get_bind() + inspector = sa.inspect(conn) + columns = [col['name'] for col in inspector.get_columns('archives')] + + if STORE_UNTIL_COL not in columns: + op.add_column('archives', sa.Column(STORE_UNTIL_COL, sa.DateTime(), nullable=True, default=None)) + + +def downgrade() -> None: + conn = op.get_bind() + inspector = sa.inspect(conn) + columns = [col['name'] for col in inspector.get_columns('archives')] + if STORE_UNTIL_COL in columns: + op.drop_column('archives', STORE_UNTIL_COL) diff --git a/src/shared/settings.py b/src/shared/settings.py index f7383dd..8a7cf31 100644 --- a/src/shared/settings.py +++ b/src/shared/settings.py @@ -20,6 +20,8 @@ class Settings(BaseSettings): CRON_ARCHIVE_SHEETS: bool = False CRON_DELETE_STALE_SHEETS: bool = True DELETE_STALE_SHEETS_DAYS: int = 14 + CRON_DELETE_SCHEDULED_ARCHIVES: bool = True + DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS: int = 14 # database DATABASE_PATH: str diff --git a/src/worker/main.py b/src/worker/main.py index 940158b..df8ffb8 100644 --- a/src/worker/main.py +++ b/src/worker/main.py @@ -37,6 +37,8 @@ def create_archive_task(self, archive_json: str): assert result, f"UNABLE TO archive: {archive.url}" # prepare and insert in DB + store_until = get_store_until(archive.group_id) + archive.store_until = store_until archive.id = self.request.id archive.urls = get_all_urls(result) archive.result = json.loads(result.to_json()) @@ -64,7 +66,8 @@ def create_sheet_task(self, sheet_json: str): id=models.generate_uuid(), result=json.loads(result.to_json()), sheet_id=sheet.sheet_id, - urls=get_all_urls(result) + urls=get_all_urls(result), + store_until = get_store_until(sheet.group_id) ) insert_result_into_db(archive) stats["archived"] += 1 @@ -122,6 +125,14 @@ def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]: return db_urls +def get_store_until(group_id: str) -> datetime.datetime: + with get_db() as session: + group = crud.get_group(session, group_id) + max_lifespan = group.permissions.get("max_archive_lifespan_months", -1) + if max_lifespan == -1: return None + + return datetime.datetime.now() + datetime.timedelta(days=30 * max_lifespan) + # TODO: this should live within the auto-archiver?? def convert_if_media(media): if isinstance(media, Media): return media