mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-11 21:18:35 +03:00
refactor shared setting to avoid circular dependency
This commit is contained in:
@@ -7,4 +7,7 @@ API_DESCRIPTION = """
|
||||
- You can use this API to archive single URLs or entire Google Sheets.
|
||||
- Once you submit a URL or Sheet for archiving, the API will return a task_id that you can use to check the status of the archiving process. It works asynchronously.
|
||||
"""
|
||||
BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."}
|
||||
BREAKING_CHANGES = {"minVersion": "0.3.1", "message": "The latest update has breaking changes, please update the extension to the most recent version."}
|
||||
|
||||
# changing this will corrupt the database logic
|
||||
ALLOW_ANY_EMAIL = "*"
|
||||
|
||||
@@ -4,7 +4,7 @@ from sqlalchemy import Column, or_, func
|
||||
from loguru import logger
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from web.security import ALLOW_ANY_EMAIL
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
from shared.settings import get_settings
|
||||
from . import models, schemas
|
||||
import yaml
|
||||
@@ -158,7 +158,6 @@ def upsert_user_groups(db: Session):
|
||||
try:
|
||||
with open(filename) as inf:
|
||||
user_groups_yaml = yaml.safe_load(inf)
|
||||
logger.error(user_groups_yaml)
|
||||
except Exception as e:
|
||||
logger.error(f"could not open user groups filename {filename}: {e}")
|
||||
raise e
|
||||
|
||||
@@ -4,7 +4,8 @@ from fastapi.responses import JSONResponse
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from web.security import ALLOW_ANY_EMAIL, get_token_or_user_auth
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
from web.security import get_token_or_user_auth
|
||||
from db import schemas
|
||||
from worker import create_sheet_task
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ def test_data(db_session):
|
||||
|
||||
def test_get_archive(test_data, db_session):
|
||||
from db import crud
|
||||
from web.security import ALLOW_ANY_EMAIL
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
|
||||
print(db_session.query(models.Group).all())
|
||||
|
||||
@@ -88,7 +88,7 @@ def test_get_archive(test_data, db_session):
|
||||
|
||||
def test_search_archives_by_url(test_data, db_session):
|
||||
from db import crud
|
||||
from web.security import ALLOW_ANY_EMAIL
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
|
||||
# rick's archives are private
|
||||
assert len(crud.search_archives_by_url(db_session, "https://example-0.com", "rick@example.com")) == 34
|
||||
@@ -133,7 +133,7 @@ def test_search_archives_by_url(test_data, db_session):
|
||||
|
||||
|
||||
def test_search_archives_by_email(test_data, db_session):
|
||||
from web.security import ALLOW_ANY_EMAIL
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
from db import crud
|
||||
|
||||
# lower/upper case
|
||||
@@ -157,7 +157,7 @@ def test_search_archives_by_email(test_data, db_session):
|
||||
@patch("db.crud.DATABASE_QUERY_LIMIT", new=25)
|
||||
def test_max_query_limit(test_data, db_session):
|
||||
from db import crud
|
||||
from web.security import ALLOW_ANY_EMAIL
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
|
||||
assert len(crud.search_archives_by_url(db_session, "https://example", ALLOW_ANY_EMAIL)) == 25
|
||||
assert len(crud.search_archives_by_url(db_session, "https://example", ALLOW_ANY_EMAIL, limit=1000)) == 25
|
||||
@@ -289,7 +289,7 @@ def test_create_tag(db_session):
|
||||
|
||||
def test_is_user_in_group(test_data, db_session):
|
||||
from db import crud
|
||||
from web.security import ALLOW_ANY_EMAIL
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
|
||||
# see user-groups.test.yaml
|
||||
test_pairs = [
|
||||
|
||||
@@ -4,6 +4,8 @@ from fastapi import HTTPException
|
||||
from fastapi.security import HTTPAuthorizationCredentials
|
||||
import pytest
|
||||
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
|
||||
|
||||
def test_secure_compare():
|
||||
from web.security import secure_compare
|
||||
@@ -14,7 +16,7 @@ def test_secure_compare():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_token_or_user_auth_with_api():
|
||||
from web.security import get_token_or_user_auth, ALLOW_ANY_EMAIL
|
||||
from web.security import get_token_or_user_auth
|
||||
mock_api = HTTPAuthorizationCredentials(scheme="lorem", credentials="this_is_the_test_api_token")
|
||||
assert await get_token_or_user_auth(mock_api) == ALLOW_ANY_EMAIL
|
||||
|
||||
|
||||
@@ -2,10 +2,9 @@ from loguru import logger
|
||||
import requests, secrets
|
||||
from fastapi import HTTPException, status, Depends
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from core.config import ALLOW_ANY_EMAIL
|
||||
from shared.settings import get_settings
|
||||
|
||||
ALLOW_ANY_EMAIL = "*"
|
||||
|
||||
settings = get_settings()
|
||||
bearer_security = HTTPBearer()
|
||||
|
||||
@@ -63,7 +62,7 @@ def authenticate_user(access_token):
|
||||
if r.status_code != 200: return False, "invalid token"
|
||||
try:
|
||||
j = r.json()
|
||||
if j.get("azp") not in settings.CHROME_APP_IDS and j.get("aud") not in settings.CHROME_APP_IDS:
|
||||
if j.get("azp") not in settings.CHROME_APP_IDS and j.get("aud") not in seuser_grouttings.CHROME_APP_IDS:
|
||||
return False, f"token does not belong to valid APP_ID"
|
||||
if j.get("email") in settings.BLOCKED_EMAILS:
|
||||
return False, f"email '{j.get('email')}' not allowed"
|
||||
|
||||
Reference in New Issue
Block a user