Export worker exceptions to Prometheus through a Redis pub/sub channel.

The worker publishes {"exception", "task"} on REDIS_EXCEPTIONS_CHANNEL; the API
process subscribes and increments the worker_exceptions_total counter. Startup
logic moves from @app.on_event("startup") to a FastAPI lifespan handler.

Review notes for this revision of the patch:
- Line breaks and indentation were rebuilt from a whitespace-collapsed copy.
  Context-line whitespace is a best guess: apply with
  `git apply --ignore-whitespace` and re-check the result. The post-image ids
  on the `index` lines predate the changes listed below.
- create_archive_task no longer publishes before re-raising: the re-raise
  triggers task_failure_notifier, which already publishes, so every failure
  was counted twice.
- The published "exception" value is the exception class name, not str(e),
  so the Prometheus label stays low-cardinality.
- lifespan keeps references to its background tasks and cancels them on
  shutdown.
- The subscriber logs and survives a malformed message or Redis error.

diff --git a/src/main.py b/src/main.py
index e82f041..b4d92ae 100644
--- a/src/main.py
+++ b/src/main.py
@@ -13,8 +13,11 @@ from datetime import datetime
 import sqlalchemy
 from prometheus_fastapi_instrumentator import Instrumentator
 from prometheus_client import Counter
+from contextlib import asynccontextmanager
+import asyncio
+import json
 
-from worker import create_archive_task, create_sheet_task, celery, insert_result_into_db
+from worker import REDIS_EXCEPTIONS_CHANNEL, create_archive_task, create_sheet_task, celery, insert_result_into_db, Rdis
 from db import crud, models, schemas
 from db.database import engine, SessionLocal
 
@@ -26,12 +29,42 @@ load_dotenv()
 
 # Configuration
 ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "chrome-extension://ondkcheoicfckabcnkdgbepofpjmjcmb,chrome-extension://ojcimmjndnlmmlgnjaeojoebaceokpdp").split(",")
-VERSION = "0.6.1"
-
+VERSION = "0.6.2"
 # min-version refers to the version of auto-archiver-extension on the webstore
 BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."}
 
