mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-11 04:58:33 +03:00
0.1.5
This commit is contained in:
12
src/main.py
12
src/main.py
@@ -7,7 +7,7 @@ from fastapi.middleware.cors import CORSMiddleware
|
||||
# from fastapi.templating import Jinja2Templates
|
||||
# from pydantic.json import pydantic_encoder
|
||||
from dotenv import load_dotenv
|
||||
import traceback, os, requests
|
||||
import traceback, os, requests, re
|
||||
from loguru import logger
|
||||
|
||||
from worker import create_archive_task, celery
|
||||
@@ -26,7 +26,7 @@ assert len(GOOGLE_CHROME_APP_ID)>10, "GOOGLE_CHROME_APP_ID env variable not set"
|
||||
ALLOWED_EMAILS = set(os.environ.get("ALLOWED_EMAILS", "").split(","))
|
||||
assert len(GOOGLE_CHROME_APP_ID)>=1, "at least one ALLOWED_EMAILS is required from the env variable"
|
||||
ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "chrome-extension://ondkcheoicfckabcnkdgbepofpjmjcmb,chrome-extension://ojcimmjndnlmmlgnjaeojoebaceokpdp").split(",")
|
||||
VERSION = "0.1.4"
|
||||
VERSION = "0.1.5"
|
||||
|
||||
app = FastAPI()
|
||||
app.add_middleware(
|
||||
@@ -119,8 +119,9 @@ def authenticate_user(access_token):
|
||||
j = r.json()
|
||||
if j.get("azp") != GOOGLE_CHROME_APP_ID and j.get("aud")!=GOOGLE_CHROME_APP_ID:
|
||||
return False, f"token does not belong to correct APP_ID"
|
||||
if j.get("email") not in ALLOWED_EMAILS:
|
||||
return False, f"email '{j.get('email')}' not in ALLOWED"
|
||||
# if j.get("email") not in ALLOWED_EMAILS:
|
||||
if not custom_is_email_allowed(j.get("email"), any_bellingcat_email=True):
|
||||
return False, f"email '{j.get('email')}' not allowed"
|
||||
if j.get("email_verified") != "true":
|
||||
return False, f"email '{j.get('email')}' not verified"
|
||||
if int(j.get("expires_in", -1)) <= 0:
|
||||
@@ -130,6 +131,9 @@ def authenticate_user(access_token):
|
||||
logger.warning(f"EXCEPTION occurred: {e}")
|
||||
return False, f"EXCEPTION occurred"
|
||||
|
||||
def custom_is_email_allowed(email, any_bellingcat_email=False):
|
||||
return email in ALLOWED_EMAILS or (any_bellingcat_email and re.match(r'^[\w.]+@bellingcat\.com$', email))
|
||||
|
||||
def validate_user_get_email(access_token):
|
||||
valid_user, info = authenticate_user(access_token)
|
||||
if valid_user != True:
|
||||
|
||||
Reference in New Issue
Block a user