diff --git a/app/shared/db/database.py b/app/shared/db/database.py index 08297a1..8b909b9 100644 --- a/app/shared/db/database.py +++ b/app/shared/db/database.py @@ -21,6 +21,8 @@ def make_engine(database_url: str): pool_size=15, # Increase pool size max_overflow=20, # Allow more temporary connections pool_recycle=1800, # Recycle connections every 30 minutes + pool_pre_ping=True, # Detect and replace stale connections + pool_timeout=30, # Timeout waiting for a connection from pool ) @event.listens_for(engine, "connect") diff --git a/app/shared/task_messaging.py b/app/shared/task_messaging.py index 95438b5..66e4a0d 100644 --- a/app/shared/task_messaging.py +++ b/app/shared/task_messaging.py @@ -12,10 +12,13 @@ def get_celery(name: str = "") -> Celery: name, broker_url=get_settings().celery_broker_url, result_backend=get_settings().celery_broker_url, - broker_connection_retry_on_startup=False, + broker_connection_retry_on_startup=True, broker_transport_options={ "queue_order_strategy": "priority", + "visibility_timeout": 43200, # 12 hours - must be > longest task time_limit }, + result_expires=86400, # expire task results after 24 hours to prevent Redis memory buildup + worker_cancel_long_running_tasks_on_connection_loss=True, ) diff --git a/app/worker/main.py b/app/worker/main.py index a7100fc..36569ca 100644 --- a/app/worker/main.py +++ b/app/worker/main.py @@ -16,6 +16,14 @@ from app.shared.utils.misc import get_all_urls from app.worker.worker_log import logger, setup_celery_logger +# Time limits for tasks (in seconds) +# soft_time_limit raises SoftTimeLimitExceeded inside the task so it can clean up +# time_limit hard-kills the task if soft limit didn't work +SINGLE_URL_SOFT_TIME_LIMIT = 30 * 60 # 30 minutes +SINGLE_URL_HARD_TIME_LIMIT = 35 * 60 # 35 minutes +SHEET_SOFT_TIME_LIMIT = 6 * 60 * 60 # 6 hours +SHEET_HARD_TIME_LIMIT = 6.5 * 60 * 60 # 6.5 hours + settings = get_settings() celery = get_celery("worker") @@ -35,6 +43,10 @@ AA_LOGGER_ID = None 
autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 1}, + soft_time_limit=SINGLE_URL_SOFT_TIME_LIMIT, + time_limit=SINGLE_URL_HARD_TIME_LIMIT, + acks_late=True, + reject_on_worker_lost=True, ) def create_archive_task(self, archive_json: str): global AA_LOGGER_ID @@ -43,6 +55,7 @@ def create_archive_task(self, archive_json: str): # call auto-archiver args = get_orchestrator_args(archive.group_id, False, [archive.url]) result = None + orchestrator = None try: orchestrator = ArchivingOrchestrator() orchestrator.logger_id = AA_LOGGER_ID # ensure single logger @@ -55,6 +68,8 @@ def create_archive_task(self, archive_json: str): except Exception as e: log_error(e, "create_archive_task") raise e + finally: + cleanup_orchestrator(orchestrator) assert result, f"UNABLE TO archive: {archive.url}" # prepare and insert in DB @@ -67,7 +82,14 @@ def create_archive_task(self, archive_json: str): return archive.result -@celery.task(name="create_sheet_task", bind=True) +@celery.task( + name="create_sheet_task", + bind=True, + soft_time_limit=SHEET_SOFT_TIME_LIMIT, + time_limit=SHEET_HARD_TIME_LIMIT, + acks_late=True, + reject_on_worker_lost=True, +) def create_sheet_task(self, sheet_json: str): global AA_LOGGER_ID sheet = schemas.SubmitSheet.model_validate_json(sheet_json) @@ -112,6 +134,8 @@ def create_sheet_task(self, sheet_json: str): except SystemExit as e: log_error(e, "create_sheet_task: SystemExit from AA") + finally: + cleanup_orchestrator(orchestrator) if stats["archived"] > 0: with get_db() as session: @@ -129,6 +153,19 @@ def create_sheet_task(self, sheet_json: str): ).model_dump() +def cleanup_orchestrator(orchestrator): + """ + Clean up orchestrator resources to prevent leaks between tasks. 
+ """ + if orchestrator is None: + return + try: + if hasattr(orchestrator, "extractors"): + orchestrator.cleanup() + except Exception as e: + logger.warning(f"Error cleaning up orchestrator: {e}") + + def get_orchestrator_args( group_id: str, orchestrator_for_sheet: bool, cli_args: list = None ) -> list: diff --git a/docker-compose.yml b/docker-compose.yml index c373f45..b3789b9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,7 +36,11 @@ services: dockerfile: docker/worker/Dockerfile restart: always env_file: .env.prod - command: celery --app=app.worker.main.celery worker -Q high_priority,low_priority --concurrency=${CONCURRENCY} --max-tasks-per-child=100 -O fair + command: celery --app=app.worker.main.celery worker -Q high_priority,low_priority --concurrency=${CONCURRENCY} --max-tasks-per-child=50 -O fair --without-heartbeat --without-mingle + deploy: + resources: + limits: + memory: ${WORKER_MEMORY_LIMIT:-4g} volumes: - ./logs:/aa-api/logs - ./database:/aa-api/database @@ -57,6 +61,7 @@ services: interval: 30s timeout: 10s retries: 3 + start_period: 30s redis: init: true @@ -64,6 +69,10 @@ services: restart: always env_file: .env.prod command: redis-server /conf/redis.conf --requirepass ${REDIS_PASSWORD} + deploy: + resources: + limits: + memory: ${REDIS_MEMORY_LIMIT:-1g} volumes: - ./redis/data:/data - ./redis/config:/conf @@ -72,3 +81,4 @@ services: interval: 30s timeout: 10s retries: 3 + start_period: 10s diff --git a/redis/config/redis.conf b/redis/config/redis.conf index e69de29..9b41275 100644 --- a/redis/config/redis.conf +++ b/redis/config/redis.conf @@ -0,0 +1,17 @@ +# Memory management - prevent Redis from consuming all system memory +# Adjust maxmemory based on your DigitalOcean droplet size +# For a 4GB droplet, 512mb is reasonable; for 8GB, use 1gb (NOTE(review): the 1536mb below matches neither and exceeds the 1g REDIS_MEMORY_LIMIT default in docker-compose.yml - confirm) +maxmemory 1536mb +maxmemory-policy allkeys-lru + +# Persistence - save snapshots periodically +save 900 1 +save 300 10 +save 60 10000 + +# Enable active defragmentation to reclaim fragmented memory 
+activedefrag yes + +# Connection limits +timeout 300 +tcp-keepalive 60