mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-13 05:58:35 +03:00
implements expiration policy via store_until
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
from collections import defaultdict
|
||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import alembic.config
|
import alembic.config
|
||||||
@@ -16,6 +17,7 @@ from fastapi_mail import FastMail, MessageSchema, MessageType
|
|||||||
|
|
||||||
celery = get_celery()
|
celery = get_celery()
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
# see https://fastapi.tiangolo.com/advanced/events/#lifespan
|
# see https://fastapi.tiangolo.com/advanced/events/#lifespan
|
||||||
@@ -42,6 +44,11 @@ async def lifespan(app: FastAPI):
|
|||||||
else:
|
else:
|
||||||
logger.warning("[CRON] Delete stale sheets cronjob is disabled.")
|
logger.warning("[CRON] Delete stale sheets cronjob is disabled.")
|
||||||
|
|
||||||
|
if get_settings().CRON_DELETE_SCHEDULED_ARCHIVES:
|
||||||
|
asyncio.create_task(notify_about_expired_archives())
|
||||||
|
else:
|
||||||
|
logger.warning("[CRON] Delete scheduled archives cronjob is disabled.")
|
||||||
|
|
||||||
wal_checkpoint()
|
wal_checkpoint()
|
||||||
|
|
||||||
yield # separates startup from shutdown instructions
|
yield # separates startup from shutdown instructions
|
||||||
@@ -72,13 +79,67 @@ async def archive_sheets_cronjob(frequency: str, interval: int, current_time_uni
|
|||||||
async with get_db_async() as db:
|
async with get_db_async() as db:
|
||||||
sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit)
|
sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit)
|
||||||
for s in sheets:
|
for s in sheets:
|
||||||
|
|
||||||
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async()
|
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async()
|
||||||
|
|
||||||
triggered_jobs.append({"sheet_id": s.id, "task_id": task.id})
|
triggered_jobs.append({"sheet_id": s.id, "task_id": task.id})
|
||||||
logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}")
|
logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}")
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: on exception, log the error and also increment a Prometheus counter
|
||||||
|
DELETE_WINDOW = get_settings().DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS * 24 * 60 * 60
|
||||||
|
|
||||||
|
@repeat_every(seconds=DELETE_WINDOW, wait_first=180, on_exception=logger.error)
async def notify_about_expired_archives():
    """Email authors about archives that are about to expire, then schedule the deletion.

    Looks up every non-deleted archive whose ``store_until`` falls before
    "now + DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS", groups them by
    ``author_id`` (used as the recipient address) and sends one HTML email
    per author listing the affected ``url,archive_id`` pairs. Afterwards the
    one-shot ``delete_expired_archives`` job is started.

    All values interpolated into the HTML body are escaped: archive URLs are
    user-submitted and may contain ``&``, ``<`` or ``>``, which would
    otherwise be rendered incorrectly or be interpreted as markup.
    """
    from html import escape  # function-scope import keeps this block self-contained

    settings = get_settings()
    notify_days = settings.DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS
    notify_from = datetime.datetime.now() + datetime.timedelta(days=notify_days)

    # Copy the plain values we need while the session is open, so nothing
    # below depends on ORM instances after the session has been closed.
    user_archives = defaultdict(list)
    async with get_db_async() as db:
        scheduled_deletions = await crud.find_by_store_until(db, notify_from)
        for archive in scheduled_deletions:
            user_archives[archive.author_id].append((archive.url, archive.id))

    if user_archives:
        fastmail = FastMail(settings.MAIL_CONFIG)
        # notify users
        for email, archives in user_archives.items():
            list_of_archives = "\n".join(
                f"{escape(str(url))},{escape(str(archive_id))}<br/>"
                for url, archive_id in archives
            )
            # TODO: how can users download them in bulk?
            message = MessageSchema(
                subject="Auto Archiver: Archives Scheduled for Deletion",
                recipients=[email],
                body=f"""
                <html>
                    <body>
                        <p>Hi {escape(str(email))},</p>
                        <p>Some of your archives will be deleted in the next {notify_days} days, as they are reaching their expiration date according to our retention policy for their groups.</p>
                        <p>If you want to preserve any, make sure to download them now.</p>
                        <p>Here is a CSV list of URLs:</p>
                        <code>
                        url,archive_id<br/>
                        {list_of_archives}
                        </code>
                        <p>Best,<br>The Auto Archiver team</p>
                    </body>
                </html>
                """,
                subtype=MessageType.html
            )
            await fastmail.send_message(message)
            logger.info(f"[CRON] Email sent to {email} about {len(archives)} scheduled archives deletion.")

    # now schedule the deletion event
    asyncio.create_task(delete_expired_archives())
