diff --git a/.env.example b/.env.example
index 46cc8cc..28b0ebc 100644
--- a/.env.example
+++ b/.env.example
@@ -33,3 +33,6 @@ MAIL_PORT=587
 MAIL_STARTTLS=False
 MAIL_SSL_TLS=True
 
+
+# celery workers config: number of worker processes, passed as --concurrency
+CONCURRENCY=2
\ No newline at end of file
diff --git a/app/shared/task_messaging.py b/app/shared/task_messaging.py
index 7f0f09e..21fb3d1 100644
--- a/app/shared/task_messaging.py
+++ b/app/shared/task_messaging.py
@@ -5,13 +5,17 @@ import redis
 
 from app.shared.settings import get_settings
 
+
 @lru_cache
-def get_celery(name:str="") -> Celery:
+def get_celery(name: str = "") -> Celery:
     return Celery(
         name,
         broker_url=get_settings().CELERY_BROKER_URL,
         result_backend=get_settings().CELERY_BROKER_URL,
-        broker_connection_retry_on_startup=False
+        broker_connection_retry_on_startup=False,
+        broker_transport_options={
+            'queue_order_strategy': 'priority',
+        }
     )
 
 
diff --git a/app/web/db/crud.py b/app/web/db/crud.py
index be2a915..e3a78a0 100644
--- a/app/web/db/crud.py
+++ b/app/web/db/crud.py
@@ -12,6 +12,7 @@ from app.shared.db import models
 from app.shared.settings import get_settings
 from app.shared.user_groups import UserGroups
 from app.shared.utils.misc import fnv1a_hash_mod
+from app.web.utils.misc import convert_priority_to_queue_dict
 
 
 DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT
@@ -21,12 +22,14 @@ def get_limit(user_limit: int):
 
 # --------------- TASK = Archive
 
+
 def base_query(db: Session):
     # NOTE: load_only is for optimization and not obfuscation, use .with_entities() if needed
     return db.query(models.Archive)\
         .filter(models.Archive.deleted == False)\
         .options(load_only(models.Archive.id, models.Archive.created_at, models.Archive.url, models.Archive.result, models.Archive.store_until))
 
+
 def get_archive(db: Session, id: str, email: str):
     query = base_query(db).filter(models.Archive.id == id)
     if email != ALLOW_ANY_EMAIL:
@@ -34,7 +37,8 @@ def get_archive(db: Session, id: str, email: str):
         query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups)))
     return query.first()
 
-def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False)-> list[models.Archive]:
+
+def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None, absolute_search: bool = False) -> list[models.Archive]:
     # searches for partial URLs, if email is * no ownership filtering happens
     query = base_query(db)
     if email != ALLOW_ANY_EMAIL:
@@ -84,13 +88,15 @@ def count_by_user_since(db: Session, seconds_delta: int = 15):
         .order_by(func.count().desc())\
         .limit(500).all()
 
