mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-08 03:28:35 +03:00
introduces low/high priority queue and custom concurrency
This commit is contained in:
@@ -33,3 +33,6 @@ MAIL_PORT=587
|
||||
MAIL_STARTTLS=False
|
||||
MAIL_SSL_TLS=True
|
||||
|
||||
|
||||
# celery workers config
|
||||
CONCURRENCY=2
|
||||
@@ -5,13 +5,17 @@ import redis
|
||||
|
||||
from app.shared.settings import get_settings
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_celery(name:str="") -> Celery:
|
||||
def get_celery(name: str = "") -> Celery:
|
||||
return Celery(
|
||||
name,
|
||||
broker_url=get_settings().CELERY_BROKER_URL,
|
||||
result_backend=get_settings().CELERY_BROKER_URL,
|
||||
broker_connection_retry_on_startup=False
|
||||
broker_connection_retry_on_startup=False,
|
||||
broker_transport_options={
|
||||
'queue_order_strategy': 'priority',
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ from app.shared.db import models
|
||||
from app.shared.settings import get_settings
|
||||
from app.shared.user_groups import UserGroups
|
||||
from app.shared.utils.misc import fnv1a_hash_mod
|
||||
from app.web.utils.misc import convert_priority_to_queue_dict
|
||||
|
||||
DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT
|
||||
|
||||
@@ -21,12 +22,14 @@ def get_limit(user_limit: int):
|
||||
|
||||
# --------------- TASK = Archive
|
||||
|
||||
|
||||
def base_query(db: Session):
|
||||
# NOTE: load_only is for optimization and not obfuscation, use .with_entities() if needed
|
||||
return db.query(models.Archive)\
|
||||
.filter(models.Archive.deleted == False)\
|
||||
.options(load_only(models.Archive.id, models.Archive.created_at, models.Archive.url, models.Archive.result, models.Archive.store_until))
|
||||
|
||||
|
||||
def get_archive(db: Session, id: str, email: str):
|
||||
query = base_query(db).filter(models.Archive.id == id)
|
||||
if email != ALLOW_ANY_EMAIL:
|
||||
@@ -34,7 +37,8 @@ def get_archive(db: Session, id: str, email: str):
|
||||
query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups)))
|
||||
return query.first()
|
||||
|
||||
def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False)-> list[models.Archive]:
|
||||
|
||||
def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False) -> list[models.Archive]:
|
||||
# searches for partial URLs, if email is * no ownership filtering happens
|
||||
query = base_query(db)
|
||||
if email != ALLOW_ANY_EMAIL:
|
||||
@@ -84,13 +88,15 @@ def count_by_user_since(db: Session, seconds_delta: int = 15):
|
||||
.order_by(func.count().desc())\
|
||||
.limit(500).all()
|
||||
|
||||
async def find_by_store_until(db: AsyncSession, store_until_is_before:datetime) -> dict:
|
||||
res = await db.execute(
|
||||
|
||||
async def find_by_store_until(db: AsyncSession, store_until_is_before: datetime) -> dict:
|
||||
res = await db.execute(
|
||||
select(models.Archive)
|
||||
.filter(models.Archive.deleted ==False, models.Archive.store_until < store_until_is_before)
|
||||
.filter(models.Archive.deleted == False, models.Archive.store_until < store_until_is_before)
|
||||
)
|
||||
return res.scalars()
|
||||
|
||||
|
||||
async def soft_delete_expired_archives(db: AsyncSession) -> dict:
|
||||
to_delete = await find_by_store_until(db, datetime.now())
|
||||
counter = 0
|
||||
@@ -107,6 +113,11 @@ def is_user_in_group(email: str, group_name: str) -> models.Group:
|
||||
return len(group_name) and len(email) and group_name in get_user_groups(email)
|
||||
|
||||
|
||||
async def get_group_priority_async(db: AsyncSession, group_id: str) -> dict:
|
||||
db_group = await db.get(models.Group, group_id)
|
||||
priority = db_group.permissions.get("priority", "low") if db_group else "low"
|
||||
return convert_priority_to_queue_dict(priority)
|
||||
|
||||
@lru_cache
|
||||
def get_user_groups(email: str) -> list[str]:
|
||||
"""
|
||||
@@ -129,7 +140,7 @@ def get_user_groups(email: str) -> list[str]:
|
||||
|
||||
# --------------- INIT User-Groups
|
||||
|
||||
def upsert_group(db: Session, group_name: str, description: str, orchestrator: str, orchestrator_sheet: str, service_account_email:str, permissions: dict, domains: list) -> models.Group:
|
||||
def upsert_group(db: Session, group_name: str, description: str, orchestrator: str, orchestrator_sheet: str, service_account_email: str, permissions: dict, domains: list) -> models.Group:
|
||||
db_group = db.query(models.Group).filter(models.Group.id == group_name).first()
|
||||
if db_group is None:
|
||||
db_group = models.Group(id=group_name, description=description, orchestrator=orchestrator, orchestrator_sheet=orchestrator_sheet, service_account_email=service_account_email, permissions=permissions, domains=domains)
|
||||
@@ -238,6 +249,7 @@ async def get_sheets_by_id_hash(db: AsyncSession, frequency: str, modulo: str, i
|
||||
filtered.append(sheet)
|
||||
return filtered
|
||||
|
||||
|
||||
async def delete_stale_sheets(db: AsyncSession, inactivity_days: int) -> dict:
|
||||
time_threshold = datetime.now() - timedelta(days=inactivity_days)
|
||||
result = await db.execute(
|
||||
|
||||
@@ -9,6 +9,8 @@ from app.shared.db import models
|
||||
from app.shared.user_groups import GroupInfo, GroupPermissions
|
||||
from app.shared.schemas import Usage, UsageResponse
|
||||
from app.web.db import crud
|
||||
from app.web.utils.misc import convert_priority_to_queue_dict
|
||||
|
||||
|
||||
class UserState:
|
||||
"""
|
||||
@@ -261,7 +263,7 @@ class UserState:
|
||||
else:
|
||||
if group_id not in self.permissions: return False
|
||||
quota = self.permissions[group_id].max_monthly_urls
|
||||
|
||||
|
||||
if quota == -1:
|
||||
return True
|
||||
|
||||
@@ -269,6 +271,7 @@ class UserState:
|
||||
current_year = datetime.now().year
|
||||
user_urls = self.db.query(models.Archive).filter(
|
||||
models.Archive.author_id == self.email,
|
||||
models.Archive.group_id == group_id,
|
||||
func.extract('month', models.Archive.created_at) == current_month,
|
||||
func.extract('year', models.Archive.created_at) == current_year
|
||||
).count()
|
||||
@@ -288,13 +291,14 @@ class UserState:
|
||||
|
||||
if quota == -1:
|
||||
return True
|
||||
|
||||
|
||||
current_month = datetime.now().month
|
||||
current_year = datetime.now().year
|
||||
|
||||
# find and sum all user bytes over this month
|
||||
user_bytes = self.db.query(models.Archive).filter(
|
||||
models.Archive.author_id == self.email,
|
||||
models.Archive.group_id == group_id,
|
||||
func.extract('month', models.Archive.created_at) == current_month,
|
||||
func.extract('year', models.Archive.created_at) == current_year
|
||||
).with_entities(func.coalesce(func.sum(
|
||||
@@ -327,3 +331,12 @@ class UserState:
|
||||
return False
|
||||
|
||||
return frequency in self.permissions[group_id].sheet_frequency
|
||||
|
||||
def priority_group(self, group_id: str) -> str:
|
||||
priority = "low"
|
||||
for group in self.user_groups:
|
||||
if group.id != group_id: continue
|
||||
if not group.permissions: continue
|
||||
priority = group.permissions.get("priority", priority)
|
||||
break
|
||||
return convert_priority_to_queue_dict(priority)
|
||||
|
||||
@@ -75,6 +75,7 @@ def archive_user_sheet(
|
||||
if not user.can_manually_trigger(sheet.group_id):
|
||||
raise HTTPException(status_code=429, detail="User cannot manually trigger sheet archiving in this group.")
|
||||
|
||||
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=id, author_id=user.email, group_id=sheet.group_id).model_dump_json()]).delay()
|
||||
group_queue = user.priority_group(sheet.group_id)
|
||||
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=id, author_id=user.email, group_id=sheet.group_id).model_dump_json()]).apply_async(**group_queue)
|
||||
|
||||
return JSONResponse({"id": task.id}, status_code=201)
|
||||
@@ -43,7 +43,8 @@ def archive_url(
|
||||
|
||||
archive_create = schemas.ArchiveCreate(**archive.model_dump())
|
||||
|
||||
task = celery.signature("create_archive_task", args=[archive_create.model_dump_json()]).delay()
|
||||
group_queue = user.priority_group(archive_create.group_id)
|
||||
task = celery.signature("create_archive_task", args=[archive_create.model_dump_json()]).apply_async(**group_queue)
|
||||
task_response = schemas.Task(id=task.id)
|
||||
return JSONResponse(task_response.model_dump(), status_code=201)
|
||||
|
||||
|
||||
@@ -81,7 +81,8 @@ async def archive_sheets_cronjob(frequency: str, interval: int, current_time_uni
|
||||
async with get_db_async() as db:
|
||||
sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit)
|
||||
for s in sheets:
|
||||
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async()
|
||||
group_queue = await crud.get_group_priority_async(db, s.group_id)
|
||||
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async(**group_queue)
|
||||
|
||||
triggered_jobs.append({"sheet_id": s.id, "task_id": task.id})
|
||||
logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}")
|
||||
|
||||
@@ -1,7 +1,15 @@
|
||||
import base64
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
|
||||
|
||||
def custom_jsonable_encoder(obj):
|
||||
if isinstance(obj, bytes):
|
||||
return base64.b64encode(obj).decode('utf-8')
|
||||
return jsonable_encoder(obj)
|
||||
|
||||
|
||||
def convert_priority_to_queue_dict(priority: str) -> dict:
|
||||
return {
|
||||
"priority": 0 if priority == "high" else 10,
|
||||
"queue": f"{priority}_priority"
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME
|
||||
# TODO: after release, as it requires updating past entries with sheet_id where tag is used, drop tags
|
||||
@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0})
|
||||
def create_archive_task(self, archive_json: str):
|
||||
logger.info(archive_json)
|
||||
archive = schemas.ArchiveCreate.model_validate_json(archive_json)
|
||||
|
||||
# call auto-archiver
|
||||
@@ -49,7 +48,8 @@ def create_archive_task(self, archive_json: str):
|
||||
@celery.task(name="create_sheet_task", bind=True)
|
||||
def create_sheet_task(self, sheet_json: str):
|
||||
sheet = schemas.SubmitSheet.model_validate_json(sheet_json)
|
||||
logger.info(f"SHEET START {sheet=}")
|
||||
queue_name = create_sheet_task.request.delivery_info.get('routing_key', 'No queue info')
|
||||
logger.info(f"[queue={queue_name}] SHEET START {sheet=}")
|
||||
|
||||
orchestrator = load_orchestrator(sheet.group_id, True, {"configurations": {"gsheet_feeder": {"sheet_id": sheet.sheet_id}}})
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ services:
|
||||
|
||||
|
||||
worker:
|
||||
command: watchmedo auto-restart --patterns="*.py" --recursive --ignore-directories -- celery -- --app=app.worker.main.celery worker --loglevel=debug --logfile=/aa-api/logs/celery.log
|
||||
command: watchmedo auto-restart --patterns="*.py" --recursive --ignore-directories -- celery -- --app=app.worker.main.celery worker --loglevel=debug --logfile=/aa-api/logs/celery.log -Q high_priority,low_priority --concurrency=$CONCURRENCY
|
||||
restart: "no"
|
||||
env_file: .env.dev
|
||||
volumes:
|
||||
|
||||
@@ -33,7 +33,7 @@ services:
|
||||
dockerfile: worker.Dockerfile
|
||||
restart: always
|
||||
env_file: .env.prod
|
||||
command: celery --app=app.worker.main.celery worker --loglevel=warning --logfile=/aa-api/logs/celery.log
|
||||
command: celery --app=app.worker.main.celery worker --loglevel=warning --logfile=/aa-api/logs/celery.log -Q high_priority,low_priority --concurrency=$CONCURRENCY
|
||||
volumes:
|
||||
- ./logs:/aa-api/logs
|
||||
- ./database:/aa-api/database
|
||||
|
||||
Reference in New Issue
Block a user