-app = FastAPI(title="Auto-Archiver API", version=VERSION, contact={"name":"Bellingcat", "url":"https://github.com/bellingcat/auto-archiver-api"})
+@repeat_every(seconds=60 * 60)  # 1 hour
+async def refresh_user_groups():
+    """Upsert the user groups through crud.upsert_user_groups, repeated hourly."""
+    db: Session = next(get_db())
+    crud.upsert_user_groups(db)
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    """Run the startup steps before `yield` and the shutdown steps after it."""
+    # see https://fastapi.tiangolo.com/advanced/events/#lifespan
+    # STARTUP
+
+    models.Base.metadata.create_all(bind=engine)
+    alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])
+    # disabling uvicorn logger since we use loguru in logging_middleware
+    logging.getLogger("uvicorn.access").disabled = True
+    # the event loop only keeps weak references to tasks: hold strong ones
+    # here so the background tasks cannot be garbage collected mid-execution
+    background_tasks = [
+        asyncio.create_task(redis_subscribe_worker_exceptions()),
+        asyncio.create_task(refresh_user_groups()),
+    ]
+
+    yield  # separates startup from shutdown instructions
+
+    # SHUTDOWN
+    logger.info("shutting down")
+    for task in background_tasks:
+        task.cancel()
+
+
+app = FastAPI(title="Auto-Archiver API", version=VERSION, contact={"name":"Bellingcat", "url":"https://github.com/bellingcat/auto-archiver-api"}, lifespan=lifespan)
 app.add_middleware(
     CORSMiddleware,
     allow_origins=ALLOWED_ORIGINS,
@@ -183,18 +216,28 @@ def submit_manual_archive(manual:schemas.SubmitManual, auth = Depends(token_api_
     return JSONResponse({"id": archive_id})
 
 
-# on startup
-@app.on_event("startup")
-async def on_startup():
-#     # Not needed if you setup a migration system like Alembic
-#     await create_db_and_tables()
-    models.Base.metadata.create_all(bind=engine)
-    alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])
-    # disabling uvicorn logger since we use loguru in logging_middleware
-    logging.getLogger("uvicorn.access").disabled = True
-
-@app.on_event("startup")
-@repeat_every(seconds=60 * 60) # 1 hour
-async def on_startup():
-    db: Session = next(get_db())
-    crud.upsert_user_groups(db)
+WORKER_EXCEPTION = Counter(
+    "worker_exceptions_total",
+    "Number of times a certain exception has occurred on the worker.",
+    labelnames=("exception", "task",)
+)
+
+
+async def redis_subscribe_worker_exceptions():
+    """Count worker exceptions published on REDIS_EXCEPTIONS_CHANNEL.
+
+    Polls the Redis pub/sub channel every 0.5 seconds and increments
+    WORKER_EXCEPTION for each message, labelled by exception and task.
+    """
+    pubsub = Rdis.pubsub()
+    pubsub.subscribe(REDIS_EXCEPTIONS_CHANNEL)
+    while True:
+        try:
+            message = pubsub.get_message()
+            if message and message["type"] == "message":
+                data = json.loads(message["data"].decode("utf-8"))
+                WORKER_EXCEPTION.labels(exception=data["exception"], task=data["task"]).inc()
+        except Exception as e:
+            # a malformed payload or a Redis error must not end the subscriber
+            logger.error(f"Could not process message from {REDIS_EXCEPTIONS_CHANNEL}: {e}")
+        await asyncio.sleep(0.5)
diff --git a/src/worker.py b/src/worker.py
index c4ae172..44c61b0 100644
--- a/src/worker.py
+++ b/src/worker.py
@@ -12,14 +12,15 @@ from db import crud, schemas, models
 from db.database import SessionLocal
 from contextlib import contextmanager
 import json
-
+import redis
 from sqlalchemy import exc
 
 celery = Celery(__name__)
 celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
 celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")
 USER_GROUPS_FILENAME = os.environ.get("USER_GROUPS_FILENAME", "user-groups.yaml")
-
+REDIS_EXCEPTIONS_CHANNEL = "exceptions-channel"
+Rdis = redis.Redis.from_url(celery.conf.broker_url)
 
 @contextmanager
 def get_db():
@@ -87,6 +88,7 @@ def create_sheet_task(self, sheet_json: str):
             logger.error(type(e))
             logger.error(e)
             logger.error(traceback.format_exc())
+            redis_publish_exception(e, self.name)
             stats["failed"] += 1
             stats["errors"].append(str(e))
 
@@ -96,12 +98,12 @@ def create_sheet_task(self, sheet_json: str):
 
 @task_failure.connect(sender=create_sheet_task)
 @task_failure.connect(sender=create_archive_task)
-def task_failure_notifier(sender=None, **kwargs):
+def task_failure_notifier(sender, **kwargs):
     logger.warning("😅 From task_failure_notifier ==> Task failed successfully! ")
     logger.error(kwargs['exception'])
     logger.error(kwargs['traceback'])
     logger.error("\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback']))))
-
+    redis_publish_exception(kwargs['exception'], sender.name)
 
 def choose_orchestrator(group, email):
     global ORCHESTRATORS
@@ -200,6 +202,21 @@ def convert_if_media(media):
             logger.debug(f"error parsing {media} : {e}")
     return False
 
+def redis_publish_exception(exception, task_name):
+    """Publish a worker exception on REDIS_EXCEPTIONS_CHANNEL, best-effort.
+
+    The payload carries the exception class name rather than its message, so
+    a metric label built from it stays low-cardinality. A failure to publish
+    is logged and never raised to the caller.
+    """
+    try:
+        payload = {"exception": type(exception).__name__, "task": task_name}
+        Rdis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps(payload, default=str))
+    except Exception as e:
+        logger.error(e)
+        logger.error(traceback.format_exc())
+        logger.error(f"Could not publish to {REDIS_EXCEPTIONS_CHANNEL}")
+
 # INIT
 ORCHESTRATORS = {}
 