-async def find_by_store_until(db: AsyncSession, store_until_is_before:datetime) -> dict:
-    res = await db.execute( 
+
+async def find_by_store_until(db: AsyncSession, store_until_is_before: datetime) -> dict:
+    res = await db.execute(
         select(models.Archive)
-        .filter(models.Archive.deleted ==False, models.Archive.store_until < store_until_is_before)
+        .filter(models.Archive.deleted == False, models.Archive.store_until < store_until_is_before)
     )
     return res.scalars()
 
+
 async def soft_delete_expired_archives(db: AsyncSession) -> dict:
     to_delete = await find_by_store_until(db, datetime.now())
     counter = 0
@@ -107,6 +113,16 @@ def is_user_in_group(email: str, group_name: str) -> models.Group:
     return len(group_name) and len(email) and group_name in get_user_groups(email)
 
 
+async def get_group_priority_async(db: AsyncSession, group_id: str) -> dict:
+    """
+    Looks up a group by id and returns its celery `apply_async` routing kwargs,
+    unknown groups and groups without permissions are treated as low priority.
+    """
+    db_group = await db.get(models.Group, group_id)
+    # permissions may be empty/None, mirror UserState.priority_group and fall back to "low"
+    permissions = (db_group.permissions if db_group else None) or {}
+    return convert_priority_to_queue_dict(permissions.get("priority", "low"))
+
 @lru_cache
 def get_user_groups(email: str) -> list[str]:
     """
@@ -129,7 +145,7 @@ def get_user_groups(email: str) -> list[str]:
 
 # --------------- INIT User-Groups
 
-def upsert_group(db: Session, group_name: str, description: str, orchestrator: str, orchestrator_sheet: str, service_account_email:str, permissions: dict, domains: list) -> models.Group:
+def upsert_group(db: Session, group_name: str, description: str, orchestrator: str, orchestrator_sheet: str, service_account_email: str, permissions: dict, domains: list) -> models.Group:
     db_group = db.query(models.Group).filter(models.Group.id == group_name).first()
     if db_group is None:
         db_group = models.Group(id=group_name, description=description, orchestrator=orchestrator, orchestrator_sheet=orchestrator_sheet, service_account_email=service_account_email, permissions=permissions, domains=domains)
@@ -238,6 +254,7 @@ async def get_sheets_by_id_hash(db: AsyncSession, frequency: str, modulo: str, i
             filtered.append(sheet)
     return filtered
 
+
 async def delete_stale_sheets(db: AsyncSession, inactivity_days: int) -> dict:
     time_threshold = datetime.now() - timedelta(days=inactivity_days)
     result = await db.execute(
diff --git a/app/web/db/user_state.py b/app/web/db/user_state.py
index a97df37..52e17e4 100644
--- a/app/web/db/user_state.py
+++ b/app/web/db/user_state.py
@@ -9,6 +9,8 @@ from app.shared.db import models
 from app.shared.user_groups import GroupInfo, GroupPermissions
 from app.shared.schemas import Usage, UsageResponse
 from app.web.db import crud
+from app.web.utils.misc import convert_priority_to_queue_dict
+
 
 class UserState:
     """
@@ -261,7 +263,7 @@ class UserState:
         else:
             if group_id not in self.permissions: return False
             quota = self.permissions[group_id].max_monthly_urls
-        
+
         if quota == -1:
             return True
 
@@ -269,6 +271,7 @@ class UserState:
         current_year = datetime.now().year
         user_urls = self.db.query(models.Archive).filter(
             models.Archive.author_id == self.email,
+            models.Archive.group_id == group_id,
             func.extract('month', models.Archive.created_at) == current_month,
             func.extract('year', models.Archive.created_at) == current_year
         ).count()
@@ -288,13 +291,14 @@ class UserState:
 
         if quota == -1:
             return True
-        
+
         current_month = datetime.now().month
         current_year = datetime.now().year
 
         # find and sum all user bytes over this month
         user_bytes = self.db.query(models.Archive).filter(
             models.Archive.author_id == self.email,
+            models.Archive.group_id == group_id,
             func.extract('month', models.Archive.created_at) == current_month,
             func.extract('year', models.Archive.created_at) == current_year
         ).with_entities(func.coalesce(func.sum(
@@ -327,3 +331,17 @@ class UserState:
             return False
 
         return frequency in self.permissions[group_id].sheet_frequency
+
+    def priority_group(self, group_id: str) -> dict:
+        """
+        Returns the celery `apply_async` routing kwargs (queue and priority) for
+        one of the user's groups, falling back to low priority when the group is
+        not among the user's groups or does not declare a priority.
+        """
+        priority = "low"
+        for group in self.user_groups:
+            if group.id != group_id: continue
+            if not group.permissions: continue
+            priority = group.permissions.get("priority", priority)
+            break
+        return convert_priority_to_queue_dict(priority)
diff --git a/app/web/endpoints/sheet.py b/app/web/endpoints/sheet.py
index 89699d0..9177668 100644
--- a/app/web/endpoints/sheet.py
+++ b/app/web/endpoints/sheet.py
@@ -75,6 +75,7 @@ def archive_user_sheet(
     if not user.can_manually_trigger(sheet.group_id):
         raise HTTPException(status_code=429, detail="User cannot manually trigger sheet archiving in this group.")
 
-    task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=id, author_id=user.email, group_id=sheet.group_id).model_dump_json()]).delay()
+    group_queue = user.priority_group(sheet.group_id)
+    task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=id, author_id=user.email, group_id=sheet.group_id).model_dump_json()]).apply_async(**group_queue)
 
     return JSONResponse({"id": task.id}, status_code=201)
