mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-11 12:48:28 +03:00
experimental feature for one-click deployment
This commit is contained in:
0
deploy/tests/__init__.py
Normal file
0
deploy/tests/__init__.py
Normal file
354
deploy/tests/test_generate_config.py
Normal file
354
deploy/tests/test_generate_config.py
Normal file
@@ -0,0 +1,354 @@
|
||||
"""Tests for deploy/generate_config.py – config generation from env vars."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
import yaml
|
||||
|
||||
from deploy.generate_config import build_config, main
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _env(**overrides):
|
||||
"""Return a clean env dict with only the given overrides (no leak from host)."""
|
||||
# Clear all deploy-relevant env vars, then apply overrides
|
||||
deploy_vars = [
|
||||
"LOG_LEVEL",
|
||||
"SUBTITLES",
|
||||
"GSHEET_URL",
|
||||
"GOOGLE_SERVICE_ACCOUNT_JSON",
|
||||
"S3_BUCKET",
|
||||
"S3_KEY",
|
||||
"S3_SECRET",
|
||||
"S3_REGION",
|
||||
"S3_ENDPOINT",
|
||||
"S3_CDN_URL",
|
||||
"S3_PRIVATE",
|
||||
"TELEGRAM_API_ID",
|
||||
"TELEGRAM_API_HASH",
|
||||
"TELEGRAM_BOT_TOKEN",
|
||||
"ENABLE_SCREENSHOTS",
|
||||
"ENABLE_THUMBNAILS",
|
||||
"ENABLE_CSV_DB",
|
||||
]
|
||||
clean = {k: v for k, v in os.environ.items() if k not in deploy_vars}
|
||||
clean.update(overrides)
|
||||
return clean
|
||||
|
||||
|
||||
# ── Base config (no optional env vars) ────────────────────────────────
|
||||
|
||||
|
||||
class TestBaseConfig:
|
||||
"""When no optional env vars are set, build_config returns a minimal working config."""
|
||||
|
||||
def test_base_steps(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
steps = cfg["steps"]
|
||||
assert steps["feeders"] == ["cli_feeder"]
|
||||
assert steps["extractors"] == ["generic_extractor"]
|
||||
assert steps["enrichers"] == ["hash_enricher"]
|
||||
assert steps["databases"] == ["console_db"]
|
||||
assert steps["storages"] == ["local_storage"]
|
||||
assert steps["formatters"] == ["html_formatter"]
|
||||
|
||||
def test_base_has_required_module_configs(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert "local_storage" in cfg
|
||||
assert "generic_extractor" in cfg
|
||||
assert "hash_enricher" in cfg
|
||||
assert "html_formatter" in cfg
|
||||
|
||||
def test_default_log_level_is_info(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["logging"]["level"] == "INFO"
|
||||
|
||||
def test_custom_log_level(self):
|
||||
with patch.dict(os.environ, _env(LOG_LEVEL="DEBUG"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["logging"]["level"] == "DEBUG"
|
||||
|
||||
def test_authentication_present_and_empty(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["authentication"] == {}
|
||||
|
||||
def test_local_storage_defaults(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
ls = cfg["local_storage"]
|
||||
assert ls["save_to"] == "/app/local_archive"
|
||||
assert ls["path_generator"] == "flat"
|
||||
assert ls["filename_generator"] == "static"
|
||||
|
||||
def test_subtitles_default_false(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["generic_extractor"]["subtitles"] is False
|
||||
|
||||
def test_subtitles_enabled(self):
|
||||
with patch.dict(os.environ, _env(SUBTITLES="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["generic_extractor"]["subtitles"] is True
|
||||
|
||||
def test_subtitles_case_insensitive(self):
|
||||
with patch.dict(os.environ, _env(SUBTITLES="True"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["generic_extractor"]["subtitles"] is True
|
||||
|
||||
def test_no_optional_modules_present(self):
|
||||
"""Ensure optional modules don't appear when their env vars are absent."""
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert "gsheet_feeder" not in cfg
|
||||
assert "s3_storage" not in cfg
|
||||
assert "telegram_extractor" not in cfg
|
||||
assert "screenshot_enricher" not in cfg
|
||||
assert "thumbnail_enricher" not in cfg
|
||||
assert "csv_db" not in cfg
|
||||
|
||||
def test_config_is_valid_yaml(self):
|
||||
"""The output dict should round-trip through YAML cleanly."""
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
dumped = yaml.dump(cfg)
|
||||
reloaded = yaml.safe_load(dumped)
|
||||
assert reloaded == cfg
|
||||
|
||||
|
||||
# ── Google Sheets ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGSheetConfig:
|
||||
def test_gsheet_adds_feeder_and_db(self):
|
||||
with patch.dict(os.environ, _env(GSHEET_URL="https://docs.google.com/spreadsheets/d/abc"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "gsheet_feeder" in cfg["steps"]["feeders"]
|
||||
assert "gsheet_db" in cfg["steps"]["databases"]
|
||||
|
||||
def test_gsheet_feeder_config(self):
|
||||
url = "https://docs.google.com/spreadsheets/d/abc123"
|
||||
with patch.dict(os.environ, _env(GSHEET_URL=url), clear=True):
|
||||
cfg = build_config()
|
||||
gf = cfg["gsheet_feeder"]
|
||||
assert gf["sheet"] == url
|
||||
assert gf["header"] == 1
|
||||
assert "service_account" in gf
|
||||
assert gf["columns"]["url"] == "link"
|
||||
assert gf["columns"]["status"] == "archive status"
|
||||
|
||||
def test_gsheet_preserves_cli_feeder(self):
|
||||
"""cli_feeder should still be present even when gsheet is added."""
|
||||
with patch.dict(os.environ, _env(GSHEET_URL="https://example.com/sheet"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "cli_feeder" in cfg["steps"]["feeders"]
|
||||
|
||||
def test_service_account_json_written(self, tmp_path):
|
||||
"""When GOOGLE_SERVICE_ACCOUNT_JSON is set, it writes the file."""
|
||||
sa_data = json.dumps({"type": "service_account", "project_id": "test"})
|
||||
secrets_dir = tmp_path / "secrets"
|
||||
with (
|
||||
patch.dict(os.environ, _env(GOOGLE_SERVICE_ACCOUNT_JSON=sa_data), clear=True),
|
||||
patch("deploy.generate_config.SECRETS_DIR", secrets_dir),
|
||||
):
|
||||
build_config()
|
||||
sa_path = secrets_dir / "service_account.json"
|
||||
assert sa_path.exists()
|
||||
assert json.loads(sa_path.read_text())["project_id"] == "test"
|
||||
|
||||
|
||||
# ── S3 storage ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestS3Config:
|
||||
def test_s3_adds_storage(self):
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="my-bucket"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "s3_storage" in cfg["steps"]["storages"]
|
||||
assert "local_storage" in cfg["steps"]["storages"] # local still there
|
||||
|
||||
def test_s3_config_values(self):
|
||||
env = _env(
|
||||
S3_BUCKET="my-bucket",
|
||||
S3_KEY="AKID",
|
||||
S3_SECRET="shhh",
|
||||
S3_REGION="eu-west-1",
|
||||
)
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
s3 = cfg["s3_storage"]
|
||||
assert s3["bucket"] == "my-bucket"
|
||||
assert s3["key"] == "AKID"
|
||||
assert s3["secret"] == "shhh"
|
||||
assert s3["region"] == "eu-west-1"
|
||||
assert s3["private"] is False
|
||||
assert s3["random_no_duplicate"] is True
|
||||
|
||||
def test_s3_defaults(self):
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="b"), clear=True):
|
||||
cfg = build_config()
|
||||
s3 = cfg["s3_storage"]
|
||||
assert s3["region"] == "us-east-1"
|
||||
assert "{region}" in s3["endpoint_url"]
|
||||
|
||||
def test_s3_private_flag(self):
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_PRIVATE="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["s3_storage"]["private"] is True
|
||||
|
||||
def test_s3_custom_endpoint(self):
|
||||
endpoint = "https://nyc3.digitaloceanspaces.com"
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_ENDPOINT=endpoint), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["s3_storage"]["endpoint_url"] == endpoint
|
||||
|
||||
|
||||
# ── Telegram ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestTelegramConfig:
|
||||
def test_telegram_added_when_both_set(self):
|
||||
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
assert "telegram_extractor" in cfg["steps"]["extractors"]
|
||||
assert cfg["telegram_extractor"]["api_id"] == "12345"
|
||||
assert cfg["telegram_extractor"]["api_hash"] == "abc"
|
||||
|
||||
def test_telegram_not_added_if_only_id(self):
|
||||
with patch.dict(os.environ, _env(TELEGRAM_API_ID="12345"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "telegram_extractor" not in cfg["steps"]["extractors"]
|
||||
|
||||
def test_telegram_not_added_if_only_hash(self):
|
||||
with patch.dict(os.environ, _env(TELEGRAM_API_HASH="abc"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "telegram_extractor" not in cfg["steps"]["extractors"]
|
||||
|
||||
def test_telegram_bot_token_optional(self):
|
||||
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc", TELEGRAM_BOT_TOKEN="bot:tok")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["telegram_extractor"]["bot_token"] == "bot:tok"
|
||||
|
||||
def test_telegram_no_bot_token(self):
|
||||
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
assert "bot_token" not in cfg["telegram_extractor"]
|
||||
|
||||
|
||||
# ── Optional enrichers / databases ────────────────────────────────────
|
||||
|
||||
|
||||
class TestOptionalModules:
|
||||
def test_screenshots_disabled_by_default(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert "screenshot_enricher" not in cfg["steps"]["enrichers"]
|
||||
|
||||
def test_screenshots_enabled(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
|
||||
assert cfg["screenshot_enricher"]["width"] == 1280
|
||||
|
||||
def test_thumbnails_enabled(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_THUMBNAILS="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "thumbnail_enricher" in cfg["steps"]["enrichers"]
|
||||
assert cfg["thumbnail_enricher"]["max_thumbnails"] == 16
|
||||
|
||||
def test_csv_db_enabled(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_CSV_DB="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "csv_db" in cfg["steps"]["databases"]
|
||||
assert cfg["csv_db"]["csv_file"] == "/app/local_archive/db.csv"
|
||||
|
||||
def test_case_insensitive_boolean(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="TRUE"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
|
||||
|
||||
|
||||
# ── Combined / full config ────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestCombinedConfig:
|
||||
def test_all_optional_modules_together(self):
|
||||
"""Enable everything at once and verify no conflicts."""
|
||||
env = _env(
|
||||
GSHEET_URL="https://example.com/sheet",
|
||||
S3_BUCKET="bucket",
|
||||
S3_KEY="key",
|
||||
S3_SECRET="secret",
|
||||
TELEGRAM_API_ID="123",
|
||||
TELEGRAM_API_HASH="abc",
|
||||
TELEGRAM_BOT_TOKEN="tok",
|
||||
ENABLE_SCREENSHOTS="true",
|
||||
ENABLE_THUMBNAILS="true",
|
||||
ENABLE_CSV_DB="true",
|
||||
)
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
|
||||
steps = cfg["steps"]
|
||||
assert "gsheet_feeder" in steps["feeders"]
|
||||
assert "telegram_extractor" in steps["extractors"]
|
||||
assert "screenshot_enricher" in steps["enrichers"]
|
||||
assert "thumbnail_enricher" in steps["enrichers"]
|
||||
assert "csv_db" in steps["databases"]
|
||||
assert "gsheet_db" in steps["databases"]
|
||||
assert "s3_storage" in steps["storages"]
|
||||
assert "local_storage" in steps["storages"]
|
||||
|
||||
# All module configs present
|
||||
for key in [
|
||||
"gsheet_feeder",
|
||||
"s3_storage",
|
||||
"telegram_extractor",
|
||||
"screenshot_enricher",
|
||||
"thumbnail_enricher",
|
||||
"csv_db",
|
||||
]:
|
||||
assert key in cfg, f"{key} config missing"
|
||||
|
||||
def test_full_config_valid_yaml(self):
|
||||
env = _env(
|
||||
GSHEET_URL="https://example.com/sheet",
|
||||
S3_BUCKET="bucket",
|
||||
TELEGRAM_API_ID="123",
|
||||
TELEGRAM_API_HASH="abc",
|
||||
ENABLE_SCREENSHOTS="true",
|
||||
ENABLE_CSV_DB="true",
|
||||
)
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
dumped = yaml.dump(cfg)
|
||||
reloaded = yaml.safe_load(dumped)
|
||||
assert reloaded == cfg
|
||||
|
||||
|
||||
# ── main() writes file ───────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMainFunction:
|
||||
def test_main_writes_config_file(self, tmp_path):
|
||||
config_path = tmp_path / "orchestration.yaml"
|
||||
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
|
||||
main()
|
||||
assert config_path.exists()
|
||||
cfg = yaml.safe_load(config_path.read_text())
|
||||
assert cfg["steps"]["feeders"] == ["cli_feeder"]
|
||||
|
||||
def test_main_creates_parent_dirs(self, tmp_path):
|
||||
config_path = tmp_path / "nested" / "dir" / "orchestration.yaml"
|
||||
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
|
||||
main()
|
||||
assert config_path.exists()
|
||||
124
deploy/tests/test_gsheet_poller.py
Normal file
124
deploy/tests/test_gsheet_poller.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""Tests for deploy/gsheet_poller.py – background Google Sheets polling."""
|
||||
|
||||
import os
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
from deploy.gsheet_poller import start_poller, _poll_once
|
||||
|
||||
|
||||
# ── start_poller ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStartPoller:
|
||||
def test_disabled_when_no_gsheet_url(self):
|
||||
"""No thread should be started when GSHEET_URL is empty."""
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": ""}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
start_poller()
|
||||
mock_thread.assert_not_called()
|
||||
|
||||
def test_disabled_when_gsheet_url_absent(self):
|
||||
env = {k: v for k, v in os.environ.items() if k != "GSHEET_URL"}
|
||||
with patch.dict(os.environ, env, clear=True), patch("deploy.gsheet_poller.threading.Thread") as mock_thread:
|
||||
start_poller()
|
||||
mock_thread.assert_not_called()
|
||||
|
||||
def test_starts_thread_when_gsheet_url_set(self):
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": "https://example.com/sheet"}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_instance = MagicMock()
|
||||
mock_thread.return_value = mock_instance
|
||||
start_poller()
|
||||
mock_thread.assert_called_once()
|
||||
assert mock_thread.call_args.kwargs["daemon"] is True
|
||||
assert mock_thread.call_args.kwargs["name"] == "gsheet-poller"
|
||||
mock_instance.start.assert_called_once()
|
||||
|
||||
def test_default_interval_300(self):
|
||||
env = {"GSHEET_URL": "https://example.com/sheet"}
|
||||
# Remove POLL_INTERVAL if present
|
||||
clean_env = {k: v for k, v in os.environ.items() if k != "POLL_INTERVAL"}
|
||||
clean_env.update(env)
|
||||
with (
|
||||
patch.dict(os.environ, clean_env, clear=True),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_thread.return_value = MagicMock()
|
||||
start_poller()
|
||||
# interval should be passed as arg to _poll_loop
|
||||
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
|
||||
assert args == (300,)
|
||||
|
||||
def test_custom_interval(self):
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "600"}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_thread.return_value = MagicMock()
|
||||
start_poller()
|
||||
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
|
||||
assert args == (600,)
|
||||
|
||||
def test_interval_minimum_enforced(self):
|
||||
"""Intervals below 60 should be clamped to 60."""
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "10"}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_thread.return_value = MagicMock()
|
||||
start_poller()
|
||||
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
|
||||
assert args == (60,)
|
||||
|
||||
|
||||
# ── _poll_once ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestPollOnce:
|
||||
def test_calls_subprocess_with_config(self):
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0, stderr="")
|
||||
_poll_once()
|
||||
mock_run.assert_called_once()
|
||||
cmd = mock_run.call_args[0][0]
|
||||
assert "auto_archiver" in " ".join(cmd)
|
||||
assert "--config" in cmd
|
||||
|
||||
def test_handles_nonzero_exit(self):
|
||||
"""Should not raise on non-zero exit, just log a warning."""
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=1, stderr="some error")
|
||||
_poll_once() # should not raise
|
||||
|
||||
def test_handles_timeout(self):
|
||||
"""Should not raise on timeout, just log."""
|
||||
import subprocess
|
||||
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=600)
|
||||
_poll_once() # should not raise
|
||||
|
||||
def test_handles_exception(self):
|
||||
"""Should not raise on arbitrary exceptions."""
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.side_effect = OSError("broken")
|
||||
_poll_once() # should not raise
|
||||
|
||||
def test_uses_correct_config_path(self):
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0, stderr="")
|
||||
_poll_once()
|
||||
cmd = mock_run.call_args[0][0]
|
||||
config_idx = cmd.index("--config")
|
||||
assert cmd[config_idx + 1] == "/app/secrets/orchestration.yaml"
|
||||
|
||||
def test_timeout_set(self):
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0, stderr="")
|
||||
_poll_once()
|
||||
assert mock_run.call_args[1]["timeout"] == 600
|
||||
310
deploy/tests/test_web_ui.py
Normal file
310
deploy/tests/test_web_ui.py
Normal file
@@ -0,0 +1,310 @@
|
||||
"""Tests for deploy/web_ui.py – FastAPI web interface."""
|
||||
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_state():
|
||||
"""Reset in-memory state between tests."""
|
||||
import deploy.web_ui as mod
|
||||
|
||||
mod._valid_sessions.clear()
|
||||
mod._jobs.clear()
|
||||
yield
|
||||
mod._valid_sessions.clear()
|
||||
mod._jobs.clear()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_no_auth():
|
||||
"""Test client with auth disabled (no AUTH_PASSWORD)."""
|
||||
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", ""):
|
||||
from deploy.web_ui import app
|
||||
|
||||
yield TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_with_auth():
|
||||
"""Test client with auth enabled."""
|
||||
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", "secret123"):
|
||||
from deploy.web_ui import app
|
||||
|
||||
yield TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
def _login(client, password="secret123"):
|
||||
"""Helper: log in and return the session cookie."""
|
||||
resp = client.post("/login", data={"password": password}, follow_redirects=False)
|
||||
return resp.cookies.get("aa_session")
|
||||
|
||||
|
||||
# ── Health check ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestHealthCheck:
|
||||
def test_status_returns_ok(self, client_no_auth):
|
||||
resp = client_no_auth.get("/status")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json() == {"status": "ok"}
|
||||
|
||||
def test_status_no_auth_required(self, client_with_auth):
|
||||
resp = client_with_auth.get("/status")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json() == {"status": "ok"}
|
||||
|
||||
|
||||
# ── Auth disabled ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestNoAuth:
|
||||
def test_index_accessible(self, client_no_auth):
|
||||
resp = client_no_auth.get("/")
|
||||
assert resp.status_code == 200
|
||||
assert "Auto Archiver" in resp.text
|
||||
|
||||
def test_login_page_redirects_to_index(self, client_no_auth):
|
||||
resp = client_no_auth.get("/login", follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
assert resp.headers["location"] == "/"
|
||||
|
||||
def test_login_post_redirects_to_index(self, client_no_auth):
|
||||
resp = client_no_auth.post("/login", data={"password": "anything"}, follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
|
||||
def test_no_logout_link_shown(self, client_no_auth):
|
||||
resp = client_no_auth.get("/")
|
||||
assert "Logout" not in resp.text
|
||||
|
||||
|
||||
# ── Auth enabled ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestAuth:
|
||||
def test_index_redirects_to_login(self, client_with_auth):
|
||||
resp = client_with_auth.get("/", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
assert resp.headers["location"] == "/login"
|
||||
|
||||
def test_login_page_renders(self, client_with_auth):
|
||||
resp = client_with_auth.get("/login")
|
||||
assert resp.status_code == 200
|
||||
assert "Password" in resp.text
|
||||
|
||||
def test_wrong_password_returns_401(self, client_with_auth):
|
||||
resp = client_with_auth.post("/login", data={"password": "wrong"})
|
||||
assert resp.status_code == 401
|
||||
assert "Wrong password" in resp.text
|
||||
|
||||
def test_correct_password_sets_cookie(self, client_with_auth):
|
||||
resp = client_with_auth.post("/login", data={"password": "secret123"}, follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
assert "aa_session" in resp.cookies
|
||||
|
||||
def test_authenticated_access(self, client_with_auth):
|
||||
cookie = _login(client_with_auth)
|
||||
client_with_auth.cookies.set("aa_session", cookie)
|
||||
resp = client_with_auth.get("/")
|
||||
assert resp.status_code == 200
|
||||
assert "Auto Archiver" in resp.text
|
||||
|
||||
def test_logout_clears_session(self, client_with_auth):
|
||||
cookie = _login(client_with_auth)
|
||||
client_with_auth.cookies.set("aa_session", cookie)
|
||||
resp = client_with_auth.get("/logout", follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
# After logout, index should redirect to login again
|
||||
client_with_auth.cookies.clear()
|
||||
resp = client_with_auth.get("/", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
|
||||
def test_logout_link_shown_when_auth_enabled(self, client_with_auth):
|
||||
cookie = _login(client_with_auth)
|
||||
client_with_auth.cookies.set("aa_session", cookie)
|
||||
resp = client_with_auth.get("/")
|
||||
assert "Logout" in resp.text
|
||||
|
||||
def test_results_requires_auth(self, client_with_auth):
|
||||
resp = client_with_auth.get("/results", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
|
||||
def test_invalid_session_rejected(self, client_with_auth):
|
||||
client_with_auth.cookies.set("aa_session", "bogus-token")
|
||||
resp = client_with_auth.get("/", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
|
||||
|
||||
# ── Archive submission ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestArchive:
|
||||
def test_archive_creates_job(self, client_no_auth):
|
||||
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
|
||||
resp = client_no_auth.post(
|
||||
"/archive",
|
||||
data={"urls": "https://example.com\nhttps://example.org"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert resp.status_code == 303
|
||||
assert resp.headers["location"] == "/"
|
||||
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
assert len(_jobs) == 1
|
||||
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
|
||||
assert _jobs[0]["status"] == "running"
|
||||
|
||||
def test_archive_empty_urls_returns_400(self, client_no_auth):
|
||||
resp = client_no_auth.post("/archive", data={"urls": " \n \n"})
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_archive_strips_whitespace(self, client_no_auth):
|
||||
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
|
||||
client_no_auth.post(
|
||||
"/archive",
|
||||
data={"urls": " https://example.com \n\n https://example.org \n"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
|
||||
|
||||
def test_archive_requires_auth(self, client_with_auth):
|
||||
resp = client_with_auth.post(
|
||||
"/archive",
|
||||
data={"urls": "https://example.com"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert resp.status_code == 307
|
||||
|
||||
|
||||
# ── Results page ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestResults:
|
||||
def test_results_empty(self, client_no_auth, tmp_path):
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/results")
|
||||
assert resp.status_code == 200
|
||||
assert "No archived files yet" in resp.text
|
||||
|
||||
def test_results_lists_files(self, client_no_auth, tmp_path):
|
||||
(tmp_path / "test.html").write_text("<html>archived</html>")
|
||||
(tmp_path / "video.mp4").write_bytes(b"\x00" * 10)
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/results")
|
||||
assert resp.status_code == 200
|
||||
assert "test.html" in resp.text
|
||||
assert "video.mp4" in resp.text
|
||||
|
||||
def test_results_nonexistent_dir(self, client_no_auth, tmp_path):
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path / "nonexistent"):
|
||||
resp = client_no_auth.get("/results")
|
||||
assert resp.status_code == 200
|
||||
assert "No archived files yet" in resp.text
|
||||
|
||||
|
||||
# ── File serving ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestFileServing:
|
||||
def test_serve_existing_file(self, client_no_auth, tmp_path):
|
||||
(tmp_path / "report.html").write_text("<html>done</html>")
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/files/report.html")
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_serve_nonexistent_file(self, client_no_auth, tmp_path):
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/files/nope.txt")
|
||||
assert resp.status_code == 404
|
||||
|
||||
def test_path_traversal_blocked(self, client_no_auth, tmp_path):
|
||||
# Create a file outside the archive dir
|
||||
outside = tmp_path / "outside"
|
||||
outside.mkdir()
|
||||
(outside / "secret.txt").write_text("secret")
|
||||
archive = tmp_path / "archive"
|
||||
archive.mkdir()
|
||||
# Symlink into archive pointing outside
|
||||
(archive / "escape").symlink_to(outside / "secret.txt")
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", archive):
|
||||
resp = client_no_auth.get("/files/escape")
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ── Job rendering ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestJobRendering:
|
||||
def test_no_jobs_shows_message(self, client_no_auth):
|
||||
resp = client_no_auth.get("/")
|
||||
assert "No archiving jobs yet" in resp.text
|
||||
|
||||
def test_jobs_shown_in_table(self, client_no_auth):
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
_jobs.append(
|
||||
{
|
||||
"id": 1,
|
||||
"urls": ["https://example.com"],
|
||||
"status": "done",
|
||||
"started": "2026-01-01 00:00 UTC",
|
||||
"output": "",
|
||||
}
|
||||
)
|
||||
resp = client_no_auth.get("/")
|
||||
assert "example.com" in resp.text
|
||||
assert "done" in resp.text
|
||||
|
||||
def test_many_urls_truncated(self, client_no_auth):
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
_jobs.append(
|
||||
{
|
||||
"id": 1,
|
||||
"urls": [f"https://example.com/{i}" for i in range(10)],
|
||||
"status": "running",
|
||||
"started": "2026-01-01 00:00 UTC",
|
||||
"output": "",
|
||||
}
|
||||
)
|
||||
resp = client_no_auth.get("/")
|
||||
assert "+7 more" in resp.text
|
||||
|
||||
|
||||
# ── HTML template rendering ──────────────────────────────────────────
|
||||
|
||||
|
||||
class TestTemplates:
|
||||
"""Verify HTML templates can be .format()-ed without KeyError."""
|
||||
|
||||
def test_login_html_renders(self):
|
||||
from deploy.web_ui import LOGIN_HTML
|
||||
|
||||
result = LOGIN_HTML.format(error="")
|
||||
assert "Auto Archiver" in result
|
||||
|
||||
def test_login_html_renders_with_error(self):
|
||||
from deploy.web_ui import LOGIN_HTML
|
||||
|
||||
result = LOGIN_HTML.format(error='<p class="err">Nope</p>')
|
||||
assert "Nope" in result
|
||||
|
||||
def test_main_html_renders(self):
|
||||
from deploy.web_ui import MAIN_HTML
|
||||
|
||||
result = MAIN_HTML.format(logout="", jobs_html="")
|
||||
assert "Auto Archiver" in result
|
||||
|
||||
def test_results_html_renders(self):
|
||||
from deploy.web_ui import RESULTS_HTML
|
||||
|
||||
result = RESULTS_HTML.format(file_list="<p>empty</p>")
|
||||
assert "Archived Files" in result
|
||||
Reference in New Issue
Block a user