|
||||||
|
|
||||||
|
|
||||||
|
@repeat_every(max_repetitions=1, wait_first=DELETE_WINDOW - (60 * 60), seconds=0, on_exception=logger.error)
async def delete_expired_archives():
    """Soft-delete archives past their ``store_until`` date and log how many were affected.

    Configured above as a one-shot job (``max_repetitions=1``) with an initial
    wait of ``DELETE_WINDOW`` minus one hour.
    """
    async with get_db_async() as session:
        count_deleted = await crud.soft_delete_expired_archives(session)

    # Nothing expired: stay silent rather than logging a zero count.
    if not count_deleted:
        return
    logger.info(f"[CRON] Deleted {count_deleted} archives.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@repeat_every(seconds=86400, wait_first=150, on_exception=logger.error)
|
@repeat_every(seconds=86400, wait_first=150, on_exception=logger.error)
|
||||||
async def delete_stale_sheets():
|
async def delete_stale_sheets():
|
||||||
STALE_DAYS = get_settings().DELETE_STALE_SHEETS_DAYS
|
STALE_DAYS = get_settings().DELETE_STALE_SHEETS_DAYS
|
||||||
@@ -112,4 +173,3 @@ async def delete_stale_sheets():
|
|||||||
)
|
)
|
||||||
await fastmail.send_message(message)
|
await fastmail.send_message(message)
|
||||||
logger.info(f"[CRON] Email sent to {email} about stale sheets deletion.")
|
logger.info(f"[CRON] Email sent to {email} about stale sheets deletion.")
|
||||||
|
|
||||||
|
|||||||
@@ -15,12 +15,17 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||||||
|
|
||||||
DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT
|
DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT
|
||||||
|
|
||||||
# --------------- TASK = Archive
|
|
||||||
|
|
||||||
|
|
||||||
def get_limit(user_limit: int):
|
def get_limit(user_limit: int):
|
||||||
return max(1, min(user_limit, DATABASE_QUERY_LIMIT))
|
return max(1, min(user_limit, DATABASE_QUERY_LIMIT))
|
||||||
|
|
||||||
|
# --------------- TASK = Archive
|
||||||
|
|
||||||
|
def base_query(db: Session):
    """Return the shared starting query over non-deleted archives.

    Only the id, created_at, url, result and store_until columns are loaded
    eagerly; callers chain further filters onto the returned query.
    """
    # NOTE: load_only is for optimization and not obfuscation, use .with_entities() if needed
    archive = models.Archive
    eager_columns = load_only(archive.id, archive.created_at, archive.url, archive.result, archive.store_until)
    not_deleted = db.query(archive).filter(archive.deleted == False)
    return not_deleted.options(eager_columns)
|
||||||
|
|
||||||
def get_archive(db: Session, id: str, email: str):
|
def get_archive(db: Session, id: str, email: str):
|
||||||
query = base_query(db).filter(models.Archive.id == id)
|
query = base_query(db).filter(models.Archive.id == id)
|
||||||
@@ -29,8 +34,7 @@ def get_archive(db: Session, id: str, email: str):
|
|||||||
query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups)))
|
query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups)))
|
||||||
return query.first()
|
return query.first()
|
||||||
|
|
||||||
|
def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False)-> list[models.Archive]:
|
||||||
def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False):
|
|
||||||
# searches for partial URLs, if email is * no ownership filtering happens
|
# searches for partial URLs, if email is * no ownership filtering happens
|
||||||
query = base_query(db)
|
query = base_query(db)
|
||||||
if email != ALLOW_ANY_EMAIL:
|
if email != ALLOW_ANY_EMAIL:
|
||||||
@@ -52,7 +56,7 @@ def search_archives_by_email(db: Session, email: str, skip: int = 0, limit: int
|
|||||||
|
|
||||||
#TODO: rename task to archive
|
#TODO: rename task to archive
|
||||||
def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]) -> models.Archive:
|
def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]) -> models.Archive:
|
||||||
db_task = models.Archive(id=task.id, url=task.url, result=task.result, public=task.public, author_id=task.author_id, group_id=task.group_id, sheet_id=task.sheet_id)
|
db_task = models.Archive(id=task.id, url=task.url, result=task.result, public=task.public, author_id=task.author_id, group_id=task.group_id, sheet_id=task.sheet_id, store_until=task.store_until)
|
||||||
db_task.tags = tags
|
db_task.tags = tags
|
||||||
db_task.urls = urls
|
db_task.urls = urls
|
||||||
db.add(db_task)
|
db.add(db_task)
|
||||||
@@ -90,13 +94,21 @@ def count_by_user_since(db: Session, seconds_delta: int = 15):
|
|||||||
.order_by(func.count().desc())\
|
.order_by(func.count().desc())\
|
||||||
.limit(500).all()
|
.limit(500).all()
|
||||||
|
|
||||||
|
async def find_by_store_until(db: AsyncSession, store_until_is_before: datetime) -> list[models.Archive]:
    """Return the non-deleted archives whose ``store_until`` is earlier than the given moment.

    The filter is a strict ``store_until < store_until_is_before`` comparison
    combined with ``deleted == False``.

    The rows are materialized into a list before returning, so callers can
    iterate the result more than once and do not depend on the underlying
    result cursor still being open.
    """
    res = await db.execute(
        select(models.Archive)
        .filter(models.Archive.deleted == False, models.Archive.store_until < store_until_is_before)
    )
    return list(res.scalars().all())