\ No newline at end of file
diff --git a/app/web/endpoints/url.py b/app/web/endpoints/url.py
index 98f905c..e7f7000 100644
--- a/app/web/endpoints/url.py
+++ b/app/web/endpoints/url.py
@@ -43,7 +43,8 @@ def archive_url(
 
     archive_create = schemas.ArchiveCreate(**archive.model_dump())
 
-    task = celery.signature("create_archive_task", args=[archive_create.model_dump_json()]).delay()
+    group_queue = user.priority_group(archive_create.group_id)
+    task = celery.signature("create_archive_task", args=[archive_create.model_dump_json()]).apply_async(**group_queue)
     task_response = schemas.Task(id=task.id)
     return JSONResponse(task_response.model_dump(), status_code=201)
 
diff --git a/app/web/events.py b/app/web/events.py
index 4dfdf25..101d055 100644
--- a/app/web/events.py
+++ b/app/web/events.py
@@ -81,7 +81,8 @@ async def archive_sheets_cronjob(frequency: str, interval: int, current_time_uni
     async with get_db_async() as db:
         sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit)
         for s in sheets:
-            task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async()
+            group_queue = await crud.get_group_priority_async(db, s.group_id)
+            task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async(**group_queue)
             triggered_jobs.append({"sheet_id": s.id, "task_id": task.id})
 
     logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}")
diff --git a/app/web/utils/misc.py b/app/web/utils/misc.py
index cfa856b..870a60b 100644
--- a/app/web/utils/misc.py
+++ b/app/web/utils/misc.py
@@ -1,7 +1,23 @@
 import base64
 from fastapi.encoders import jsonable_encoder
 
+
 def custom_jsonable_encoder(obj):
     if isinstance(obj, bytes):
         return base64.b64encode(obj).decode('utf-8')
     return jsonable_encoder(obj)
+
+
+def convert_priority_to_queue_dict(priority: str) -> dict:
+    """
+    Maps a group priority label to the celery `apply_async` routing kwargs.
+
+    Workers are started with `-Q high_priority,low_priority`, so any label other
+    than "high" (missing, misspelled, None) is routed as "low" instead of to a
+    `<label>_priority` queue that no worker consumes.
+    """
+    level = "high" if priority == "high" else "low"
+    return {
+        "priority": 0 if level == "high" else 10,
+        "queue": f"{level}_priority"
+    }
diff --git a/app/worker/main.py b/app/worker/main.py
index 8ced19b..f5f6c7d 100644
--- a/app/worker/main.py
+++ b/app/worker/main.py
@@ -27,7 +27,6 @@ USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME
 # TODO: after release, as it requires updating past entries with sheet_id where tag is used, drop tags
 @celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0})
 def create_archive_task(self, archive_json: str):
-    logger.info(archive_json)
     archive = schemas.ArchiveCreate.model_validate_json(archive_json)
 
     # call auto-archiver
@@ -49,7 +48,9 @@ def create_archive_task(self, archive_json: str):
 @celery.task(name="create_sheet_task", bind=True)
 def create_sheet_task(self, sheet_json: str):
     sheet = schemas.SubmitSheet.model_validate_json(sheet_json)
-    logger.info(f"SHEET START {sheet=}")
+    # the task is bound, so read the request from self; delivery_info can be None when not run by a worker
+    queue_name = (self.request.delivery_info or {}).get('routing_key', 'No queue info')
+    logger.info(f"[queue={queue_name}] SHEET START {sheet=}")
 
     orchestrator = load_orchestrator(sheet.group_id, True, {"configurations": {"gsheet_feeder": {"sheet_id": sheet.sheet_id}}})
 
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 3a7129d..82e81e3 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -14,7 +14,8 @@ services:
 
 
   worker:
-    command: watchmedo auto-restart --patterns="*.py" --recursive --ignore-directories -- celery -- --app=app.worker.main.celery worker --loglevel=debug --logfile=/aa-api/logs/celery.log
+    # CONCURRENCY is substituted by docker compose itself (shell environment or project .env), not read from env_file; defaults to 2 when unset
+    command: watchmedo auto-restart --patterns="*.py" --recursive --ignore-directories -- celery -- --app=app.worker.main.celery worker --loglevel=debug --logfile=/aa-api/logs/celery.log -Q high_priority,low_priority --concurrency=${CONCURRENCY:-2}
     restart: "no"
     env_file: .env.dev
     volumes:
diff --git a/docker-compose.yml b/docker-compose.yml
index 2b59517..90b5f28 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -33,7 +33,8 @@ services:
       dockerfile: worker.Dockerfile
     restart: always
     env_file: .env.prod
-    command: celery --app=app.worker.main.celery worker --loglevel=warning --logfile=/aa-api/logs/celery.log
+    # CONCURRENCY is substituted by docker compose itself (shell environment or project .env), not read from env_file; defaults to 2 when unset
+    command: celery --app=app.worker.main.celery worker --loglevel=warning --logfile=/aa-api/logs/celery.log -Q high_priority,low_priority --concurrency=${CONCURRENCY:-2}
     volumes:
       - ./logs:/aa-api/logs
       - ./database:/aa-api/database