mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-08 03:28:35 +03:00
Merge branch 'main' into dev
This commit is contained in:
@@ -34,6 +34,8 @@ class Settings(BaseSettings):
|
||||
Set[Annotated[str, Len(min_length=10)]], Len(min_length=1)
|
||||
]
|
||||
BLOCKED_EMAILS: Annotated[Set[str], Len(min_length=0)] = set()
|
||||
# if not provided only OAUTH access_tokens are allowed
|
||||
FIREBASE_SERVICE_ACCOUNT_JSON: str = ""
|
||||
|
||||
# redis
|
||||
REDIS_PASSWORD: str = ""
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from http import HTTPStatus
|
||||
from unittest import mock
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
@@ -165,6 +166,46 @@ async def test_authenticate_user():
|
||||
assert mock_get.call_count == 9
|
||||
|
||||
|
||||
@mock.patch("app.web.security.FIREBASE_OAUTH_ENABLED", True)
|
||||
@mock.patch("app.web.security.firebase_admin.initialize_app")
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_with_id_token(m_init):
|
||||
from firebase_admin import exceptions
|
||||
|
||||
from app.web.security import authenticate_user
|
||||
|
||||
with pytest.raises(ValueError) as e:
|
||||
authenticate_user("test")
|
||||
assert "The default Firebase app does not exist." in str(e.value)
|
||||
|
||||
with patch("app.web.security.auth.verify_id_token") as mock_verify:
|
||||
# missing email
|
||||
mock_verify.return_value = {"email": None}
|
||||
assert authenticate_user("fake_token") == (
|
||||
False,
|
||||
"email not found in token",
|
||||
)
|
||||
assert mock_verify.call_count == 1
|
||||
|
||||
# blocked email
|
||||
mock_verify.return_value = {
|
||||
"email": "blocked@example.com",
|
||||
}
|
||||
assert authenticate_user("fake_token") == (
|
||||
False,
|
||||
"email 'blocked@example.com' not allowed",
|
||||
)
|
||||
assert mock_verify.call_count == 2
|
||||
|
||||
# valid email
|
||||
mock_verify.return_value = {"email": "rick@example.com"}
|
||||
assert authenticate_user("fake_token") == (True, "rick@example.com")
|
||||
assert mock_verify.call_count == 3
|
||||
|
||||
mock_verify.side_effect = exceptions.FirebaseError(2, "mocked error")
|
||||
assert authenticate_user("fake_token") == (False, "invalid token")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_exception():
|
||||
with patch("app.web.security.requests.get") as mock_get:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
VERSION = "0.9.2"
|
||||
VERSION = "0.9.4"
|
||||
|
||||
API_DESCRIPTION = """
|
||||
#### API for the Auto-Archiver project, a tool to archive web pages and Google Sheets.
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import secrets
|
||||
from http import HTTPStatus
|
||||
|
||||
import firebase_admin
|
||||
import requests
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
from firebase_admin import auth, credentials, exceptions
|
||||
from loguru import logger
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
@@ -16,6 +18,13 @@ from app.web.db.user_state import UserState
|
||||
settings = get_settings()
|
||||
bearer_security = HTTPBearer()
|
||||
|
||||
FIREBASE_OAUTH_ENABLED = settings.FIREBASE_SERVICE_ACCOUNT_JSON != ""
|
||||
if FIREBASE_OAUTH_ENABLED:
|
||||
logger.debug("Firebase OAUTH enabled, initializing...")
|
||||
firebase_admin.initialize_app(
|
||||
credentials.Certificate(settings.FIREBASE_SERVICE_ACCOUNT_JSON)
|
||||
)
|
||||
|
||||
|
||||
def secure_compare(token, api_key) -> bool:
|
||||
return secrets.compare_digest(token.encode("utf8"), api_key.encode("utf8"))
|
||||
@@ -72,6 +81,12 @@ async def get_user_auth(
|
||||
|
||||
|
||||
def authenticate_user(access_token) -> (bool, str):
|
||||
if FIREBASE_OAUTH_ENABLED:
|
||||
try:
|
||||
return firebase_login_attempt(access_token)
|
||||
except exceptions.FirebaseError as e:
|
||||
logger.warning(f"Error verifying ID token: {str(e)[:80]}...")
|
||||
|
||||
# https://cloud.google.com/docs/authentication/token-types#access
|
||||
if not isinstance(access_token, str) or len(access_token) < 10:
|
||||
return False, "invalid access_token"
|
||||
@@ -100,6 +115,17 @@ def authenticate_user(access_token) -> (bool, str):
|
||||
return False, "exception occurred"
|
||||
|
||||
|
||||
def firebase_login_attempt(access_token) -> (bool, str):
|
||||
j = auth.verify_id_token(access_token)
|
||||
email = j.get("email", None)
|
||||
logger.debug(f"Successfully verified the ID token for {email}")
|
||||
if email is None:
|
||||
return False, "email not found in token"
|
||||
if email in settings.BLOCKED_EMAILS:
|
||||
return False, f"email '{email}' not allowed"
|
||||
return True, email
|
||||
|
||||
|
||||
def get_user_state(
|
||||
email: str = Depends(get_user_auth),
|
||||
db: Session = Depends(get_db_dependency),
|
||||
|
||||
Reference in New Issue
Block a user