From ac4c09810bdf0127ea1def8d5d42a5ee3d5691cd Mon Sep 17 00:00:00 2001
From: msramalho <19508417+msramalho@users.noreply.github.com>
Date: Thu, 12 Mar 2026 11:47:20 +0000
Subject: [PATCH] experimental feature for one-click deployment
---
.github/workflows/tests-deploy.yaml | 29 +++
README.md | 35 ++-
deploy/Dockerfile | 34 +++
deploy/__init__.py | 1 +
deploy/generate_config.py | 163 ++++++++++++
deploy/gsheet_poller.py | 71 ++++++
deploy/pytest.ini | 2 +
deploy/start.py | 37 +++
deploy/tests/__init__.py | 0
deploy/tests/test_generate_config.py | 354 +++++++++++++++++++++++++++
deploy/tests/test_gsheet_poller.py | 124 ++++++++++
deploy/tests/test_web_ui.py | 310 +++++++++++++++++++++++
deploy/web_ui.py | 269 ++++++++++++++++++++
railway.json | 99 ++++++++
14 files changed, 1527 insertions(+), 1 deletion(-)
create mode 100644 .github/workflows/tests-deploy.yaml
create mode 100644 deploy/Dockerfile
create mode 100644 deploy/__init__.py
create mode 100644 deploy/generate_config.py
create mode 100644 deploy/gsheet_poller.py
create mode 100644 deploy/pytest.ini
create mode 100644 deploy/start.py
create mode 100644 deploy/tests/__init__.py
create mode 100644 deploy/tests/test_generate_config.py
create mode 100644 deploy/tests/test_gsheet_poller.py
create mode 100644 deploy/tests/test_web_ui.py
create mode 100644 deploy/web_ui.py
create mode 100644 railway.json
diff --git a/.github/workflows/tests-deploy.yaml b/.github/workflows/tests-deploy.yaml
new file mode 100644
index 0000000..e2356d6
--- /dev/null
+++ b/.github/workflows/tests-deploy.yaml
@@ -0,0 +1,29 @@
+name: Deploy Tests
+
+on:
+ push:
+ branches: [ main ]
+ paths:
+ - deploy/**
+ pull_request:
+ paths:
+ - deploy/**
+
+jobs:
+ tests:
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v6
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v6
+ with:
+ python-version: "3.12"
+
+ - name: Install dependencies
+ run: pip install pytest fastapi httpx python-multipart pyyaml
+
+ - name: Run Deploy Tests
+ working-directory: deploy
+ run: python -m pytest tests/ -v
diff --git a/README.md b/README.md
index b273e98..306f912 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,40 @@ Auto Archiver is a Python tool to automatically archive content on the web in a
Read the [article about Auto Archiver on bellingcat.com](https://www.bellingcat.com/resources/2022/09/22/preserve-vital-online-content-with-bellingcats-auto-archiver-tool/).
-## Installation
+## One-Click Cloud Deploy
+
+Deploy your own Auto Archiver instance to the cloud — no coding required:
+
+[![Deploy on Railway](https://railway.app/button.svg)](https://railway.app/new/template?template=https://github.com/bellingcat/auto-archiver&envs=AUTH_PASSWORD,GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&optionalEnvs=GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&AUTH_PASSWORDDesc=Password+to+access+your+archiver+web+interface&GSHEET_URLDesc=Google+Sheet+URL+to+monitor+for+new+URLs+(leave+empty+to+disable)&POLL_INTERVALDesc=Seconds+between+Google+Sheet+checks+(min+60)&POLL_INTERVALDefault=300&S3_BUCKETDesc=S3+bucket+name+for+storage+(leave+empty+for+local+only)&S3_REGIONDefault=us-east-1&LOG_LEVELDefault=INFO)
+
+**What you get:** A web interface where you can paste URLs and archive them instantly. Optionally connect a Google Sheet for automated monitoring, S3 for cloud storage, and Telegram for archiving channels.
+
+**Only required setting:** `AUTH_PASSWORD` — everything else is optional and can be configured later via the Railway dashboard.
+
+<details>
+<summary>📋 Environment variables reference</summary>
+
+| Variable | Required | Description |
+|----------|----------|-------------|
+| `AUTH_PASSWORD` | **Yes** | Password to access the web interface |
+| `GSHEET_URL` | No | Google Sheet URL to monitor for new URLs [use this template](https://docs.google.com/spreadsheets/d/1NJZo_XZUBKTI1Ghlgi4nTPVvCfb0HXAs6j5tNGas72k/edit?gid=0#gid=0) |
+| `GOOGLE_SERVICE_ACCOUNT_JSON` | No | Google service account JSON (required with Sheets) [follow these instructions](https://auto-archiver.readthedocs.io/en/v1.0.1/how_to/gsheets_setup.html) |
+| `POLL_INTERVAL` | No | Seconds between Sheet checks (default: 300, minimum: 60) |
+| `S3_BUCKET` | No | S3 bucket name for archived content, ideal for cloud hosting your archives but not mandatory, any S3-compatible storage works |
+| `S3_KEY` / `S3_SECRET` | No | S3 credentials |
+| `S3_REGION` | No | S3 region (default: us-east-1) |
+| `S3_ENDPOINT` | No | S3 endpoint URL |
+| `TELEGRAM_API_ID` / `TELEGRAM_API_HASH` | No | Telegram API credentials |
+| `TELEGRAM_BOT_TOKEN` | No | Telegram bot token |
+| `ENABLE_SCREENSHOTS` | No | Set to `true` for full-page screenshots |
+| `ENABLE_THUMBNAILS` | No | Set to `true` for video thumbnails |
+| `ENABLE_CSV_DB` | No | Set to `true` for CSV logging |
+| `LOG_LEVEL` | No | DEBUG, INFO, WARNING, ERROR (default: INFO) |
+
+</details>
+
+
+## Traditional Installation
View the [Installation Guide](https://auto-archiver.readthedocs.io/en/latest/installation/installation.html) for full instructions
diff --git a/deploy/Dockerfile b/deploy/Dockerfile
new file mode 100644
index 0000000..58379d4
--- /dev/null
+++ b/deploy/Dockerfile
@@ -0,0 +1,34 @@
+# ── Cloud Deploy ──────────────────────────────────────────────────────
+# Thin web UI + config generator layer on top of the published
+# auto-archiver Docker image. Used by the Railway one-click deploy.
+#
+# Build:
+# docker build -f deploy/Dockerfile -t auto-archiver-deploy .
+#
+# Run:
+# docker run -p 8080:8080 -e PORT=8080 -e AUTH_PASSWORD=secret auto-archiver-deploy
+# ──────────────────────────────────────────────────────────────────────
+
+FROM bellingcat/auto-archiver:latest
+
+USER root
+
+# Install the lightweight web layer dependencies
+RUN pip install --no-cache-dir fastapi uvicorn[standard] python-multipart pyyaml
+
+# Copy deploy scripts into the image
+COPY deploy/ /app/deploy/
+
+# Ensure writable dirs exist
+RUN mkdir -p /app/local_archive /app/secrets && \
+ chown -R 1000:1000 /app/local_archive /app/secrets /app/deploy
+
+USER 1000
+
+# Railway sets PORT; default to 8080
+ENV PORT=8080
+
+EXPOSE ${PORT}
+
+# Override the CLI entrypoint with the web server
+ENTRYPOINT ["python3", "-m", "deploy.start"]
diff --git a/deploy/__init__.py b/deploy/__init__.py
new file mode 100644
index 0000000..d2373c6
--- /dev/null
+++ b/deploy/__init__.py
@@ -0,0 +1 @@
+# Cloud deployment layer for auto-archiver
diff --git a/deploy/generate_config.py b/deploy/generate_config.py
new file mode 100644
index 0000000..884911c
--- /dev/null
+++ b/deploy/generate_config.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python3
+"""
+Generates orchestration.yaml from environment variables.
+
+This script bridges Railway's env-var-based configuration with
+auto-archiver's YAML-based configuration system. It runs at container
+startup before the web UI server starts.
+"""
+
+import os
+from pathlib import Path
+
+import yaml
+
+
+CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
+SECRETS_DIR = Path("/app/secrets")
+
+
+def build_config() -> dict:
+ """Build an orchestration config dict from environment variables."""
+
+ # -- Base config: always present ------------------------------------
+ config = {
+ "steps": {
+ "feeders": ["cli_feeder"],
+ "extractors": ["generic_extractor"],
+ "enrichers": ["hash_enricher"],
+ "databases": ["console_db"],
+ "storages": ["local_storage"],
+ "formatters": ["html_formatter"],
+ },
+ "logging": {
+ "level": os.environ.get("LOG_LEVEL", "INFO"),
+ },
+ "local_storage": {
+ "save_to": "/app/local_archive",
+ "path_generator": "flat",
+ "filename_generator": "static",
+ },
+ "generic_extractor": {
+ "subtitles": os.environ.get("SUBTITLES", "false").lower() == "true",
+ "comments": False,
+ "livestreams": False,
+ "live_from_start": False,
+ "end_means_success": True,
+ "allow_playlist": False,
+ },
+ "hash_enricher": {
+ "algorithm": "SHA-256",
+ },
+ "html_formatter": {
+ "detect_thumbnails": True,
+ },
+ "authentication": {},
+ }
+
+ # -- Google Sheets feeder (optional) --------------------------------
+ gsheet_url = os.environ.get("GSHEET_URL", "")
+ if gsheet_url:
+ config["steps"]["feeders"].append("gsheet_feeder")
+ config["steps"]["databases"].append("gsheet_db")
+ config["gsheet_feeder"] = {
+ "sheet": gsheet_url,
+ "header": 1,
+ "service_account": str(SECRETS_DIR / "service_account.json"),
+ "use_sheet_names_in_stored_paths": False,
+ "columns": {
+ "url": "link",
+ "status": "archive status",
+ "folder": "destination folder",
+ "archive": "archive location",
+ "date": "archive date",
+ "thumbnail": "thumbnail",
+ "timestamp": "upload timestamp",
+ "title": "upload title",
+ "text": "textual content",
+ "screenshot": "screenshot",
+ "hash": "hash",
+ "pdq_hash": "perceptual hashes",
+ },
+ }
+
+ # -- Google service account JSON (optional) -------------------------
+ sa_json = os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "")
+ if sa_json:
+ SECRETS_DIR.mkdir(parents=True, exist_ok=True)
+ sa_path = SECRETS_DIR / "service_account.json"
+ sa_path.write_text(sa_json)
+ print(f"[deploy] Wrote Google service account to {sa_path}")
+
+ # -- S3 storage (optional) ------------------------------------------
+ s3_bucket = os.environ.get("S3_BUCKET", "")
+ if s3_bucket:
+ config["steps"]["storages"].append("s3_storage")
+ config["s3_storage"] = {
+ "bucket": s3_bucket,
+ "region": os.environ.get("S3_REGION", "us-east-1"),
+ "key": os.environ.get("S3_KEY", ""),
+ "secret": os.environ.get("S3_SECRET", ""),
+ "endpoint_url": os.environ.get("S3_ENDPOINT", "https://s3.{region}.amazonaws.com"),
+ "cdn_url": os.environ.get(
+ "S3_CDN_URL",
+ "https://{bucket}.s3.{region}.amazonaws.com/{key}",
+ ),
+ "private": os.environ.get("S3_PRIVATE", "false").lower() == "true",
+ "random_no_duplicate": True,
+ "key_path": "random",
+ }
+
+ # -- Telegram extractor (optional) ----------------------------------
+ tg_api_id = os.environ.get("TELEGRAM_API_ID", "")
+ tg_api_hash = os.environ.get("TELEGRAM_API_HASH", "")
+ if tg_api_id and tg_api_hash:
+ config["steps"]["extractors"].append("telegram_extractor")
+ config["telegram_extractor"] = {
+ "api_id": tg_api_id,
+ "api_hash": tg_api_hash,
+ }
+ bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
+ if bot_token:
+ config["telegram_extractor"]["bot_token"] = bot_token
+
+ # -- Screenshot enricher (optional) ---------------------------------
+ if os.environ.get("ENABLE_SCREENSHOTS", "").lower() == "true":
+ config["steps"]["enrichers"].append("screenshot_enricher")
+ config["screenshot_enricher"] = {
+ "width": 1280,
+ "height": 7200,
+ "save_to_pdf": True,
+ }
+
+ # -- Thumbnail enricher (optional) ----------------------------------
+ if os.environ.get("ENABLE_THUMBNAILS", "").lower() == "true":
+ config["steps"]["enrichers"].append("thumbnail_enricher")
+ config["thumbnail_enricher"] = {
+ "thumbnails_per_minute": 60,
+ "max_thumbnails": 16,
+ }
+
+ # -- CSV database (optional) ----------------------------------------
+ if os.environ.get("ENABLE_CSV_DB", "").lower() == "true":
+ config["steps"]["databases"].append("csv_db")
+ config["csv_db"] = {
+ "csv_file": "/app/local_archive/db.csv",
+ }
+
+ return config
+
+
+def main():
+ config = build_config()
+
+ CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
+ with open(CONFIG_PATH, "w") as f:
+ yaml.dump(config, f, default_flow_style=False, sort_keys=False)
+
+ print(f"[deploy] Generated config at {CONFIG_PATH}")
+ print(f"[deploy] Active steps: {config['steps']}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/deploy/gsheet_poller.py b/deploy/gsheet_poller.py
new file mode 100644
index 0000000..bf114d7
--- /dev/null
+++ b/deploy/gsheet_poller.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+"""
+Background Google Sheets poller for auto-archiver cloud deployments.
+
+When GSHEET_URL is set, periodically runs auto-archiver with gsheet_feeder
+to check for new URLs in the configured spreadsheet. Runs as a daemon thread
+alongside the web UI.
+"""
+
+import logging
+import os
+import subprocess
+import threading
+import time
+
+logger = logging.getLogger("gsheet_poller")
+
+CONFIG_PATH = "/app/secrets/orchestration.yaml"
+
+
+def _poll_once():
+ """Run auto-archiver once to process any new rows in the Google Sheet."""
+ logger.info("Polling Google Sheet for new URLs...")
+ try:
+ result = subprocess.run(
+ ["python3", "-m", "auto_archiver", "--config", CONFIG_PATH],
+ capture_output=True,
+ text=True,
+ cwd="/app",
+ timeout=600, # 10 minute timeout per poll
+ )
+ if result.returncode == 0:
+ logger.info("Sheet poll completed successfully.")
+ else:
+ logger.warning("Sheet poll exited with code %d: %s", result.returncode, result.stderr[-500:])
+ except subprocess.TimeoutExpired:
+ logger.error("Sheet poll timed out after 600s")
+ except Exception:
+ logger.exception("Sheet poll failed")
+
+
+def _poll_loop(interval: int):
+    """Poll forever: run one poll immediately, sleep `interval` seconds, repeat. Never returns."""
+    logger.info("Google Sheets poller started (interval=%ds)", interval)
+    while True:
+        _poll_once()
+        time.sleep(interval)
+
+
+def start_poller():
+ """
+ Start the Google Sheets poller as a daemon thread if GSHEET_URL is set.
+ Call this once at application startup.
+ """
+ gsheet_url = os.environ.get("GSHEET_URL", "")
+ if not gsheet_url:
+ logger.info("GSHEET_URL not set – Sheet poller disabled.")
+ return
+
+ interval = int(os.environ.get("POLL_INTERVAL", "300"))
+ if interval < 60:
+ interval = 60 # minimum 1 minute
+
+ thread = threading.Thread(
+ target=_poll_loop,
+ args=(interval,),
+ daemon=True,
+ name="gsheet-poller",
+ )
+ thread.start()
+ logger.info("Google Sheets poller thread started.")
diff --git a/deploy/pytest.ini b/deploy/pytest.ini
new file mode 100644
index 0000000..5ee6477
--- /dev/null
+++ b/deploy/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+testpaths = tests
diff --git a/deploy/start.py b/deploy/start.py
new file mode 100644
index 0000000..40aa3a1
--- /dev/null
+++ b/deploy/start.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3
+"""
+Startup entrypoint for cloud deployments (run as `python3 -m deploy.start`).
+
+1. Generates orchestration.yaml from environment variables
+2. Starts the Google Sheets poller (if GSHEET_URL is set)
+3. Serves the FastAPI web UI on 0.0.0.0:$PORT (default 8080) via uvicorn
+"""
+
+import os
+import logging
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
+)
+
+# Generate config from env vars (imports below follow logging setup, hence E402)
+from deploy.generate_config import main as generate_config  # noqa: E402
+
+generate_config()
+
+# Start gsheet poller in a background thread (no-op if GSHEET_URL not set)
+from deploy.gsheet_poller import start_poller  # noqa: E402
+
+start_poller()
+
+# Start web server; the app is passed to uvicorn as an import string
+import uvicorn  # noqa: E402
+
+port = int(os.environ.get("PORT", "8080"))
+uvicorn.run(
+    "deploy.web_ui:app",
+    host="0.0.0.0",
+    port=port,
+    log_level="info",
+)
diff --git a/deploy/tests/__init__.py b/deploy/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/deploy/tests/test_generate_config.py b/deploy/tests/test_generate_config.py
new file mode 100644
index 0000000..3e17e94
--- /dev/null
+++ b/deploy/tests/test_generate_config.py
@@ -0,0 +1,354 @@
+"""Tests for deploy/generate_config.py – config generation from env vars."""
+
+import json
+import os
+from unittest.mock import patch
+
+import yaml
+
+from deploy.generate_config import build_config, main
+
+
+# ── Helpers ───────────────────────────────────────────────────────────
+
+
+def _env(**overrides):
+ """Return a clean env dict with only the given overrides (no leak from host)."""
+ # Clear all deploy-relevant env vars, then apply overrides
+ deploy_vars = [
+ "LOG_LEVEL",
+ "SUBTITLES",
+ "GSHEET_URL",
+ "GOOGLE_SERVICE_ACCOUNT_JSON",
+ "S3_BUCKET",
+ "S3_KEY",
+ "S3_SECRET",
+ "S3_REGION",
+ "S3_ENDPOINT",
+ "S3_CDN_URL",
+ "S3_PRIVATE",
+ "TELEGRAM_API_ID",
+ "TELEGRAM_API_HASH",
+ "TELEGRAM_BOT_TOKEN",
+ "ENABLE_SCREENSHOTS",
+ "ENABLE_THUMBNAILS",
+ "ENABLE_CSV_DB",
+ ]
+ clean = {k: v for k, v in os.environ.items() if k not in deploy_vars}
+ clean.update(overrides)
+ return clean
+
+
+# ── Base config (no optional env vars) ────────────────────────────────
+
+
+class TestBaseConfig:
+ """When no optional env vars are set, build_config returns a minimal working config."""
+
+ def test_base_steps(self):
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ steps = cfg["steps"]
+ assert steps["feeders"] == ["cli_feeder"]
+ assert steps["extractors"] == ["generic_extractor"]
+ assert steps["enrichers"] == ["hash_enricher"]
+ assert steps["databases"] == ["console_db"]
+ assert steps["storages"] == ["local_storage"]
+ assert steps["formatters"] == ["html_formatter"]
+
+ def test_base_has_required_module_configs(self):
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ assert "local_storage" in cfg
+ assert "generic_extractor" in cfg
+ assert "hash_enricher" in cfg
+ assert "html_formatter" in cfg
+
+ def test_default_log_level_is_info(self):
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ assert cfg["logging"]["level"] == "INFO"
+
+ def test_custom_log_level(self):
+ with patch.dict(os.environ, _env(LOG_LEVEL="DEBUG"), clear=True):
+ cfg = build_config()
+ assert cfg["logging"]["level"] == "DEBUG"
+
+ def test_authentication_present_and_empty(self):
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ assert cfg["authentication"] == {}
+
+ def test_local_storage_defaults(self):
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ ls = cfg["local_storage"]
+ assert ls["save_to"] == "/app/local_archive"
+ assert ls["path_generator"] == "flat"
+ assert ls["filename_generator"] == "static"
+
+ def test_subtitles_default_false(self):
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ assert cfg["generic_extractor"]["subtitles"] is False
+
+ def test_subtitles_enabled(self):
+ with patch.dict(os.environ, _env(SUBTITLES="true"), clear=True):
+ cfg = build_config()
+ assert cfg["generic_extractor"]["subtitles"] is True
+
+ def test_subtitles_case_insensitive(self):
+ with patch.dict(os.environ, _env(SUBTITLES="True"), clear=True):
+ cfg = build_config()
+ assert cfg["generic_extractor"]["subtitles"] is True
+
+ def test_no_optional_modules_present(self):
+ """Ensure optional modules don't appear when their env vars are absent."""
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ assert "gsheet_feeder" not in cfg
+ assert "s3_storage" not in cfg
+ assert "telegram_extractor" not in cfg
+ assert "screenshot_enricher" not in cfg
+ assert "thumbnail_enricher" not in cfg
+ assert "csv_db" not in cfg
+
+ def test_config_is_valid_yaml(self):
+ """The output dict should round-trip through YAML cleanly."""
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ dumped = yaml.dump(cfg)
+ reloaded = yaml.safe_load(dumped)
+ assert reloaded == cfg
+
+
+# ── Google Sheets ─────────────────────────────────────────────────────
+
+
+class TestGSheetConfig:
+ def test_gsheet_adds_feeder_and_db(self):
+ with patch.dict(os.environ, _env(GSHEET_URL="https://docs.google.com/spreadsheets/d/abc"), clear=True):
+ cfg = build_config()
+ assert "gsheet_feeder" in cfg["steps"]["feeders"]
+ assert "gsheet_db" in cfg["steps"]["databases"]
+
+ def test_gsheet_feeder_config(self):
+ url = "https://docs.google.com/spreadsheets/d/abc123"
+ with patch.dict(os.environ, _env(GSHEET_URL=url), clear=True):
+ cfg = build_config()
+ gf = cfg["gsheet_feeder"]
+ assert gf["sheet"] == url
+ assert gf["header"] == 1
+ assert "service_account" in gf
+ assert gf["columns"]["url"] == "link"
+ assert gf["columns"]["status"] == "archive status"
+
+ def test_gsheet_preserves_cli_feeder(self):
+ """cli_feeder should still be present even when gsheet is added."""
+ with patch.dict(os.environ, _env(GSHEET_URL="https://example.com/sheet"), clear=True):
+ cfg = build_config()
+ assert "cli_feeder" in cfg["steps"]["feeders"]
+
+ def test_service_account_json_written(self, tmp_path):
+ """When GOOGLE_SERVICE_ACCOUNT_JSON is set, it writes the file."""
+ sa_data = json.dumps({"type": "service_account", "project_id": "test"})
+ secrets_dir = tmp_path / "secrets"
+ with (
+ patch.dict(os.environ, _env(GOOGLE_SERVICE_ACCOUNT_JSON=sa_data), clear=True),
+ patch("deploy.generate_config.SECRETS_DIR", secrets_dir),
+ ):
+ build_config()
+ sa_path = secrets_dir / "service_account.json"
+ assert sa_path.exists()
+ assert json.loads(sa_path.read_text())["project_id"] == "test"
+
+
+# ── S3 storage ────────────────────────────────────────────────────────
+
+
+class TestS3Config:
+ def test_s3_adds_storage(self):
+ with patch.dict(os.environ, _env(S3_BUCKET="my-bucket"), clear=True):
+ cfg = build_config()
+ assert "s3_storage" in cfg["steps"]["storages"]
+ assert "local_storage" in cfg["steps"]["storages"] # local still there
+
+ def test_s3_config_values(self):
+ env = _env(
+ S3_BUCKET="my-bucket",
+ S3_KEY="AKID",
+ S3_SECRET="shhh",
+ S3_REGION="eu-west-1",
+ )
+ with patch.dict(os.environ, env, clear=True):
+ cfg = build_config()
+ s3 = cfg["s3_storage"]
+ assert s3["bucket"] == "my-bucket"
+ assert s3["key"] == "AKID"
+ assert s3["secret"] == "shhh"
+ assert s3["region"] == "eu-west-1"
+ assert s3["private"] is False
+ assert s3["random_no_duplicate"] is True
+
+ def test_s3_defaults(self):
+ with patch.dict(os.environ, _env(S3_BUCKET="b"), clear=True):
+ cfg = build_config()
+ s3 = cfg["s3_storage"]
+ assert s3["region"] == "us-east-1"
+ assert "{region}" in s3["endpoint_url"]
+
+ def test_s3_private_flag(self):
+ with patch.dict(os.environ, _env(S3_BUCKET="b", S3_PRIVATE="true"), clear=True):
+ cfg = build_config()
+ assert cfg["s3_storage"]["private"] is True
+
+ def test_s3_custom_endpoint(self):
+ endpoint = "https://nyc3.digitaloceanspaces.com"
+ with patch.dict(os.environ, _env(S3_BUCKET="b", S3_ENDPOINT=endpoint), clear=True):
+ cfg = build_config()
+ assert cfg["s3_storage"]["endpoint_url"] == endpoint
+
+
+# ── Telegram ──────────────────────────────────────────────────────────
+
+
+class TestTelegramConfig:
+ def test_telegram_added_when_both_set(self):
+ env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
+ with patch.dict(os.environ, env, clear=True):
+ cfg = build_config()
+ assert "telegram_extractor" in cfg["steps"]["extractors"]
+ assert cfg["telegram_extractor"]["api_id"] == "12345"
+ assert cfg["telegram_extractor"]["api_hash"] == "abc"
+
+ def test_telegram_not_added_if_only_id(self):
+ with patch.dict(os.environ, _env(TELEGRAM_API_ID="12345"), clear=True):
+ cfg = build_config()
+ assert "telegram_extractor" not in cfg["steps"]["extractors"]
+
+ def test_telegram_not_added_if_only_hash(self):
+ with patch.dict(os.environ, _env(TELEGRAM_API_HASH="abc"), clear=True):
+ cfg = build_config()
+ assert "telegram_extractor" not in cfg["steps"]["extractors"]
+
+ def test_telegram_bot_token_optional(self):
+ env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc", TELEGRAM_BOT_TOKEN="bot:tok")
+ with patch.dict(os.environ, env, clear=True):
+ cfg = build_config()
+ assert cfg["telegram_extractor"]["bot_token"] == "bot:tok"
+
+ def test_telegram_no_bot_token(self):
+ env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
+ with patch.dict(os.environ, env, clear=True):
+ cfg = build_config()
+ assert "bot_token" not in cfg["telegram_extractor"]
+
+
+# ── Optional enrichers / databases ────────────────────────────────────
+
+
+class TestOptionalModules:
+ def test_screenshots_disabled_by_default(self):
+ with patch.dict(os.environ, _env(), clear=True):
+ cfg = build_config()
+ assert "screenshot_enricher" not in cfg["steps"]["enrichers"]
+
+ def test_screenshots_enabled(self):
+ with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="true"), clear=True):
+ cfg = build_config()
+ assert "screenshot_enricher" in cfg["steps"]["enrichers"]
+ assert cfg["screenshot_enricher"]["width"] == 1280
+
+ def test_thumbnails_enabled(self):
+ with patch.dict(os.environ, _env(ENABLE_THUMBNAILS="true"), clear=True):
+ cfg = build_config()
+ assert "thumbnail_enricher" in cfg["steps"]["enrichers"]
+ assert cfg["thumbnail_enricher"]["max_thumbnails"] == 16
+
+ def test_csv_db_enabled(self):
+ with patch.dict(os.environ, _env(ENABLE_CSV_DB="true"), clear=True):
+ cfg = build_config()
+ assert "csv_db" in cfg["steps"]["databases"]
+ assert cfg["csv_db"]["csv_file"] == "/app/local_archive/db.csv"
+
+ def test_case_insensitive_boolean(self):
+ with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="TRUE"), clear=True):
+ cfg = build_config()
+ assert "screenshot_enricher" in cfg["steps"]["enrichers"]
+
+
+# ── Combined / full config ────────────────────────────────────────────
+
+
+class TestCombinedConfig:
+ def test_all_optional_modules_together(self):
+ """Enable everything at once and verify no conflicts."""
+ env = _env(
+ GSHEET_URL="https://example.com/sheet",
+ S3_BUCKET="bucket",
+ S3_KEY="key",
+ S3_SECRET="secret",
+ TELEGRAM_API_ID="123",
+ TELEGRAM_API_HASH="abc",
+ TELEGRAM_BOT_TOKEN="tok",
+ ENABLE_SCREENSHOTS="true",
+ ENABLE_THUMBNAILS="true",
+ ENABLE_CSV_DB="true",
+ )
+ with patch.dict(os.environ, env, clear=True):
+ cfg = build_config()
+
+ steps = cfg["steps"]
+ assert "gsheet_feeder" in steps["feeders"]
+ assert "telegram_extractor" in steps["extractors"]
+ assert "screenshot_enricher" in steps["enrichers"]
+ assert "thumbnail_enricher" in steps["enrichers"]
+ assert "csv_db" in steps["databases"]
+ assert "gsheet_db" in steps["databases"]
+ assert "s3_storage" in steps["storages"]
+ assert "local_storage" in steps["storages"]
+
+ # All module configs present
+ for key in [
+ "gsheet_feeder",
+ "s3_storage",
+ "telegram_extractor",
+ "screenshot_enricher",
+ "thumbnail_enricher",
+ "csv_db",
+ ]:
+ assert key in cfg, f"{key} config missing"
+
+ def test_full_config_valid_yaml(self):
+ env = _env(
+ GSHEET_URL="https://example.com/sheet",
+ S3_BUCKET="bucket",
+ TELEGRAM_API_ID="123",
+ TELEGRAM_API_HASH="abc",
+ ENABLE_SCREENSHOTS="true",
+ ENABLE_CSV_DB="true",
+ )
+ with patch.dict(os.environ, env, clear=True):
+ cfg = build_config()
+ dumped = yaml.dump(cfg)
+ reloaded = yaml.safe_load(dumped)
+ assert reloaded == cfg
+
+
+# ── main() writes file ───────────────────────────────────────────────
+
+
+class TestMainFunction:
+ def test_main_writes_config_file(self, tmp_path):
+ config_path = tmp_path / "orchestration.yaml"
+ with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
+ main()
+ assert config_path.exists()
+ cfg = yaml.safe_load(config_path.read_text())
+ assert cfg["steps"]["feeders"] == ["cli_feeder"]
+
+ def test_main_creates_parent_dirs(self, tmp_path):
+ config_path = tmp_path / "nested" / "dir" / "orchestration.yaml"
+ with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
+ main()
+ assert config_path.exists()
diff --git a/deploy/tests/test_gsheet_poller.py b/deploy/tests/test_gsheet_poller.py
new file mode 100644
index 0000000..32c0c98
--- /dev/null
+++ b/deploy/tests/test_gsheet_poller.py
@@ -0,0 +1,124 @@
+"""Tests for deploy/gsheet_poller.py – background Google Sheets polling."""
+
+import os
+from unittest.mock import patch, MagicMock
+
+
+from deploy.gsheet_poller import start_poller, _poll_once
+
+
+# ── start_poller ──────────────────────────────────────────────────────
+
+
+class TestStartPoller:
+ def test_disabled_when_no_gsheet_url(self):
+ """No thread should be started when GSHEET_URL is empty."""
+ with (
+ patch.dict(os.environ, {"GSHEET_URL": ""}, clear=False),
+ patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
+ ):
+ start_poller()
+ mock_thread.assert_not_called()
+
+ def test_disabled_when_gsheet_url_absent(self):
+ env = {k: v for k, v in os.environ.items() if k != "GSHEET_URL"}
+ with patch.dict(os.environ, env, clear=True), patch("deploy.gsheet_poller.threading.Thread") as mock_thread:
+ start_poller()
+ mock_thread.assert_not_called()
+
+ def test_starts_thread_when_gsheet_url_set(self):
+ with (
+ patch.dict(os.environ, {"GSHEET_URL": "https://example.com/sheet"}, clear=False),
+ patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
+ ):
+ mock_instance = MagicMock()
+ mock_thread.return_value = mock_instance
+ start_poller()
+ mock_thread.assert_called_once()
+ assert mock_thread.call_args.kwargs["daemon"] is True
+ assert mock_thread.call_args.kwargs["name"] == "gsheet-poller"
+ mock_instance.start.assert_called_once()
+
+ def test_default_interval_300(self):
+ env = {"GSHEET_URL": "https://example.com/sheet"}
+ # Remove POLL_INTERVAL if present
+ clean_env = {k: v for k, v in os.environ.items() if k != "POLL_INTERVAL"}
+ clean_env.update(env)
+ with (
+ patch.dict(os.environ, clean_env, clear=True),
+ patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
+ ):
+ mock_thread.return_value = MagicMock()
+ start_poller()
+ # interval should be passed as arg to _poll_loop
+ args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
+ assert args == (300,)
+
+ def test_custom_interval(self):
+ with (
+ patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "600"}, clear=False),
+ patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
+ ):
+ mock_thread.return_value = MagicMock()
+ start_poller()
+ args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
+ assert args == (600,)
+
+ def test_interval_minimum_enforced(self):
+ """Intervals below 60 should be clamped to 60."""
+ with (
+ patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "10"}, clear=False),
+ patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
+ ):
+ mock_thread.return_value = MagicMock()
+ start_poller()
+ args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
+ assert args == (60,)
+
+
+# ── _poll_once ────────────────────────────────────────────────────────
+
+
+class TestPollOnce:
+ def test_calls_subprocess_with_config(self):
+ with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
+ mock_run.return_value = MagicMock(returncode=0, stderr="")
+ _poll_once()
+ mock_run.assert_called_once()
+ cmd = mock_run.call_args[0][0]
+ assert "auto_archiver" in " ".join(cmd)
+ assert "--config" in cmd
+
+ def test_handles_nonzero_exit(self):
+ """Should not raise on non-zero exit, just log a warning."""
+ with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
+ mock_run.return_value = MagicMock(returncode=1, stderr="some error")
+ _poll_once() # should not raise
+
+ def test_handles_timeout(self):
+ """Should not raise on timeout, just log."""
+ import subprocess
+
+ with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
+ mock_run.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=600)
+ _poll_once() # should not raise
+
+ def test_handles_exception(self):
+ """Should not raise on arbitrary exceptions."""
+ with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
+ mock_run.side_effect = OSError("broken")
+ _poll_once() # should not raise
+
+ def test_uses_correct_config_path(self):
+ with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
+ mock_run.return_value = MagicMock(returncode=0, stderr="")
+ _poll_once()
+ cmd = mock_run.call_args[0][0]
+ config_idx = cmd.index("--config")
+ assert cmd[config_idx + 1] == "/app/secrets/orchestration.yaml"
+
+ def test_timeout_set(self):
+ with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
+ mock_run.return_value = MagicMock(returncode=0, stderr="")
+ _poll_once()
+ assert mock_run.call_args[1]["timeout"] == 600
diff --git a/deploy/tests/test_web_ui.py b/deploy/tests/test_web_ui.py
new file mode 100644
index 0000000..b254455
--- /dev/null
+++ b/deploy/tests/test_web_ui.py
@@ -0,0 +1,310 @@
+"""Tests for deploy/web_ui.py – FastAPI web interface."""
+
+from unittest.mock import patch, AsyncMock
+
+import pytest
+from fastapi.testclient import TestClient
+
+
+# ── Fixtures ──────────────────────────────────────────────────────────
+
+
+@pytest.fixture(autouse=True)
+def _reset_state():
+    """Empty the module-level session set and job list before and after every test."""
+    import deploy.web_ui as mod
+
+    def _wipe():
+        # Clear in place so references already imported by tests stay valid.
+        mod._valid_sessions.clear()
+        mod._jobs.clear()
+
+    _wipe()
+    yield
+    _wipe()
+
+
+@pytest.fixture
+def client_no_auth():
+    """Yield a test client while ``deploy.web_ui.AUTH_PASSWORD`` is patched to "" (auth off)."""
+    from deploy.web_ui import app
+
+    with patch("deploy.web_ui.AUTH_PASSWORD", ""):
+        yield TestClient(app, raise_server_exceptions=False)
+
+
+@pytest.fixture
+def client_with_auth():
+    """Yield a test client while ``deploy.web_ui.AUTH_PASSWORD`` is patched to "secret123"."""
+    from deploy.web_ui import app
+
+    with patch("deploy.web_ui.AUTH_PASSWORD", "secret123"):
+        yield TestClient(app, raise_server_exceptions=False)
+
+
+def _login(client, password="secret123"):
+    """POST the password to /login (without following the redirect) and return the session cookie."""
+    form = {"password": password}
+    response = client.post("/login", data=form, follow_redirects=False)
+    return response.cookies.get("aa_session")
+
+
+# ── Health check ──────────────────────────────────────────────────────
+
+
+class TestHealthCheck:
+    """``/status`` answers 200 with a fixed JSON body whether or not auth is enabled."""
+
+    @staticmethod
+    def _assert_healthy(client):
+        response = client.get("/status")
+        assert response.status_code == 200
+        assert response.json() == {"status": "ok"}
+
+    def test_status_returns_ok(self, client_no_auth):
+        self._assert_healthy(client_no_auth)
+
+    def test_status_no_auth_required(self, client_with_auth):
+        self._assert_healthy(client_with_auth)
+
+
+# ── Auth disabled ─────────────────────────────────────────────────────
+
+
+class TestNoAuth:
+    """Behaviour of the UI when no password is configured."""
+
+    def test_index_accessible(self, client_no_auth):
+        page = client_no_auth.get("/")
+        assert page.status_code == 200
+        assert "Auto Archiver" in page.text
+
+    def test_login_page_redirects_to_index(self, client_no_auth):
+        redirect = client_no_auth.get("/login", follow_redirects=False)
+        assert redirect.status_code == 302
+        assert redirect.headers["location"] == "/"
+
+    def test_login_post_redirects_to_index(self, client_no_auth):
+        form = {"password": "anything"}
+        redirect = client_no_auth.post("/login", data=form, follow_redirects=False)
+        assert redirect.status_code == 302
+
+    def test_no_logout_link_shown(self, client_no_auth):
+        body = client_no_auth.get("/").text
+        assert "Logout" not in body
+
+
+# ── Auth enabled ──────────────────────────────────────────────────────
+
+
+class TestAuth:
+    """Behaviour of the UI when a password is configured."""
+
+    def test_index_redirects_to_login(self, client_with_auth):
+        resp = client_with_auth.get("/", follow_redirects=False)
+        assert resp.status_code == 307
+        assert resp.headers["location"] == "/login"
+
+    def test_login_page_renders(self, client_with_auth):
+        resp = client_with_auth.get("/login")
+        assert resp.status_code == 200
+        assert "Password" in resp.text
+
+    def test_wrong_password_returns_401(self, client_with_auth):
+        resp = client_with_auth.post("/login", data={"password": "wrong"})
+        assert resp.status_code == 401
+        assert "Wrong password" in resp.text
+
+    def test_correct_password_sets_cookie(self, client_with_auth):
+        resp = client_with_auth.post("/login", data={"password": "secret123"}, follow_redirects=False)
+        assert resp.status_code == 302
+        assert "aa_session" in resp.cookies
+
+    def test_authenticated_access(self, client_with_auth):
+        cookie = _login(client_with_auth)
+        client_with_auth.cookies.set("aa_session", cookie)
+        resp = client_with_auth.get("/")
+        assert resp.status_code == 200
+        assert "Auto Archiver" in resp.text
+
+    def test_logout_clears_session(self, client_with_auth):
+        """Logging out must invalidate the token on the server, not just drop the cookie."""
+        cookie = _login(client_with_auth)
+        client_with_auth.cookies.set("aa_session", cookie)
+        resp = client_with_auth.get("/logout", follow_redirects=False)
+        assert resp.status_code == 302
+        # Replay the pre-logout token. Requesting "/" with an empty cookie jar would
+        # redirect even if the server had kept the session, proving nothing.
+        client_with_auth.cookies.clear()
+        client_with_auth.cookies.set("aa_session", cookie)
+        resp = client_with_auth.get("/", follow_redirects=False)
+        assert resp.status_code == 307
+
+    def test_logout_link_shown_when_auth_enabled(self, client_with_auth):
+        cookie = _login(client_with_auth)
+        client_with_auth.cookies.set("aa_session", cookie)
+        resp = client_with_auth.get("/")
+        assert "Logout" in resp.text
+
+    def test_results_requires_auth(self, client_with_auth):
+        resp = client_with_auth.get("/results", follow_redirects=False)
+        assert resp.status_code == 307
+
+    def test_invalid_session_rejected(self, client_with_auth):
+        client_with_auth.cookies.set("aa_session", "bogus-token")
+        resp = client_with_auth.get("/", follow_redirects=False)
+        assert resp.status_code == 307
+
+
+# ── Archive submission ────────────────────────────────────────────────
+
+
+class TestArchive:
+    """POST /archive: job creation, input cleaning and auth enforcement."""
+
+    def test_archive_creates_job(self, client_no_auth):
+        from deploy.web_ui import _jobs
+
+        form = {"urls": "https://example.com\nhttps://example.org"}
+        with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
+            resp = client_no_auth.post("/archive", data=form, follow_redirects=False)
+        assert resp.status_code == 303
+        assert resp.headers["location"] == "/"
+
+        assert len(_jobs) == 1
+        newest = _jobs[0]
+        assert newest["urls"] == ["https://example.com", "https://example.org"]
+        assert newest["status"] == "running"
+
+    def test_archive_empty_urls_returns_400(self, client_no_auth):
+        blank_only = {"urls": " \n \n"}
+        resp = client_no_auth.post("/archive", data=blank_only)
+        assert resp.status_code == 400
+
+    def test_archive_strips_whitespace(self, client_no_auth):
+        from deploy.web_ui import _jobs
+
+        form = {"urls": " https://example.com \n\n https://example.org \n"}
+        with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
+            client_no_auth.post("/archive", data=form, follow_redirects=False)
+
+        assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
+
+    def test_archive_requires_auth(self, client_with_auth):
+        form = {"urls": "https://example.com"}
+        resp = client_with_auth.post("/archive", data=form, follow_redirects=False)
+        assert resp.status_code == 307
+
+
+# ── Results page ──────────────────────────────────────────────────────
+
+
+class TestResults:
+    """GET /results against empty, populated and missing archive directories."""
+
+    @staticmethod
+    def _fetch(client, archive_dir):
+        # Point the app at a throwaway directory for the duration of one request.
+        with patch("deploy.web_ui.ARCHIVE_DIR", archive_dir):
+            return client.get("/results")
+
+    def test_results_empty(self, client_no_auth, tmp_path):
+        resp = self._fetch(client_no_auth, tmp_path)
+        assert resp.status_code == 200
+        assert "No archived files yet" in resp.text
+
+    def test_results_lists_files(self, client_no_auth, tmp_path):
+        (tmp_path / "test.html").write_text("archived")
+        (tmp_path / "video.mp4").write_bytes(b"\x00" * 10)
+        resp = self._fetch(client_no_auth, tmp_path)
+        assert resp.status_code == 200
+        for expected_name in ("test.html", "video.mp4"):
+            assert expected_name in resp.text
+
+    def test_results_nonexistent_dir(self, client_no_auth, tmp_path):
+        resp = self._fetch(client_no_auth, tmp_path / "nonexistent")
+        assert resp.status_code == 200
+        assert "No archived files yet" in resp.text
+
+
+# ── File serving ──────────────────────────────────────────────────────
+
+
+class TestFileServing:
+    """GET /files/{path}: hits, misses and symlink escapes."""
+
+    @staticmethod
+    def _fetch(client, archive_dir, name):
+        with patch("deploy.web_ui.ARCHIVE_DIR", archive_dir):
+            return client.get(f"/files/{name}")
+
+    def test_serve_existing_file(self, client_no_auth, tmp_path):
+        (tmp_path / "report.html").write_text("done")
+        resp = self._fetch(client_no_auth, tmp_path, "report.html")
+        assert resp.status_code == 200
+
+    def test_serve_nonexistent_file(self, client_no_auth, tmp_path):
+        resp = self._fetch(client_no_auth, tmp_path, "nope.txt")
+        assert resp.status_code == 404
+
+    def test_path_traversal_blocked(self, client_no_auth, tmp_path):
+        # A real file that lives outside the archive directory ...
+        outside_dir = tmp_path / "outside"
+        outside_dir.mkdir()
+        secret_file = outside_dir / "secret.txt"
+        secret_file.write_text("secret")
+        # ... reachable only through a symlink placed inside the archive directory.
+        archive_dir = tmp_path / "archive"
+        archive_dir.mkdir()
+        (archive_dir / "escape").symlink_to(secret_file)
+
+        resp = self._fetch(client_no_auth, archive_dir, "escape")
+        assert resp.status_code == 403
+
+
+# ── Job rendering ─────────────────────────────────────────────────────
+
+
+class TestJobRendering:
+    """How the index page renders the in-memory job list."""
+
+    @staticmethod
+    def _job(urls, status):
+        # Minimal job record with the keys these tests place in ``_jobs``.
+        return {
+            "id": 1,
+            "urls": urls,
+            "status": status,
+            "started": "2026-01-01 00:00 UTC",
+            "output": "",
+        }
+
+    def test_no_jobs_shows_message(self, client_no_auth):
+        page = client_no_auth.get("/")
+        assert "No archiving jobs yet" in page.text
+
+    def test_jobs_shown_in_table(self, client_no_auth):
+        from deploy.web_ui import _jobs
+
+        _jobs.append(self._job(["https://example.com"], "done"))
+        page = client_no_auth.get("/")
+        assert "example.com" in page.text
+        assert "done" in page.text
+
+    def test_many_urls_truncated(self, client_no_auth):
+        from deploy.web_ui import _jobs
+
+        ten_urls = [f"https://example.com/{i}" for i in range(10)]
+        _jobs.append(self._job(ten_urls, "running"))
+        page = client_no_auth.get("/")
+        assert "+7 more" in page.text
+
+
+# ── HTML template rendering ──────────────────────────────────────────
+
+
+class TestTemplates:
+ """Verify HTML templates can be .format()-ed without KeyError."""
+
+ # Each test formats one module-level template with the keyword arguments shown
+ # and checks for a fixed substring in the result.
+ def test_login_html_renders(self):
+ from deploy.web_ui import LOGIN_HTML
+
+ result = LOGIN_HTML.format(error="")
+ assert "Auto Archiver" in result
+
+ # Same template, this time with a non-empty ``error`` fragment.
+ def test_login_html_renders_with_error(self):
+ from deploy.web_ui import LOGIN_HTML
+
+ result = LOGIN_HTML.format(error='
Nope
')
+ assert "Nope" in result
+
+ # MAIN_HTML is formatted with ``logout`` and ``jobs_html``.
+ def test_main_html_renders(self):
+ from deploy.web_ui import MAIN_HTML
+
+ result = MAIN_HTML.format(logout="", jobs_html="")
+ assert "Auto Archiver" in result
+
+ # RESULTS_HTML is formatted with ``file_list``.
+ def test_results_html_renders(self):
+ from deploy.web_ui import RESULTS_HTML
+
+ result = RESULTS_HTML.format(file_list="empty
")
+ assert "Archived Files" in result
diff --git a/deploy/web_ui.py b/deploy/web_ui.py
new file mode 100644
index 0000000..9ffcef3
--- /dev/null
+++ b/deploy/web_ui.py
@@ -0,0 +1,269 @@
+#!/usr/bin/env python3
+"""
+Minimal web UI for auto-archiver cloud deployments.
+
+Provides:
+ - GET / → HTML form to submit URLs for archiving
+ - POST /archive → Runs auto-archiver on submitted URLs
+ - GET /results → Lists archived files available for download
+ - GET /files/{path} → Serves archived files
+ - GET /status → Health check
+"""
+
+import asyncio
+import html
+import os
+import secrets
+from datetime import datetime, timezone
+from pathlib import Path
+
+from fastapi import Depends, FastAPI, Form, HTTPException, Request, status
+from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
+
+# Password read from the environment once at import time; an empty value disables auth.
+AUTH_PASSWORD = os.environ.get("AUTH_PASSWORD", "")
+# Directory whose files are listed by /results and served by /files/{path}.
+ARCHIVE_DIR = Path("/app/local_archive")
+# Config file handed to the auto_archiver subprocess via --config.
+CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
+# Name of the cookie that carries the session token.
+COOKIE_NAME = "aa_session"
+
+# In-memory session tokens (reset on restart, which is fine for this use case)
+_valid_sessions: set[str] = set()
+# In-memory job log; new jobs are inserted at index 0, so the list is newest-first.
+_jobs: list[dict] = []
+
+# docs_url/redoc_url are None, which turns off FastAPI's interactive documentation pages.
+app = FastAPI(title="Auto Archiver", docs_url=None, redoc_url=None)
+
+
+# ── Auth helpers ──────────────────────────────────────────────────────
+
+
+def _check_auth(request: Request):
+    """Route dependency that bounces unauthenticated requests to /login.
+
+    Does nothing when no password is configured. Otherwise the session cookie
+    must hold a token present in ``_valid_sessions``; if it does not, an
+    ``HTTPException`` carrying a 307 status and a ``Location: /login`` header
+    is raised.
+    """
+    if AUTH_PASSWORD:
+        session_token = request.cookies.get(COOKIE_NAME, "")
+        if session_token in _valid_sessions:
+            return
+        redirect_headers = {"Location": "/login"}
+        raise HTTPException(
+            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
+            headers=redirect_headers,
+        )
+
+
+# ── Pages ─────────────────────────────────────────────────────────────
+
+# Login page template; rendered with ``str.format(error=...)`` by the /login handlers.
+LOGIN_HTML = """
+
+Auto Archiver – Login
+
+🔐 Auto Archiver
+"""
+
+
+# Index page template; placeholders: ``{logout}`` (logout link or empty string) and
+# ``{jobs_html}`` (fragment produced by ``_render_jobs``).
+MAIN_HTML = """
+
+Auto Archiver
+
+
+ 📦 Auto Archiver
+ Browse files
+ {logout}
+
+
+{jobs_html}
+"""
+
+
+# File listing template; placeholder: ``{file_list}``.
+RESULTS_HTML = """
+
+Auto Archiver – Files
+
+📁 Archived Files
+← Back
+{file_list}
+"""
+
+
+# ── Routes ────────────────────────────────────────────────────────────
+
+
+@app.get("/login", response_class=HTMLResponse)
+async def login_page():
+    """Show the login form, or redirect (302) to the index when auth is disabled."""
+    if AUTH_PASSWORD:
+        return LOGIN_HTML.format(error="")
+    return RedirectResponse("/", status_code=302)
+
+
+@app.post("/login")
+async def login_submit(password: str = Form(...)):
+    """Validate the submitted password and start a session.
+
+    Returns a 302 redirect to the index when auth is disabled or the password
+    matches (in the latter case a fresh session cookie is set), and a 401 login
+    page when the password is wrong.
+    """
+    if not AUTH_PASSWORD:
+        return RedirectResponse("/", status_code=302)
+    # Compare in constant time so response timing does not leak how much of the
+    # password matched. Both sides are encoded to bytes because compare_digest
+    # rejects non-ASCII str arguments.
+    if not secrets.compare_digest(password.encode("utf-8"), AUTH_PASSWORD.encode("utf-8")):
+        return HTMLResponse(
+            LOGIN_HTML.format(error='Wrong password.
'),
+            status_code=401,
+        )
+    token = secrets.token_urlsafe(32)
+    _valid_sessions.add(token)
+    resp = RedirectResponse("/", status_code=302)
+    # The cookie lives for 30 days; the token itself only lives as long as this process.
+    resp.set_cookie(COOKIE_NAME, token, httponly=True, samesite="lax", max_age=86400 * 30)
+    return resp
+
+
+# Index page: guarded by _check_auth, renders MAIN_HTML with the job fragment.
+@app.get("/", response_class=HTMLResponse)
+async def index(request: Request, _=Depends(_check_auth)):
+ # The logout fragment is only included when a password is configured.
+ logout = 'Logout ' if AUTH_PASSWORD else ""
+ jobs_html = _render_jobs()
+ return MAIN_HTML.format(logout=logout, jobs_html=jobs_html)
+
+
+# Strong references to in-flight archive tasks. The event loop only keeps weak
+# references to tasks, so a task nobody else holds may be garbage-collected
+# before it finishes.
+_background_tasks: set = set()
+
+
+@app.post("/archive")
+async def archive(request: Request, urls: str = Form(...), _=Depends(_check_auth)):
+    """Queue an archiving job for the submitted URLs and redirect to the index.
+
+    ``urls`` is split on line breaks; each entry is stripped and blank lines are
+    dropped. Raises ``HTTPException(400)`` when nothing remains. Otherwise a job
+    record with status "running" is inserted at the front of ``_jobs``,
+    ``_run_archive`` is scheduled as a background task, and a 303 redirect to
+    "/" is returned.
+    """
+    url_list = [u.strip() for u in urls.strip().splitlines() if u.strip()]
+    if not url_list:
+        raise HTTPException(400, "No URLs provided")
+
+    job = {
+        "id": len(_jobs) + 1,
+        "urls": url_list,
+        "status": "running",
+        "started": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"),
+        "output": "",
+    }
+    _jobs.insert(0, job)
+
+    # Run in background so the user sees the page immediately. The task is held
+    # in _background_tasks until it completes, then dropped by the done callback.
+    task = asyncio.create_task(_run_archive(job))
+    _background_tasks.add(task)
+    task.add_done_callback(_background_tasks.discard)
+    return RedirectResponse("/", status_code=303)
+
+
+# Results page: guarded by _check_auth, renders RESULTS_HTML.
+@app.get("/results", response_class=HTMLResponse)
+async def results(request: Request, _=Depends(_check_auth)):
+ # A missing archive directory is reported the same way as an empty one.
+ if not ARCHIVE_DIR.exists():
+ return RESULTS_HTML.format(file_list="No archived files yet.
")
+
+ # Walk the directory recursively, most recently modified first, then keep regular files only.
+ files = sorted(ARCHIVE_DIR.rglob("*"), key=lambda p: p.stat().st_mtime, reverse=True)
+ files = [f for f in files if f.is_file()]
+
+ if not files:
+ return RESULTS_HTML.format(file_list="No archived files yet.
")
+
+ items = []
+ for f in files[:200]: # cap listing
+ # Path relative to the archive root, HTML-escaped before interpolation.
+ rel = f.relative_to(ARCHIVE_DIR)
+ items.append(f'{html.escape(str(rel))} ')
+
+ # NOTE(review): as written here ``items`` is built but not passed to the template;
+ # the list markup may be missing from this copy — verify against the repository.
+ return RESULTS_HTML.format(file_list="")
+
+
+@app.get("/files/{path:path}")
+async def serve_file(path: str, request: Request, _=Depends(_check_auth)):
+    """Serve one file from the archive directory.
+
+    Raises ``HTTPException(403)`` when the resolved path (after following
+    symlinks and ``..`` segments) lies outside ``ARCHIVE_DIR``, and
+    ``HTTPException(404)`` when the path is inside but is not an existing
+    regular file.
+    """
+    full = ARCHIVE_DIR / path
+    # Security: check containment BEFORE touching the filesystem entry itself.
+    # Otherwise the 404-vs-403 difference would tell a caller whether an
+    # arbitrary path outside the archive exists.
+    try:
+        full.resolve().relative_to(ARCHIVE_DIR.resolve())
+    except ValueError:
+        raise HTTPException(403, "Forbidden") from None
+    # is_file() is already False for paths that do not exist.
+    if not full.is_file():
+        raise HTTPException(404, "File not found")
+    return FileResponse(full)
+
+
+@app.get("/status")
+async def health():
+    """Health check endpoint; no auth dependency is attached."""
+    payload = {"status": "ok"}
+    return payload
+
+
+@app.get("/logout")
+async def logout(request: Request):
+    """Forget the caller's session token, delete the cookie and redirect (302) to /login."""
+    _valid_sessions.discard(request.cookies.get(COOKIE_NAME, ""))
+    response = RedirectResponse("/login", status_code=302)
+    response.delete_cookie(COOKIE_NAME)
+    return response
+
+
+# ── Helpers ───────────────────────────────────────────────────────────
+
+
+async def _run_archive(job: dict):
+    """Launch auto-archiver on ``job["urls"]`` and record the outcome on ``job``.
+
+    stderr is merged into stdout. On completion ``job["output"]`` holds the last
+    5000 characters of the decoded output and ``job["status"]`` becomes "done"
+    for exit code 0, otherwise "failed". Any exception is caught and stored as
+    the output with status "failed".
+    """
+    base_argv = ["python3", "-m", "auto_archiver", "--config", str(CONFIG_PATH)]
+    argv = base_argv + job["urls"]
+
+    try:
+        process = await asyncio.create_subprocess_exec(
+            *argv,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.STDOUT,
+            cwd="/app",
+        )
+        captured, _ = await process.communicate()
+        decoded = captured.decode(errors="replace")
+        job["output"] = decoded[-5000:]  # keep last 5k chars
+        if process.returncode == 0:
+            job["status"] = "done"
+        else:
+            job["status"] = "failed"
+    except Exception as exc:
+        job["output"] = str(exc)
+        job["status"] = "failed"
+
+
+def _render_jobs() -> str:
+ # Build the fragment for the index page that summarises the in-memory job list.
+ if not _jobs:
+ return 'No archiving jobs yet. Submit URLs above to get started.
'
+
+ rows = []
+ # Only the first 50 entries of the list are rendered.
+ for j in _jobs[:50]:
+ # Show at most three URLs (HTML-escaped) and summarise the remainder as a count.
+ urls_str = html.escape(", ".join(j["urls"][:3]))
+ if len(j["urls"]) > 3:
+ urls_str += f" (+{len(j['urls']) - 3} more)"
+ status_cls = j["status"]
+ rows.append(
+ f"{j['id']} "
+ f"{urls_str} "
+ f'{j["status"]} '
+ f"{j['started']} "
+ )
+
+ return (
+ "Recent Jobs "
+ "# URLs Status Started "
+ "" + "\n".join(rows) + "
"
+ )
diff --git a/railway.json b/railway.json
new file mode 100644
index 0000000..488d0e3
--- /dev/null
+++ b/railway.json
@@ -0,0 +1,99 @@
+{
+ "$schema": "https://railway.app/railway.schema.json",
+ "build": {
+ "dockerfilePath": "deploy/Dockerfile"
+ },
+ "deploy": {
+ "startCommand": "python3 -m deploy.start",
+ "healthcheckPath": "/status",
+ "healthcheckTimeout": 30,
+ "restartPolicyType": "ON_FAILURE",
+ "restartPolicyMaxRetries": 5
+ },
+ "variables": {
+ "AUTH_PASSWORD": {
+ "description": "Password to access your archiver web interface",
+ "required": true
+ },
+ "GSHEET_URL": {
+ "description": "Google Sheet URL to monitor for new URLs (leave empty to disable)",
+ "required": false,
+ "default": ""
+ },
+ "GOOGLE_SERVICE_ACCOUNT_JSON": {
+ "description": "Full JSON contents of your Google service account key (required for Sheets)",
+ "required": false,
+ "default": ""
+ },
+ "POLL_INTERVAL": {
+ "description": "Seconds between Google Sheet checks (min 60)",
+ "required": false,
+ "default": "300"
+ },
+ "S3_BUCKET": {
+ "description": "S3 bucket name for storage (leave empty for local-only)",
+ "required": false,
+ "default": ""
+ },
+ "S3_KEY": {
+ "description": "S3 access key ID",
+ "required": false,
+ "default": ""
+ },
+ "S3_SECRET": {
+ "description": "S3 secret access key",
+ "required": false,
+ "default": ""
+ },
+ "S3_REGION": {
+ "description": "S3 region (e.g. us-east-1, nyc3 for DO Spaces)",
+ "required": false,
+ "default": "us-east-1"
+ },
+ "S3_ENDPOINT": {
+ "description": "S3 endpoint URL template",
+ "required": false,
+ "default": "https://s3.{region}.amazonaws.com"
+ },
+ "S3_CDN_URL": {
+ "description": "Public CDN URL template for archived files",
+ "required": false,
+ "default": "https://{bucket}.s3.{region}.amazonaws.com/{key}"
+ },
+ "TELEGRAM_API_ID": {
+ "description": "Telegram API ID from https://my.telegram.org",
+ "required": false,
+ "default": ""
+ },
+ "TELEGRAM_API_HASH": {
+ "description": "Telegram API hash from https://my.telegram.org",
+ "required": false,
+ "default": ""
+ },
+ "TELEGRAM_BOT_TOKEN": {
+ "description": "Telegram bot token from @BotFather",
+ "required": false,
+ "default": ""
+ },
+ "ENABLE_SCREENSHOTS": {
+ "description": "Set to true to capture full-page screenshots",
+ "required": false,
+ "default": "false"
+ },
+ "ENABLE_THUMBNAILS": {
+ "description": "Set to true to generate video thumbnails",
+ "required": false,
+ "default": "false"
+ },
+ "ENABLE_CSV_DB": {
+ "description": "Set to true to save a CSV log of archived items",
+ "required": false,
+ "default": "false"
+ },
+ "LOG_LEVEL": {
+ "description": "Logging level: DEBUG, INFO, WARNING, ERROR",
+ "required": false,
+ "default": "INFO"
+ }
+ }
+}