|
||||||
|
|
||||||
def base_query(db: Session):
|
async def soft_delete_expired_archives(db: AsyncSession) -> int:
    """Flag every archive whose ``store_until`` is already in the past as deleted.

    This is a soft delete: rows are kept and only ``deleted`` is set to True.
    The change is committed once, after all rows have been flagged.

    Returns:
        The number of archives that were flagged.
    """
    to_delete = await find_by_store_until(db, datetime.now())
    counter = 0
    for archive in to_delete:
        archive.deleted = True
        counter += 1
    await db.commit()
    return counter
|
||||||
# --------------- TAG
|
# --------------- TAG
|
||||||
|
|
||||||
|
|
||||||
@@ -269,7 +281,7 @@ async def delete_stale_sheets(db: AsyncSession, inactivity_days: int) -> dict:
|
|||||||
for sheet in result.scalars():
|
for sheet in result.scalars():
|
||||||
await db.delete(sheet)
|
await db.delete(sheet)
|
||||||
deleted[sheet.author_id].append(sheet)
|
deleted[sheet.author_id].append(sheet)
|
||||||
await db.commit()
|
await db.commit()
|
||||||
return dict(deleted)
|
return dict(deleted)
|
||||||
|
|
||||||
def update_sheet_last_url_archived_at(db: Session, sheet_id: str):
|
def update_sheet_last_url_archived_at(db: Session, sheet_id: str):
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ class Archive(Base):
|
|||||||
deleted = Column(Boolean, default=False)
|
deleted = Column(Boolean, default=False)
|
||||||
created_at = Column(DateTime(timezone=True), server_default=func.now())
|
created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||||
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
|
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
|
||||||
|
store_until = Column(DateTime(timezone=True), default=None)
|
||||||
|
|
||||||
group_id = Column(String, ForeignKey("groups.id"), default=None)
|
group_id = Column(String, ForeignKey("groups.id"), default=None)
|
||||||
author_id = Column(String, ForeignKey("users.email"))
|
author_id = Column(String, ForeignKey("users.email"))
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ class ArchiveResult(BaseModel):
|
|||||||
url: str
|
url: str
|
||||||
result: dict
|
result: dict
|
||||||
created_at: datetime
|
created_at: datetime
|
||||||
|
store_until: datetime | None
|
||||||
|
|
||||||
|
|
||||||
class Task(BaseModel):
|
class Task(BaseModel):
|
||||||
@@ -82,6 +83,7 @@ class ArchiveCreate(ArchiveTrigger):
|
|||||||
result: dict | None = None
|
result: dict | None = None
|
||||||
sheet_id: str | None = None
|
sheet_id: str | None = None
|
||||||
urls: list | None = None
|
urls: list | None = None
|
||||||
|
store_until: datetime | None = None
|
||||||
|
|
||||||
class Archive(ArchiveCreate):
|
class Archive(ArchiveCreate):
|
||||||
created_at: datetime
|
created_at: datetime
|
||||||
|
|||||||
@@ -0,0 +1,34 @@
|
|||||||
|
"""create archives.store_until column
|
||||||
|
|
||||||
|
Revision ID: 02b2f6d17ed0
|
||||||
|
Revises: 1636724ec4b1
|
||||||
|
Create Date: 2025-02-08 15:22:20.392522
|
||||||
|
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '02b2f6d17ed0'
|
||||||
|
down_revision = '1636724ec4b1'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
STORE_UNTIL_COL = "store_until"
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Add the nullable ``store_until`` column to ``archives`` unless it is already there."""
    # Inspect the live schema first so re-running the migration is a no-op.
    existing_columns = {col['name'] for col in sa.inspect(op.get_bind()).get_columns('archives')}
    if STORE_UNTIL_COL in existing_columns:
        return
    op.add_column('archives', sa.Column(STORE_UNTIL_COL, sa.DateTime(), nullable=True, default=None))
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Drop the ``store_until`` column from ``archives`` if it is present."""
    # Inspect the live schema first so the downgrade tolerates an absent column.
    existing_columns = {col['name'] for col in sa.inspect(op.get_bind()).get_columns('archives')}
    if STORE_UNTIL_COL not in existing_columns:
        return
    op.drop_column('archives', STORE_UNTIL_COL)
|
||||||
@@ -20,6 +20,8 @@ class Settings(BaseSettings):
|
|||||||
CRON_ARCHIVE_SHEETS: bool = False
|
CRON_ARCHIVE_SHEETS: bool = False
|
||||||
CRON_DELETE_STALE_SHEETS: bool = True
|
CRON_DELETE_STALE_SHEETS: bool = True
|
||||||
DELETE_STALE_SHEETS_DAYS: int = 14
|
DELETE_STALE_SHEETS_DAYS: int = 14
|
||||||
|
CRON_DELETE_SCHEDULED_ARCHIVES: bool = True
|
||||||
|
DELETE_SCHEDULED_ARCHIVES_NOTIFY_DAYS: int = 14
|
||||||
|
|
||||||
# database
|
# database
|
||||||
DATABASE_PATH: str
|
DATABASE_PATH: str
|
||||||
|
|||||||
@@ -37,6 +37,8 @@ def create_archive_task(self, archive_json: str):
|
|||||||
assert result, f"UNABLE TO archive: {archive.url}"
|
assert result, f"UNABLE TO archive: {archive.url}"
|
||||||
|
|
||||||
# prepare and insert in DB
|
# prepare and insert in DB
|
||||||
|
store_until = get_store_until(archive.group_id)
|
||||||
|
archive.store_until = store_until
|
||||||
archive.id = self.request.id
|
archive.id = self.request.id
|
||||||
archive.urls = get_all_urls(result)
|
archive.urls = get_all_urls(result)
|
||||||
archive.result = json.loads(result.to_json())
|
archive.result = json.loads(result.to_json())
|
||||||
@@ -64,7 +66,8 @@ def create_sheet_task(self, sheet_json: str):
|
|||||||
id=models.generate_uuid(),
|
id=models.generate_uuid(),
|
||||||
result=json.loads(result.to_json()),
|
result=json.loads(result.to_json()),
|
||||||
sheet_id=sheet.sheet_id,
|
sheet_id=sheet.sheet_id,
|
||||||
urls=get_all_urls(result)
|
urls=get_all_urls(result),
|
||||||
|
store_until = get_store_until(sheet.group_id)
|
||||||
)
|
)
|
||||||
insert_result_into_db(archive)
|
insert_result_into_db(archive)
|
||||||
stats["archived"] += 1
|
stats["archived"] += 1
|
||||||
@@ -122,6 +125,14 @@ def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]:
|
|||||||
return db_urls
|
return db_urls
|
||||||
|
|
||||||
|
|
||||||
|
def get_store_until(group_id: str) -> datetime.datetime | None:
    """Compute the expiration timestamp for a new archive in the given group.

    Reads ``max_archive_lifespan_months`` from the group's permissions. A
    value of ``-1`` (also the fallback when the key is absent) means "keep
    forever" and yields ``None``. Otherwise the result is the current time
    plus ``30 * max_archive_lifespan_months`` days, i.e. a month is
    approximated as 30 days.

    Returns:
        The moment after which the archive may be deleted, or ``None`` when
        no retention limit applies.
    """
    with get_db() as session:
        group = crud.get_group(session, group_id)
        # NOTE(review): crud.get_group is not visible here; guard in case it
        # returns None (e.g. no group_id / unknown group) or the group has no
        # permissions set, instead of failing the whole archive task.
        permissions = (group.permissions if group is not None else None) or {}

    max_lifespan = permissions.get("max_archive_lifespan_months", -1)
    # An explicit None is treated like -1: no retention limit.
    if max_lifespan is None or max_lifespan == -1:
        return None

    return datetime.datetime.now() + datetime.timedelta(days=30 * max_lifespan)
|
||||||
|
|
||||||
# TODO: this should live within the auto-archiver??
|
# TODO: this should live within the auto-archiver??
|
||||||
def convert_if_media(media):
|
def convert_if_media(media):
|
||||||
if isinstance(media, Media): return media
|
if isinstance(media, Media): return media
|
||||||
|
|||||||
Reference in New Issue
Block a user