mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-11 21:18:35 +03:00
refactor to use pydantic settings and WAL sqlite mode
This commit is contained in:
@@ -5,12 +5,13 @@ from loguru import logger
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from security import ALLOW_ANY_EMAIL
|
||||
from shared.settings import Settings
|
||||
from . import models, schemas
|
||||
import yaml, os
|
||||
import yaml
|
||||
|
||||
DOMAIN_GROUPS = {}
|
||||
DOMAIN_GROUPS_LOADED = False
|
||||
MAX_LIMIT = 100
|
||||
DATABASE_QUERY_LIMIT = Settings().DATABASE_QUERY_LIMIT
|
||||
|
||||
# --------------- TASK = Archive
|
||||
|
||||
@@ -39,12 +40,12 @@ def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, lim
|
||||
query = query.filter(models.Archive.created_at >= archived_after)
|
||||
if archived_before:
|
||||
query = query.filter(models.Archive.created_at <= archived_before)
|
||||
return query.order_by(models.Archive.created_at.desc()).offset(skip).limit(min(limit, MAX_LIMIT)).all()
|
||||
return query.order_by(models.Archive.created_at.desc()).offset(skip).limit(min(limit, DATABASE_QUERY_LIMIT)).all()
|
||||
|
||||
|
||||
def search_archives_by_email(db: Session, email: str, skip: int = 0, limit: int = 100):
|
||||
email = email.lower()
|
||||
return base_query(db).filter(models.Archive.author.has(email=email)).offset(skip).limit(min(limit, MAX_LIMIT)).all()
|
||||
return base_query(db).filter(models.Archive.author.has(email=email)).offset(skip).limit(min(limit, DATABASE_QUERY_LIMIT)).all()
|
||||
|
||||
|
||||
def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]):
|
||||
@@ -76,7 +77,7 @@ def count_by_user_since(db:Session, seconds_delta: int = 15):
|
||||
return db.query(models.Archive.author_id,func.count().label('total'))\
|
||||
.filter(models.Archive.created_at >= time_threshold)\
|
||||
.group_by(models.Archive.author_id)\
|
||||
.order_by(func.count().desc()).limit(5 * MAX_LIMIT).all()
|
||||
.order_by(func.count().desc()).limit(5 * DATABASE_QUERY_LIMIT).all()
|
||||
|
||||
def base_query(db: Session):
|
||||
# allow only some fields to be returned, for example author should remain hidden
|
||||
@@ -98,7 +99,7 @@ def create_tag(db: Session, tag: str):
|
||||
|
||||
|
||||
def search_tags(db: Session, tag: str, skip: int = 0, limit: int = 100):
|
||||
return db.query(models.Tag).filter(models.Tag.url.like(f'%{tag}%')).offset(skip).limit(min(limit, MAX_LIMIT)).all()
|
||||
return db.query(models.Tag).filter(models.Tag.url.like(f'%{tag}%')).offset(skip).limit(min(limit, DATABASE_QUERY_LIMIT)).all()
|
||||
|
||||
|
||||
def is_user_in_group(db: Session, group_name: str, email: str) -> models.Group:
|
||||
@@ -148,7 +149,7 @@ def upsert_user_groups(db: Session):
|
||||
along with new participation of users in groups
|
||||
"""
|
||||
logger.debug("Updating user-groups configuration.")
|
||||
filename = os.environ.get("USER_GROUPS_FILENAME", "user-groups.yaml")
|
||||
filename = Settings().USER_GROUPS_FILENAME
|
||||
|
||||
# read yaml safely
|
||||
with open(filename) as inf:
|
||||
|
||||
Reference in New Issue
Block a user