From 906242024448d696a309d1dd357c5dca36c33423 Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Tue, 23 May 2023 20:20:28 +0100 Subject: [PATCH] feat: /sheet endpoint and new security protocol --- README.md | 8 ++- src/Pipfile.lock | 42 ++++++++-------- src/db/crud.py | 14 +++--- src/db/schemas.py | 20 +++----- src/main.py | 63 ++++++++++-------------- src/requirements.txt | 14 +++--- src/security.py | 79 +++++++++--------------------- src/worker.py | 113 ++++++++++++++++++++++++++++--------------- 8 files changed, 172 insertions(+), 181 deletions(-) diff --git a/README.md b/README.md index 377e768..8f8b669 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,13 @@ Copy `.env` and `src/.env` to deployment, along with the contents of `secrets/` Then `docker compose up -d`. #### updating packages/app/access -If pipenv packages are updated: `pipenv lock --requirements -r > requirements.txt` (manually comment line `-i https://pypi.org/simple`) and then `docker compose down` + `docker compose up --build -d` to build images with new packages. +If pipenv packages are updated: `pipenv lock --requirements -r > requirements.txt` (or ` pipenv requirements > requirements.txt` depending on pipenv version) (manually comment line `-i https://pypi.org/simple`) and then `docker compose down` + `docker compose up --build -d` to build images with new packages. New users should be added to the `src/.env` file `ALLOWED_EMAILS` prop + +```bash +# CALL /sheet POST endpoint +curl -XPOST -H "Authorization: Bearer GOOGLE_OAUTH_TOKEN" -H "Content-type: application/json" -d '{"sheet_id": "SHEET_ID", "header": 1}' 'http://localhost:8004/sheet' + +``` \ No newline at end of file diff --git a/src/Pipfile.lock b/src/Pipfile.lock index 36ce21b..079c9d1 100644 --- a/src/Pipfile.lock +++ b/src/Pipfile.lock @@ -197,11 +197,11 @@ }, "auto-archiver": { "hashes": [ - "sha256:1b337b9c9dfb7757fd861b3a06de1c4ad3f2e38c17f702ba2ae80d2c19ce6293", - "sha256:2f8695be28e8c8c84c3e83903b6ab90fbd670c3403ed5a59e581ace8ff7c7a98" + "sha256:40d8a9b3b818805ec53a4c63b87f5222e29a33ff35effda19b416bbdca4d477d", + "sha256:84bed4548694b7e9dffc181ef9f2b1f9184578403a6b491aaf95577c08df9698" ], "index": "pypi", - "version": "==0.5.15" + "version": "==0.5.17" }, "beautifulsoup4": { "hashes": [ @@ -228,19 +228,19 @@ }, "boto3": { "hashes": [ - "sha256:110263d6be92383b9d5de61f4b36f732f0c98080dcfaf0aeb2f53f93b45b030a", - "sha256:82d92428599c107d8cf7c716e4d805193f07585968d261d9751a0e0777d8b178" + "sha256:d47a68a0ca6599e8711c7da670fbac24085d9d50cfb4f761204f154d2b6fae26", + "sha256:f0a78f94a7140b60960898fd86677e4e73cc96bd7f3e5c64fc5cc1818d04c7b8" ], "markers": "python_version >= '3.7'", - "version": "==1.26.136" + "version": "==1.26.138" }, "botocore": { "hashes": [ - "sha256:134c9a84d3ce112fd7ed00b626d6a48f9d232742dffd9ee77f190b53c44bbcc8", - "sha256:1f36cfa1586c9ceeaf2835a886c5272dd81c9467cf1d515f2dc6a82a2a658da0" + "sha256:31edc237088c104f7a05887646bbec31d7459dd2e108fd90cbffa315902817e2", + "sha256:3d145f30d10a9c712acee48e7ce906c9456bb25fe50d477c9312c702ccfa50d1" ], "markers": "python_version >= '3.7'", - "version": "==1.29.136" + "version": "==1.29.138" }, "brotli": { "hashes": [ @@ -846,11 +846,11 @@ }, "httpcore": { "hashes": [ - "sha256:628e768aaeec1f7effdc6408ba1c3cdbd7487c1fc570f7d66844ec4f003e1ca4", - "sha256:caf508597c525f9b8bfff187e270666309f63115af30f7d68b16143a403c8356" + "sha256:125f8375ab60036db632f34f4b627a9ad085048eef7cb7d2616fea0f739f98af", + "sha256:5581b9c12379c4288fe70f43c710d16060c10080617001e6b22a3b6dbcbefd36" ], "markers": "python_version >= '3.7'", - "version": "==0.17.1" + "version": "==0.17.2" }, "httplib2": { "hashes": [ @@ -1616,11 +1616,11 @@ }, "requests": { "hashes": [ - "sha256:10e94cc4f3121ee6da529d358cdaeaff2f1c409cd377dbc72b825852f2f7e294", - "sha256:239d7d4458afcb28a692cdd298d87542235f4ca8d36d03a15bfc128a6559a2f4" + "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f", + "sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1" ], "index": "pypi", - "version": "==2.30.0" + "version": "==2.31.0" }, "requests-oauthlib": { "hashes": [ @@ -1836,11 +1836,11 @@ }, "typing-extensions": { "hashes": [ - "sha256:5cb5f4a79139d699607b3ef622a1dedafa84e115ab0024e0d9c044a9479ca7cb", - "sha256:fb33085c39dd998ac16d1431ebc293a8b3eedd00fd4a32de0ff79002c19511b4" + "sha256:6ad00b63f849b7dcc313b70b6b304ed67b2b2963b3098a33efe18056b1a9a223", + "sha256:ff6b238610c747e44c268aa4bb23c8c735d665a63726df3f9431ce707f2aa768" ], "markers": "python_version >= '3.7'", - "version": "==4.5.0" + "version": "==4.6.0" }, "typing-inspect": { "hashes": [ @@ -1867,11 +1867,11 @@ }, "urllib3": { "hashes": [ - "sha256:8a388717b9476f934a21484e8c8e61875ab60644d29b9b39e11e4b9dc1c6b305", - "sha256:aa751d169e23c7479ce47a0cb0da579e3ede798f994f5816a74e4f4500dcea42" + "sha256:8d36afa7616d8ab714608411b4a3b13e58f463aee519024578e062e141dce20f", + "sha256:8f135f6502756bde6b2a9b28989df5fbe87c9970cecaa69041edcce7f0589b14" ], "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'", - "version": "==1.26.15" + "version": "==1.26.16" }, "uvicorn": { "hashes": [ diff --git a/src/db/crud.py b/src/db/crud.py index f2687a0..35d7abe 100644 --- a/src/db/crud.py +++ b/src/db/crud.py @@ -1,6 +1,6 @@ from functools import cache from sqlalchemy.orm import Session, load_only -from sqlalchemy import Column +from sqlalchemy import Column, or_ from loguru import logger from . import models, schemas import yaml @@ -13,17 +13,17 @@ def get_task(db: Session, task_id: str): def get_tasks(db: Session, skip: int = 0, limit: int = 100): return base_query(db).offset(skip).limit(limit).all() -def search_tasks_by_url(db: Session, url:str, skip: int = 0, limit: int = 100): - return base_query(db).filter(models.Archive.url.like(f'%{url}%')).offset(skip).limit(limit).all() +def search_tasks_by_url(db: Session, url:str, email:str, skip: int = 0, limit: int = 100): + groups = get_user_groups(db, email) + return base_query(db).filter(or_(models.Archive.public==True, models.Archive.author_id==email, models.Archive.group_id.in_(groups))).filter(models.Archive.url.like(f'%{url}%')).offset(skip).limit(limit).all() def search_tasks_by_email(db: Session, email:str, skip: int = 0, limit: int = 100): return base_query(db).filter(models.Archive.author.has(email=email)).offset(skip).limit(limit).all() def create_task(db: Session, task: schemas.ArchiveCreate, tags:list[models.Tag],urls:list[models.ArchiveUrl]): - db_task = models.Archive(id=task.id, url=task.url, author_id=task.author_id, result=task.result, group_id=task.group_id) - logger.debug(tags) - db_task.tags = tags # will this work? TODO: test if I don't call create tag before - db_task.urls = urls # will this work to create ArchiveUrl? TODO: test + db_task = models.Archive(id=task.id, url=task.url, result=task.result, public=task.public, author_id=task.author_id, group_id=task.group_id) + db_task.tags = tags + db_task.urls = urls db.add(db_task) db.commit() db.refresh(db_task) diff --git a/src/db/schemas.py b/src/db/schemas.py index 4a9bdc0..ae32947 100644 --- a/src/db/schemas.py +++ b/src/db/schemas.py @@ -1,6 +1,7 @@ from pydantic import BaseModel from datetime import datetime + class ArchiveCreate(BaseModel): id: str | None = None url: str @@ -8,11 +9,10 @@ class ArchiveCreate(BaseModel): public: bool = True author_id: str | None = None group_id: str | None = None - tags: list = [] + tags: set = set() # urls: list = [] - class Archive(ArchiveCreate): created_at: datetime updated_at: datetime | None @@ -22,16 +22,12 @@ class Archive(ArchiveCreate): orm_mode = True -# class TagCreate(BaseModel): -# id: str - -# class Tag(TagCreate): -# created_at: datetime -# # class Config: -# # orm_mode = True - class SubmitSheet(BaseModel): - sheet_name: str | None= None + sheet_name: str | None = None sheet_id: str | None = None header: int = 1 - \ No newline at end of file + public: bool = False + author_id: str | None = None + group_id: str | None = None + tags: set | None = set() + columns: dict | None = {} # TODO: implement diff --git a/src/main.py b/src/main.py index eb579ab..8b4805c 100644 --- a/src/main.py +++ b/src/main.py @@ -15,17 +15,18 @@ from worker import create_archive_task, create_sheet_task, celery from db import crud, models, schemas from db.database import engine, SessionLocal from sqlalchemy.orm import Session -from security import get_bearer_auth, get_basic_auth, bearer_security, get_bearer_auth_public +from security import get_bearer_auth, get_basic_auth, bearer_security load_dotenv() # Configuration ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "chrome-extension://ondkcheoicfckabcnkdgbepofpjmjcmb,chrome-extension://ojcimmjndnlmmlgnjaeojoebaceokpdp").split(",") -VERSION = "0.4.0" -# min-version refers to the version of auto-archiver-extension on the webstore -BREAKING_CHANGES = {"minVersion": "0.3.0", "message": "The latest update has breaking changes, please update the extension to the most recent version."} +VERSION = "0.5.0" -app = FastAPI() +# min-version refers to the version of auto-archiver-extension on the webstore +BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."} + +app = FastAPI(title="Auto-Archiver API", version=VERSION, contact={"name":"Bellingcat", "url":"https://github.com/bellingcat/auto-archiver-api"}) app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, @@ -39,7 +40,14 @@ def get_db(): session = SessionLocal() try: yield session finally: session.close() - + +# logging configurations +logger.add("logs/api_logs.log", retention="30 days", rotation="3 days") +@app.middleware("http") +async def logging_middleware(request: Request, call_next): + response = await call_next(request) + logger.info(f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - HTTP {response.status_code}") + return response @app.get("/") async def home(request: Request): @@ -53,14 +61,6 @@ async def home(request: Request): except Exception as e: logger.error(e) return JSONResponse(status) -# logging configurations -logger.add("logs/api_logs.log", retention="30 days", rotation="3 days") -@app.middleware("http") -async def logging_middleware(request: Request, call_next): - response = await call_next(request) - logger.info(f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - HTTP {response.status_code}") - return response - # Bearer protected below @@ -69,22 +69,17 @@ def get_user_groups(db: Session = Depends(get_db), email = Depends(get_bearer_au return crud.get_user_groups(db, email) @app.get("/tasks/search-url", response_model=list[schemas.Archive]) -def search(url:str, skip: int = 0, limit: int = 100, db: Session = Depends(get_db), _email = Depends(get_bearer_auth)): - return crud.search_tasks_by_url(db, url, skip=skip, limit=limit) - -# @app.get("/tasks/search", response_model=list[schemas.Task]) -# def search(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), email = Depends(get_bearer_auth)): -# return crud.get_tasks(db, skip=skip, limit=limit) +def search(url:str, skip: int = 0, limit: int = 100, db: Session = Depends(get_db), email = Depends(get_bearer_auth)): + return crud.search_tasks_by_url(db, url, email, skip=skip, limit=limit) @app.get("/tasks/sync", response_model=list[schemas.Archive]) def search(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), email = Depends(get_bearer_auth)): return crud.search_tasks_by_email(db, email, skip=skip, limit=limit) @app.post("/tasks", status_code=201) -def run_task(archive:schemas.ArchiveCreate, email = Depends(get_bearer_auth)): +def archive_sheet(archive:schemas.ArchiveCreate, email = Depends(get_bearer_auth)): archive.author_id = email url = archive.url - logger.warning(archive) logger.info(f"new {archive.public=} task for {email=} and {archive.group_id=}: {url}") if type(url)!=str or len(url)<=5: raise HTTPException(status_code=422, detail=f"Invalid URL received: {url}") @@ -92,22 +87,10 @@ def run_task(archive:schemas.ArchiveCreate, email = Depends(get_bearer_auth)): task = create_archive_task.delay(archive.json()) return JSONResponse({"id": task.id}) -# @app.post("/tasks", status_code=201) -# def run_task(payload = Body(...), email = Depends(get_bearer_auth)): -# url = payload.get('url') -# public = payload.get('public', True) -# group = payload.get('group', None) -# logger.info(f"new {public=} task for {email=} and {group=}: {url}") -# if type(url)!=str or len(url)<=5: -# raise HTTPException(status_code=422, detail=f"Invalid URL received: {url}") -# task = create_archive_task.delay(url=payload.get('url'), email=email, public=public, group=group) -# return JSONResponse({"id": task.id}) - @app.get("/tasks/{task_id}") def get_status(task_id, email = Depends(get_bearer_auth)): logger.info(f"status check for user {email}") task_result = AsyncResult(task_id, app=celery) - logger.info(task_result) result = { "id": task_id, "status": task_result.status, @@ -116,7 +99,10 @@ def get_status(task_id, email = Depends(get_bearer_auth)): try: if task_result.result and "error" in task_result.result: result["status"] = "FAILURE" - except Exception as e: logger.error(traceback.format_exc()) + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + result["status"] = "FAILURE" try: json_result = jsonable_encoder(result, exclude_unset=True) return JSONResponse(json_result) @@ -131,7 +117,7 @@ def get_status(task_id, email = Depends(get_bearer_auth)): @app.delete("/tasks/{task_id}") -def get_status(task_id, db: Session = Depends(get_db), email = Depends(get_bearer_auth)): +def delete_task(task_id, db: Session = Depends(get_db), email = Depends(get_bearer_auth)): logger.info(f"deleting task {task_id} request by {email}") return JSONResponse({ "id": task_id, @@ -139,8 +125,9 @@ def get_status(task_id, db: Session = Depends(get_db), email = Depends(get_beare }) @app.post("/sheet", status_code=201) -def run_task(sheet:schemas.SubmitSheet, email = Depends(get_bearer_auth_public)): - logger.info(f"LAUNCHING SHEET TASK for {email=}") +def archive_sheet(sheet:schemas.SubmitSheet, email = Depends(get_bearer_auth)): + logger.info(f"SHEET TASK for {sheet=}") + sheet.author_id = email if not sheet.sheet_name and not sheet.sheet_id: raise HTTPException(status_code=422, detail=f"sheet name or id is required") task = create_sheet_task.delay(sheet.json()) diff --git a/src/requirements.txt b/src/requirements.txt index cff2614..c815600 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -11,12 +11,12 @@ async-generator==1.10 ; python_version >= '3.5' async-timeout==4.0.2 ; python_version >= '3.6' attrs==23.1.0 ; python_version >= '3.7' authlib==0.15.6 -auto-archiver==0.5.15 +auto-archiver==0.5.17 beautifulsoup4==4.12.2 ; python_full_version >= '3.6.0' billiard==3.6.4.0 blinker==1.6.2 ; python_version >= '3.7' -boto3==1.26.136 ; python_version >= '3.7' -botocore==1.29.136 ; python_version >= '3.7' +boto3==1.26.138 ; python_version >= '3.7' +botocore==1.29.138 ; python_version >= '3.7' brotli==1.0.9 ; platform_python_implementation == 'CPython' bs4==0.0.1 cachetools==5.3.0 ; python_version ~= '3.7' @@ -47,7 +47,7 @@ googleapis-common-protos==1.59.0 ; python_version >= '3.7' greenlet==2.0.2 ; python_version >= '3' and (platform_machine == 'aarch64' or (platform_machine == 'ppc64le' or (platform_machine == 'x86_64' or (platform_machine == 'amd64' or (platform_machine == 'AMD64' or (platform_machine == 'win32' or platform_machine == 'WIN32')))))) gspread==5.9.0 ; python_version not in '3.0, 3.1, 3.2, 3.3' and python_version >= '3.6' h11==0.14.0 ; python_version >= '3.7' -httpcore==0.17.1 ; python_version >= '3.7' +httpcore==0.17.2 ; python_version >= '3.7' httplib2==0.22.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' httpx==0.24.1 ; python_version >= '3.7' humanize==4.6.0 ; python_version >= '3.7' @@ -95,7 +95,7 @@ pytz==2023.3 pyyaml==6.0 ; python_version >= '3.6' redis==3.5.3 regex==2023.5.5 ; python_version >= '3.6' -requests==2.30.0 +requests==2.31.0 requests-oauthlib==1.3.1 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' requests-toolbelt==1.0.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' rich==13.3.5 ; python_full_version >= '3.7.0' @@ -117,11 +117,11 @@ tornado==6.3.2 ; python_version >= '3.5.2' tqdm==4.65.0 ; python_version >= '3.7' trio==0.22.0 ; python_version >= '3.7' trio-websocket==0.10.2 ; python_version >= '3.7' -typing-extensions==4.5.0 ; python_version >= '3.7' +typing-extensions==4.6.0 ; python_version >= '3.7' typing-inspect==0.8.0 tzlocal==5.0.1 ; python_version >= '3.7' uritemplate==4.1.1 ; python_version >= '3.6' -urllib3==1.26.15 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' +urllib3==1.26.16 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' uvicorn==0.22.0 uwsgi==2.0.21 vine==1.3.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' diff --git a/src/security.py b/src/security.py index 2d7bc9d..c24ad00 100644 --- a/src/security.py +++ b/src/security.py @@ -4,21 +4,21 @@ from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBasic, HTTPBasicCredentials, HTTPBearer, HTTPAuthorizationCredentials - # Configuration -GOOGLE_CHROME_APP_ID = os.environ.get("GOOGLE_CHROME_APP_ID") -assert len(GOOGLE_CHROME_APP_ID)>10, "GOOGLE_CHROME_APP_ID env variable not set" -GOOGLE_CHROME_APP_ID_PUBLIC = os.environ.get("GOOGLE_CHROME_APP_ID_PUBLIC") -assert len(GOOGLE_CHROME_APP_ID_PUBLIC)>10, "GOOGLE_CHROME_APP_ID_PUBLIC env variable not set" -logger.info(f"{GOOGLE_CHROME_APP_ID_PUBLIC=}") -ALLOWED_EMAILS = set([e.strip().lower() for e in os.environ.get("ALLOWED_EMAILS", "").split(",")]) -assert len(ALLOWED_EMAILS)>=1, "at least one ALLOWED_EMAILS is required from the env variable" -logger.info(f"{len(ALLOWED_EMAILS)=}") +CHROME_APP_IDS = set([app_id.strip() for app_id in os.environ.get("CHROME_APP_IDS", "").split(",")]) +assert len(CHROME_APP_IDS) > 0, "CHROME_APP_IDS env variable not properly set, it's a csv" +for app_id in CHROME_APP_IDS: + assert len(app_id) > 10, f"CHROME_APP_IDS got invalid id: {app_id} env variable not set" +logger.info(f"{CHROME_APP_IDS=}") + +BLOCKED_EMAILS = set([e.strip().lower() for e in os.environ.get("BLOCKED_EMAILS", "").split(",")]) +logger.info(f"{len(BLOCKED_EMAILS)=}") basic_security = HTTPBasic() bearer_security = HTTPBearer() -#--------------------- Bearer Auth +# --------------------- Bearer Auth + async def get_bearer_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer_security)): # validates the Bearer token in the case that it requires it @@ -31,20 +31,20 @@ async def get_bearer_auth(credentials: HTTPAuthorizationCredentials = Depends(be detail=info, headers={"WWW-Authenticate": "Bearer"}, ) - + + def authenticate_user(access_token): # https://cloud.google.com/docs/authentication/token-types#access - if type(access_token)!=str or len(access_token)<10: return False, "invalid access_token" - r = requests.get("https://oauth2.googleapis.com/tokeninfo", {"access_token":access_token}) - if r.status_code!=200: return False, "error occurred" + if type(access_token) != str or len(access_token) < 10: return False, "invalid access_token" + r = requests.get("https://oauth2.googleapis.com/tokeninfo", {"access_token": access_token}) + if r.status_code != 200: return False, "error occurred" try: j = r.json() - if j.get("azp") != GOOGLE_CHROME_APP_ID and j.get("aud")!=GOOGLE_CHROME_APP_ID: - return False, f"token does not belong to correct APP_ID" - # if j.get("email") not in ALLOWED_EMAILS: - if not custom_is_email_allowed(j.get("email"), any_bellingcat_email=True): + if j.get("azp") not in CHROME_APP_IDS and j.get("aud") not in CHROME_APP_IDS: + return False, f"token does not belong to valid APP_ID" + if j.get("email") in BLOCKED_EMAILS: return False, f"email '{j.get('email')}' not allowed" - if j.get("email_verified") != "true": + if j.get("email_verified") != "true": return False, f"email '{j.get('email')}' not verified" if int(j.get("expires_in", -1)) <= 0: return False, "Token expired" @@ -53,44 +53,11 @@ def authenticate_user(access_token): logger.warning(f"EXCEPTION occurred: {e}") return False, f"EXCEPTION occurred" -def custom_is_email_allowed(email, any_bellingcat_email=False): - return email.lower() in ALLOWED_EMAILS or (any_bellingcat_email and re.match(r'^[\w.]+@bellingcat\.com$', email)) + +# --------------------- Basic Auth +SFP = os.environ.get("STATIC_FILE_PASSWORD", "") # min length is 20 chars -#--------------------- Bearer Auth ANY EMAIL - -async def get_bearer_auth_public(credentials: HTTPAuthorizationCredentials = Depends(bearer_security)): - # validates the Bearer token in the case that it requires it - access_token = credentials.credentials - valid_user, info = authenticate_user_public(access_token) - if valid_user: return info - logger.debug(f"TOKEN FAILURE: {valid_user=} {info=}") - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail=info, - headers={"WWW-Authenticate": "Bearer"}, - ) - -def authenticate_user_public(access_token): - # https://cloud.google.com/docs/authentication/token-types#access - if type(access_token)!=str or len(access_token)<10: return False, "invalid access_token" - r = requests.get("https://oauth2.googleapis.com/tokeninfo", {"access_token":access_token}) - if r.status_code!=200: return False, "error occurred" - try: - j = r.json() - if j.get("azp") != GOOGLE_CHROME_APP_ID_PUBLIC and j.get("aud")!=GOOGLE_CHROME_APP_ID_PUBLIC: - return False, f"token does not belong to correct APP_ID" - if j.get("email_verified") != "true": - return False, f"email '{j.get('email')}' not verified" - if int(j.get("expires_in", -1)) <= 0: - return False, "Token expired" - return True, j.get('email') - except Exception as e: - logger.warning(f"EXCEPTION occurred: {e}") - return False, f"EXCEPTION occurred" - -#--------------------- Basic Auth -SFP = os.environ.get("STATIC_FILE_PASSWORD", "") # min length is 20 chars async def get_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_security)): # validates that the Basic token in the case that it requires it assert len(SFP) >= 20, "Invalid STATIC_FILE_PASSWORD, must be at least 20 chars" @@ -101,4 +68,4 @@ async def get_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_secur status_code=status.HTTP_401_UNAUTHORIZED, detail="Wrong static file access credentials", headers={"WWW-Authenticate": "Basic"} - ) \ No newline at end of file + ) diff --git a/src/worker.py b/src/worker.py index 7980da0..474716c 100644 --- a/src/worker.py +++ b/src/worker.py @@ -1,22 +1,21 @@ -import os, re, traceback, yaml, datetime +import os, traceback, yaml, datetime +from typing import List, Set -from celery import Celery, states -from celery.exceptions import Ignore +from celery import Celery from celery.signals import task_failure from auto_archiver import Config, ArchivingOrchestrator, Metadata -# from auto_archiver.enrichers import ScreenshotEnricher from loguru import logger from db import crud, schemas, models -from db.database import engine, SessionLocal +from db.database import SessionLocal from contextlib import contextmanager import json celery = Celery(__name__) celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379") celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379") -USER_GROUPS_FILENAME=os.environ.get("USER_GROUPS_FILENAME", "user-groups.yaml") +USER_GROUPS_FILENAME = os.environ.get("USER_GROUPS_FILENAME", "user-groups.yaml") @contextmanager @@ -25,51 +24,56 @@ def get_db(): try: yield session finally: session.close() -@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5}) + +@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5}) def create_archive_task(self, archive_json: str): - archive = schemas.ArchiveCreate.parse_raw(archive_json) - if not archive.public and archive.group_id and len(archive.group_id) > 0: - # ensure group is valid for user - with get_db() as session: - db_group = crud.get_group_for_user(session, archive.group_id, archive.author_id) - if not db_group: - logger.error(em := f"User {archive.author_id} is not part of {archive.group_id}, no permission") - return {"error": em} + + if (em := is_group_invalid_for_user(archive.public, archive.group_id, archive.author_id)): return {"error": em} url = archive.url - logger.info(f"{url=}") - logger.info(f"{archive=}") + logger.info(f"{url=} {archive=}") orchestrator = choose_orchestrator(archive.group_id, archive.author_id) result = orchestrator.feed_item(Metadata().set_url(url)) - if not result: - logger.error(f"UNABLE TO archive: {url}") - return {"error": "unable to archive"} - result_json = result.to_json() - with get_db() as session: - # create DB URLs - db_urls = [models.ArchiveUrl(url=url, key=m.get("id", f"media_{i}")) for i, m in enumerate(result.media) for url in m.urls] - # create DB TAGs if needed - db_tags = [crud.create_tag(session, tag) for tag in archive.tags] - # insert archive - db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=self.request.id, url=url, result=json.loads(result_json), public=archive.public, author_id=archive.author_id, group_id=archive.group_id), tags=db_tags, urls=db_urls) - logger.debug(f"Added {db_task.id=} to database on {db_task.created_at}") - return result_json + try: + insert_result_into_db(result, archive.tags, archive.public, archive.group_id, archive.author_id, self.request.id) + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + return {"error": e} + return result.to_json() -@celery.task(name="create_sheet_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0}) +@celery.task(name="create_sheet_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0}) def create_sheet_task(self, sheet_json: str): - logger.info(f"STARTING {sheet_json}") sheet = schemas.SubmitSheet.parse_raw(sheet_json) + sheet.tags.add("gsheet") + logger.info(f"SHEET START {sheet=}") + + if (em := is_group_invalid_for_user(sheet.public, sheet.group_id, sheet.author_id)): return {"error": em} + config = Config() + #TODO: use choose_orchestrator and overwrite the feeder config.parse(use_cli=False, yaml_config_filename="secrets/orchestration-sheet.yaml", overwrite_configs={"configurations": {"gsheet_feeder": {"sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "header": sheet.header}}}) orchestrator = ArchivingOrchestrator(config) - # TODO: save into local DB - orchestrator.feed() - return {"success": True, "sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "time": datetime.datetime.now().isoformat()} + stats = {"archived": 0, "failed": 0, "errors": []} + for result in orchestrator.feed(): + try: + insert_result_into_db(result, sheet.tags, sheet.public, sheet.group_id, sheet.author_id, models.generate_uuid()) + stats["archived"]+=1 + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + stats["failed"]+=1 + stats["errors"].append(e) + + logger.info(f"SHEET DONE {sheet=}") + return {"success": True, "sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "time": datetime.datetime.now().isoformat(), **stats} + +@task_failure.connect(sender=create_sheet_task) @task_failure.connect(sender=create_archive_task) def task_failure_notifier(sender=None, **kwargs): logger.warning("😅 From task_failure_notifier ==> Task failed successfully! ") @@ -77,6 +81,7 @@ def task_failure_notifier(sender=None, **kwargs): logger.error(kwargs['traceback']) logger.error("\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback'])))) + def choose_orchestrator(group, email): global ORCHESTRATORS if group not in ORCHESTRATORS: group = get_user_first_group(email) @@ -84,6 +89,7 @@ def choose_orchestrator(group, email): logger.info(f"CHOOSE Orchestrator for {group=}, {email=}") return ArchivingOrchestrator(ORCHESTRATORS.get(group)) + def read_user_groups(): # read yaml safely with open(USER_GROUPS_FILENAME) as inf: @@ -93,6 +99,7 @@ def read_user_groups(): logger.error(f"could not open user groups filename {USER_GROUPS_FILENAME}: {e}") raise e + def get_user_first_group(email): user_groups_yaml = read_user_groups() groups = user_groups_yaml.get("users", {}).get(email, []) @@ -107,12 +114,12 @@ def load_orchestrators(): reads the orchestrators key in the config file to load different orchestrators for different groups """ user_groups_yaml = read_user_groups() - + orchestrators_config = user_groups_yaml.get("orchestrators", {}) assert len(orchestrators_config), f"No orchestrators key found in {USER_GROUPS_FILENAME}. please see the example file" assert "default" in orchestrators_config, "please include a 'default' orchestrator to be used when the user has no group" logger.debug(f"Found {len(orchestrators_config)} group orchestrators.") - + for group, config_filename in orchestrators_config.items(): config = Config() config.parse(use_cli=False, yaml_config_filename=config_filename) @@ -120,7 +127,35 @@ def load_orchestrators(): return ORCHESTRATORS -## INIT +def is_group_invalid_for_user(public: bool, group_id: str, author_id: str): + """ + ensures that, if a group is specified, the user belongs to it. + if public is true the requirement is not needed + returns an error message if invalid, or False if all is good. + """ + if not public and group_id and len(group_id) > 0: + # ensure group is valid for user + with get_db() as session: + db_group = crud.get_group_for_user(session, group_id, author_id) + if not db_group: + logger.error(em := f"User {author_id} is not part of {group_id}, no permission") + return em + return False + +def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_id: str, author_id: str, task_id:str): + logger.info(f"INSERTING {public=} {result} into {task_id}") + assert result, "UNABLE TO archive: {url}" + with get_db() as session: + # create DB URLs + db_urls = [models.ArchiveUrl(url=url, key=m.get("id", f"media_{i}")) for i, m in enumerate(result.media) for url in m.urls] + # create DB TAGs if needed + db_tags = [crud.create_tag(session, tag) for tag in tags] + # insert archive + db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=task_id, url=result.get_url(), result=json.loads(result.to_json()), public=public, author_id=author_id, group_id=group_id), tags=db_tags, urls=db_urls) + logger.debug(f"Added {db_task.id=} to database on {db_task.created_at}") + + +# INIT ORCHESTRATORS = {} -load_orchestrators() \ No newline at end of file +load_orchestrators()