mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-11 04:58:33 +03:00
allows search to happen with API_TOKEN
This commit is contained in:
@@ -2,3 +2,8 @@ DATABASE_PATH="sqlite:///./auto-archiver.db"
|
||||
USER_GROUPS_FILENAME=user-groups.yaml
|
||||
CHROME_APP_IDS=000000000000000000000000000000000000000000000.apps.googleusercontent.com,000000000000000000000000000000000000000000001.apps.googleusercontent.com
|
||||
#ALLOWED_ORIGINS="http://localhost:8004" # dev only
|
||||
|
||||
|
||||
STATIC_FILE="/app/your-file.txt"
|
||||
STATIC_FILE_PASSWORD=TODO
|
||||
API_BEARER_TOKEN=TODO
|
||||
@@ -3,6 +3,8 @@ from sqlalchemy.orm import Session, load_only
|
||||
from sqlalchemy import Column, or_
|
||||
from loguru import logger
|
||||
from datetime import datetime
|
||||
|
||||
from security import ALLOW_ANY_EMAIL
|
||||
from . import models, schemas
|
||||
import yaml, os
|
||||
|
||||
@@ -21,9 +23,13 @@ def get_tasks(db: Session, skip: int = 0, limit: int = 100):
|
||||
|
||||
|
||||
def search_tasks_by_url(db: Session, url: str, email: str, skip: int = 0, limit: int = 100, archived_after: datetime = None, archived_before: datetime = None):
|
||||
email = email.lower()
|
||||
groups = get_user_groups(db, email)
|
||||
query = base_query(db).filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups))).filter(models.Archive.url.like(f'%{url}%'))
|
||||
# searches for partial URLs, if email is * no ownership filtering happens
|
||||
query = base_query(db)
|
||||
if email != ALLOW_ANY_EMAIL:
|
||||
email = email.lower()
|
||||
groups = get_user_groups(db, email)
|
||||
query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups)))
|
||||
query = query.filter(models.Archive.url.like(f'%{url}%'))
|
||||
if archived_after:
|
||||
query = query.filter(models.Archive.created_at >= archived_after)
|
||||
if archived_before:
|
||||
|
||||
@@ -18,7 +18,7 @@ from worker import create_archive_task, create_sheet_task, celery, insert_result
|
||||
from db import crud, models, schemas
|
||||
from db.database import engine, SessionLocal
|
||||
from sqlalchemy.orm import Session
|
||||
from security import get_bearer_auth, get_basic_auth, get_server_auth, bearer_security
|
||||
from security import get_bearer_auth, get_basic_auth, get_server_auth, bearer_security, get_bearer_auth_token_or_jwt
|
||||
from auto_archiver import Metadata
|
||||
|
||||
load_dotenv()
|
||||
@@ -82,8 +82,7 @@ def get_user_groups(db: Session = Depends(get_db), email = Depends(get_bearer_au
|
||||
return crud.get_user_groups(db, email)
|
||||
|
||||
@app.get("/tasks/search-url", response_model=list[schemas.Archive])
|
||||
def search_by_url(url:str, skip: int = 0, limit: int = 100, archived_after:datetime=None, archived_before:datetime=None, db: Session = Depends(get_db), email = Depends(get_bearer_auth)):
|
||||
#TODO: test strip
|
||||
def search_by_url(url:str, skip: int = 0, limit: int = 100, archived_after:datetime=None, archived_before:datetime=None, db: Session = Depends(get_db), email = Depends(get_bearer_auth_token_or_jwt)):
|
||||
return crud.search_tasks_by_url(db, url.strip(), email, skip=skip, limit=limit, archived_after=archived_after, archived_before=archived_before)
|
||||
|
||||
@app.get("/tasks/sync", response_model=list[schemas.Archive])
|
||||
|
||||
@@ -18,8 +18,19 @@ basic_security = HTTPBasic()
|
||||
bearer_security = HTTPBearer()
|
||||
|
||||
# --------------------- Bearer Auth
|
||||
ALLOW_ANY_EMAIL = "*"
|
||||
|
||||
|
||||
API_BEARER_TOKEN = os.environ.get("API_BEARER_TOKEN", "") # min length is 20 chars
|
||||
async def get_bearer_auth_token_or_jwt(credentials: HTTPAuthorizationCredentials = Depends(bearer_security)):
|
||||
# tries to use the static API_KEY and defaults to google JWT auth
|
||||
access_token = credentials.credentials
|
||||
if len(API_BEARER_TOKEN) >= 20:
|
||||
current_token_bytes = access_token.encode("utf8")
|
||||
is_correct_token = secrets.compare_digest(current_token_bytes, API_BEARER_TOKEN.encode("utf8"))
|
||||
if is_correct_token: return ALLOW_ANY_EMAIL # any email works
|
||||
return await get_bearer_auth(credentials)
|
||||
|
||||
async def get_bearer_auth(credentials: HTTPAuthorizationCredentials = Depends(bearer_security)):
|
||||
# validates the Bearer token in the case that it requires it
|
||||
access_token = credentials.credentials
|
||||
|
||||
Reference in New Issue
Block a user