improve observability

This commit is contained in:
msramalho
2024-03-11 14:57:08 +00:00
parent 74cdc1a3e6
commit 17b3a40e4c
2 changed files with 57 additions and 23 deletions

View File

@@ -13,8 +13,10 @@ from datetime import datetime
import sqlalchemy import sqlalchemy
from prometheus_fastapi_instrumentator import Instrumentator from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Counter from prometheus_client import Counter
from contextlib import asynccontextmanager
import asyncio, json
from worker import create_archive_task, create_sheet_task, celery, insert_result_into_db from worker import REDIS_EXCEPTIONS_CHANNEL, create_archive_task, create_sheet_task, celery, insert_result_into_db, Rdis
from db import crud, models, schemas from db import crud, models, schemas
from db.database import engine, SessionLocal from db.database import engine, SessionLocal
@@ -26,12 +28,34 @@ load_dotenv()
# Configuration # Configuration
ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "chrome-extension://ondkcheoicfckabcnkdgbepofpjmjcmb,chrome-extension://ojcimmjndnlmmlgnjaeojoebaceokpdp").split(",") ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "chrome-extension://ondkcheoicfckabcnkdgbepofpjmjcmb,chrome-extension://ojcimmjndnlmmlgnjaeojoebaceokpdp").split(",")
VERSION = "0.6.1" VERSION = "0.6.2"
# min-version refers to the version of auto-archiver-extension on the webstore # min-version refers to the version of auto-archiver-extension on the webstore
BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."} BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."}
app = FastAPI(title="Auto-Archiver API", version=VERSION, contact={"name":"Bellingcat", "url":"https://github.com/bellingcat/auto-archiver-api"}) @repeat_every(seconds=60 * 60) # 1 hour
async def refresh_user_groups():
db: Session = next(get_db())
crud.upsert_user_groups(db)
@asynccontextmanager
async def lifespan(app: FastAPI):
# see https://fastapi.tiangolo.com/advanced/events/#lifespan
# STARTUP
models.Base.metadata.create_all(bind=engine)
alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])
# disabling uvicorn logger since we use loguru in logging_middleware
logging.getLogger("uvicorn.access").disabled = True
asyncio.create_task(redis_subscribe_worker_exceptions())
asyncio.create_task(refresh_user_groups())
yield # separates startup from shutdown instructions
# SHUTDOWN
logger.info("shutting down")
app = FastAPI(title="Auto-Archiver API", version=VERSION, contact={"name":"Bellingcat", "url":"https://github.com/bellingcat/auto-archiver-api"}, lifespan=lifespan)
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=ALLOWED_ORIGINS, allow_origins=ALLOWED_ORIGINS,
@@ -183,18 +207,17 @@ def submit_manual_archive(manual:schemas.SubmitManual, auth = Depends(token_api_
return JSONResponse({"id": archive_id}) return JSONResponse({"id": archive_id})
# on startup WORKER_EXCEPTION = Counter(
@app.on_event("startup") "worker_exceptions_total",
async def on_startup(): "Number of times a certain exception has occurred on the worker.",
# # Not needed if you setup a migration system like Alembic labelnames=("exception", "task",)
# await create_db_and_tables() )
models.Base.metadata.create_all(bind=engine) async def redis_subscribe_worker_exceptions():
alembic.config.main(argv=['--raiseerr', 'upgrade', 'head']) PubSubExceptions = Rdis.pubsub()
# disabling uvicorn logger since we use loguru in logging_middleware PubSubExceptions.subscribe(REDIS_EXCEPTIONS_CHANNEL)
logging.getLogger("uvicorn.access").disabled = True while True:
message = PubSubExceptions.get_message()
@app.on_event("startup") if message and message["type"] == "message":
@repeat_every(seconds=60 * 60) # 1 hour data = json.loads(message["data"].decode("utf-8"))
async def on_startup(): WORKER_EXCEPTION.labels(exception=data["exception"], task=data["task"]).inc()
db: Session = next(get_db()) await asyncio.sleep(0.5)
crud.upsert_user_groups(db)

View File

@@ -12,14 +12,15 @@ from db import crud, schemas, models
from db.database import SessionLocal from db.database import SessionLocal
from contextlib import contextmanager from contextlib import contextmanager
import json import json
import redis
from sqlalchemy import exc from sqlalchemy import exc
celery = Celery(__name__) celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379") celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379") celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")
USER_GROUPS_FILENAME = os.environ.get("USER_GROUPS_FILENAME", "user-groups.yaml") USER_GROUPS_FILENAME = os.environ.get("USER_GROUPS_FILENAME", "user-groups.yaml")
REDIS_EXCEPTIONS_CHANNEL = "exceptions-channel"
Rdis = redis.Redis.from_url(celery.conf.broker_url)
@contextmanager @contextmanager
def get_db(): def get_db():
@@ -55,6 +56,7 @@ def create_archive_task(self, archive_json: str):
# Log it, then raise again to store the error as the task result # Log it, then raise again to store the error as the task result
logger.error(e) logger.error(e)
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
redis_publish_exception(e, self.name)
raise e raise e
return result.to_dict() return result.to_dict()
@@ -87,6 +89,7 @@ def create_sheet_task(self, sheet_json: str):
logger.error(type(e)) logger.error(type(e))
logger.error(e) logger.error(e)
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
redis_publish_exception(e, self.name)
stats["failed"] += 1 stats["failed"] += 1
stats["errors"].append(str(e)) stats["errors"].append(str(e))
@@ -96,12 +99,12 @@ def create_sheet_task(self, sheet_json: str):
@task_failure.connect(sender=create_sheet_task) @task_failure.connect(sender=create_sheet_task)
@task_failure.connect(sender=create_archive_task) @task_failure.connect(sender=create_archive_task)
def task_failure_notifier(sender=None, **kwargs): def task_failure_notifier(sender, **kwargs):
logger.warning("😅 From task_failure_notifier ==> Task failed successfully! ") logger.warning("😅 From task_failure_notifier ==> Task failed successfully! ")
logger.error(kwargs['exception']) logger.error(kwargs['exception'])
logger.error(kwargs['traceback']) logger.error(kwargs['traceback'])
logger.error("\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback'])))) logger.error("\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback']))))
redis_publish_exception(kwargs['exception'], sender.name)
def choose_orchestrator(group, email): def choose_orchestrator(group, email):
global ORCHESTRATORS global ORCHESTRATORS
@@ -200,6 +203,14 @@ def convert_if_media(media):
logger.debug(f"error parsing {media} : {e}") logger.debug(f"error parsing {media} : {e}")
return False return False
def redis_publish_exception(exception, task_name):
try:
Rdis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps({"exception": exception, "task": task_name}, default=str))
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
logger.error(f"Could not publish to {REDIS_EXCEPTIONS_CHANNEL}")
# INIT # INIT
ORCHESTRATORS = {} ORCHESTRATORS = {}