mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-08 03:18:28 +03:00
Compare commits
22 Commits
feat-one-c
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e651bb849 | ||
|
|
6581bbe139 | ||
|
|
e633be1721 | ||
|
|
bc06de8e5c | ||
|
|
20fddce3a3 | ||
|
|
6efa439cdb | ||
|
|
ef77d1fc86 | ||
|
|
a57a5ee005 | ||
|
|
2582f567ac | ||
|
|
4e5c1a6218 | ||
|
|
12d9c469b2 | ||
|
|
792838f1a1 | ||
|
|
17c4ae15eb | ||
|
|
a08af07348 | ||
|
|
e54077f4e8 | ||
|
|
319c0528da | ||
|
|
ae0e53e434 | ||
|
|
82fc786d56 | ||
|
|
aa65299844 | ||
|
|
1b69ec1f00 | ||
|
|
304e5d40b1 | ||
|
|
63cfe34e23 |
29
.github/workflows/tests-deploy.yaml
vendored
29
.github/workflows/tests-deploy.yaml
vendored
@@ -1,29 +0,0 @@
|
||||
name: Deploy Tests
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
paths:
|
||||
- deploy/**
|
||||
pull_request:
|
||||
paths:
|
||||
- deploy/**
|
||||
|
||||
jobs:
|
||||
tests:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
- name: Set up Python 3.12
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
- name: Install dependencies
|
||||
run: pip install pytest fastapi httpx python-multipart pyyaml
|
||||
|
||||
- name: Run Deploy Tests
|
||||
working-directory: deploy
|
||||
run: python -m pytest tests/ -v
|
||||
@@ -1,18 +1,17 @@
|
||||
FROM webrecorder/browsertrix-crawler:1.11.4 AS base
|
||||
FROM webrecorder/browsertrix-crawler:1.12.4 AS base
|
||||
|
||||
ENV RUNNING_IN_DOCKER=1 \
|
||||
LANG=C.UTF-8 \
|
||||
LC_ALL=C.UTF-8 \
|
||||
PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONFAULTHANDLER=1 \
|
||||
PATH="/root/.local/bin:$PATH"
|
||||
PYTHONFAULTHANDLER=1
|
||||
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
# Installing system dependencies
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
||||
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
||||
|
||||
# Poetry and runtime
|
||||
FROM base AS runtime
|
||||
|
||||
35
README.md
35
README.md
@@ -22,40 +22,7 @@ Auto Archiver is a Python tool to automatically archive content on the web in a
|
||||
Read the [article about Auto Archiver on bellingcat.com](https://www.bellingcat.com/resources/2022/09/22/preserve-vital-online-content-with-bellingcats-auto-archiver-tool/).
|
||||
|
||||
|
||||
## One-Click Cloud Deploy
|
||||
|
||||
Deploy your own Auto Archiver instance to the cloud — no coding required:
|
||||
|
||||
[](https://railway.app/new/template?template=https://github.com/bellingcat/auto-archiver&envs=AUTH_PASSWORD,GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&optionalEnvs=GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&AUTH_PASSWORDDesc=Password+to+access+your+archiver+web+interface&GSHEET_URLDesc=Google+Sheet+URL+to+monitor+for+new+URLs+(leave+empty+to+disable)&POLL_INTERVALDesc=Seconds+between+Google+Sheet+checks+(min+60)&POLL_INTERVALDefault=300&S3_BUCKETDesc=S3+bucket+name+for+storage+(leave+empty+for+local+only)&S3_REGIONDefault=us-east-1&LOG_LEVELDefault=INFO)
|
||||
|
||||
**What you get:** A web interface where you can paste URLs and archive them instantly. Optionally connect a Google Sheet for automated monitoring, S3 for cloud storage, and Telegram for archiving channels.
|
||||
|
||||
**Only required setting:** `AUTH_PASSWORD` — everything else is optional and can be configured later via the Railway dashboard.
|
||||
|
||||
<details>
|
||||
<summary>📋 Environment variables reference</summary>
|
||||
|
||||
| Variable | Required | Description |
|
||||
|----------|----------|-------------|
|
||||
| `AUTH_PASSWORD` | **Yes** | Password to access the web interface |
|
||||
| `GSHEET_URL` | No | Google Sheet URL to monitor for new URLs [use this template](https://docs.google.com/spreadsheets/d/1NJZo_XZUBKTI1Ghlgi4nTPVvCfb0HXAs6j5tNGas72k/edit?gid=0#gid=0) |
|
||||
| `GOOGLE_SERVICE_ACCOUNT_JSON` | No | Google service account JSON (required with Sheets) [follow these instructions](https://auto-archiver.readthedocs.io/en/v1.0.1/how_to/gsheets_setup.html) |
|
||||
| `POLL_INTERVAL` | No | Seconds between Sheet checks (default: 300) |
|
||||
| `S3_BUCKET` | No | S3 bucket name for archived content, ideal for cloud hosting your archives but not mandatory, any S3-compatible storage works |
|
||||
| `S3_KEY` / `S3_SECRET` | No | S3 credentials |
|
||||
| `S3_REGION` | No | S3 region (default: us-east-1) |
|
||||
| `S3_ENDPOINT` | No | S3 endpoint URL |
|
||||
| `TELEGRAM_API_ID` / `TELEGRAM_API_HASH` | No | Telegram API credentials |
|
||||
| `TELEGRAM_BOT_TOKEN` | No | Telegram bot token |
|
||||
| `ENABLE_SCREENSHOTS` | No | Set to `true` for full-page screenshots |
|
||||
| `ENABLE_THUMBNAILS` | No | Set to `true` for video thumbnails |
|
||||
| `ENABLE_CSV_DB` | No | Set to `true` for CSV logging |
|
||||
| `LOG_LEVEL` | No | DEBUG, INFO, WARNING, ERROR (default: INFO) |
|
||||
|
||||
</details>
|
||||
|
||||
|
||||
## Traditional Installation
|
||||
## Installation
|
||||
|
||||
View the [Installation Guide](https://auto-archiver.readthedocs.io/en/latest/installation/installation.html) for full instructions
|
||||
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
# ── Cloud Deploy ──────────────────────────────────────────────────────
|
||||
# Thin web UI + config generator layer on top of the published
|
||||
# auto-archiver Docker image. Used by the Railway one-click deploy.
|
||||
#
|
||||
# Build:
|
||||
# docker build -f deploy/Dockerfile -t auto-archiver-deploy .
|
||||
#
|
||||
# Run:
|
||||
# docker run -p 8080:8080 -e PORT=8080 -e AUTH_PASSWORD=secret auto-archiver-deploy
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
FROM bellingcat/auto-archiver:latest
|
||||
|
||||
USER root
|
||||
|
||||
# Install the lightweight web layer dependencies
|
||||
RUN pip install --no-cache-dir fastapi uvicorn[standard] python-multipart pyyaml
|
||||
|
||||
# Copy deploy scripts into the image
|
||||
COPY deploy/ /app/deploy/
|
||||
|
||||
# Ensure writable dirs exist
|
||||
RUN mkdir -p /app/local_archive /app/secrets && \
|
||||
chown -R 1000:1000 /app/local_archive /app/secrets /app/deploy
|
||||
|
||||
USER 1000
|
||||
|
||||
# Railway sets PORT; default to 8080
|
||||
ENV PORT=8080
|
||||
|
||||
EXPOSE ${PORT}
|
||||
|
||||
# Override the CLI entrypoint with the web server
|
||||
ENTRYPOINT ["python3", "-m", "deploy.start"]
|
||||
@@ -1 +0,0 @@
|
||||
# Cloud deployment layer for auto-archiver
|
||||
@@ -1,163 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Generates orchestration.yaml from environment variables.
|
||||
|
||||
This script bridges Railway's env-var-based configuration with
|
||||
auto-archiver's YAML-based configuration system. It runs at container
|
||||
startup before the web UI server starts.
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
|
||||
SECRETS_DIR = Path("/app/secrets")
|
||||
|
||||
|
||||
def build_config() -> dict:
|
||||
"""Build an orchestration config dict from environment variables."""
|
||||
|
||||
# -- Base config: always present ------------------------------------
|
||||
config = {
|
||||
"steps": {
|
||||
"feeders": ["cli_feeder"],
|
||||
"extractors": ["generic_extractor"],
|
||||
"enrichers": ["hash_enricher"],
|
||||
"databases": ["console_db"],
|
||||
"storages": ["local_storage"],
|
||||
"formatters": ["html_formatter"],
|
||||
},
|
||||
"logging": {
|
||||
"level": os.environ.get("LOG_LEVEL", "INFO"),
|
||||
},
|
||||
"local_storage": {
|
||||
"save_to": "/app/local_archive",
|
||||
"path_generator": "flat",
|
||||
"filename_generator": "static",
|
||||
},
|
||||
"generic_extractor": {
|
||||
"subtitles": os.environ.get("SUBTITLES", "false").lower() == "true",
|
||||
"comments": False,
|
||||
"livestreams": False,
|
||||
"live_from_start": False,
|
||||
"end_means_success": True,
|
||||
"allow_playlist": False,
|
||||
},
|
||||
"hash_enricher": {
|
||||
"algorithm": "SHA-256",
|
||||
},
|
||||
"html_formatter": {
|
||||
"detect_thumbnails": True,
|
||||
},
|
||||
"authentication": {},
|
||||
}
|
||||
|
||||
# -- Google Sheets feeder (optional) --------------------------------
|
||||
gsheet_url = os.environ.get("GSHEET_URL", "")
|
||||
if gsheet_url:
|
||||
config["steps"]["feeders"].append("gsheet_feeder")
|
||||
config["steps"]["databases"].append("gsheet_db")
|
||||
config["gsheet_feeder"] = {
|
||||
"sheet": gsheet_url,
|
||||
"header": 1,
|
||||
"service_account": str(SECRETS_DIR / "service_account.json"),
|
||||
"use_sheet_names_in_stored_paths": False,
|
||||
"columns": {
|
||||
"url": "link",
|
||||
"status": "archive status",
|
||||
"folder": "destination folder",
|
||||
"archive": "archive location",
|
||||
"date": "archive date",
|
||||
"thumbnail": "thumbnail",
|
||||
"timestamp": "upload timestamp",
|
||||
"title": "upload title",
|
||||
"text": "textual content",
|
||||
"screenshot": "screenshot",
|
||||
"hash": "hash",
|
||||
"pdq_hash": "perceptual hashes",
|
||||
},
|
||||
}
|
||||
|
||||
# -- Google service account JSON (optional) -------------------------
|
||||
sa_json = os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "")
|
||||
if sa_json:
|
||||
SECRETS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
sa_path = SECRETS_DIR / "service_account.json"
|
||||
sa_path.write_text(sa_json)
|
||||
print(f"[deploy] Wrote Google service account to {sa_path}")
|
||||
|
||||
# -- S3 storage (optional) ------------------------------------------
|
||||
s3_bucket = os.environ.get("S3_BUCKET", "")
|
||||
if s3_bucket:
|
||||
config["steps"]["storages"].append("s3_storage")
|
||||
config["s3_storage"] = {
|
||||
"bucket": s3_bucket,
|
||||
"region": os.environ.get("S3_REGION", "us-east-1"),
|
||||
"key": os.environ.get("S3_KEY", ""),
|
||||
"secret": os.environ.get("S3_SECRET", ""),
|
||||
"endpoint_url": os.environ.get("S3_ENDPOINT", "https://s3.{region}.amazonaws.com"),
|
||||
"cdn_url": os.environ.get(
|
||||
"S3_CDN_URL",
|
||||
"https://{bucket}.s3.{region}.amazonaws.com/{key}",
|
||||
),
|
||||
"private": os.environ.get("S3_PRIVATE", "false").lower() == "true",
|
||||
"random_no_duplicate": True,
|
||||
"key_path": "random",
|
||||
}
|
||||
|
||||
# -- Telegram extractor (optional) ----------------------------------
|
||||
tg_api_id = os.environ.get("TELEGRAM_API_ID", "")
|
||||
tg_api_hash = os.environ.get("TELEGRAM_API_HASH", "")
|
||||
if tg_api_id and tg_api_hash:
|
||||
config["steps"]["extractors"].append("telegram_extractor")
|
||||
config["telegram_extractor"] = {
|
||||
"api_id": tg_api_id,
|
||||
"api_hash": tg_api_hash,
|
||||
}
|
||||
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
|
||||
if bot_token:
|
||||
config["telegram_extractor"]["bot_token"] = bot_token
|
||||
|
||||
# -- Screenshot enricher (optional) ---------------------------------
|
||||
if os.environ.get("ENABLE_SCREENSHOTS", "").lower() == "true":
|
||||
config["steps"]["enrichers"].append("screenshot_enricher")
|
||||
config["screenshot_enricher"] = {
|
||||
"width": 1280,
|
||||
"height": 7200,
|
||||
"save_to_pdf": True,
|
||||
}
|
||||
|
||||
# -- Thumbnail enricher (optional) ----------------------------------
|
||||
if os.environ.get("ENABLE_THUMBNAILS", "").lower() == "true":
|
||||
config["steps"]["enrichers"].append("thumbnail_enricher")
|
||||
config["thumbnail_enricher"] = {
|
||||
"thumbnails_per_minute": 60,
|
||||
"max_thumbnails": 16,
|
||||
}
|
||||
|
||||
# -- CSV database (optional) ----------------------------------------
|
||||
if os.environ.get("ENABLE_CSV_DB", "").lower() == "true":
|
||||
config["steps"]["databases"].append("csv_db")
|
||||
config["csv_db"] = {
|
||||
"csv_file": "/app/local_archive/db.csv",
|
||||
}
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def main():
|
||||
config = build_config()
|
||||
|
||||
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(CONFIG_PATH, "w") as f:
|
||||
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
|
||||
|
||||
print(f"[deploy] Generated config at {CONFIG_PATH}")
|
||||
print(f"[deploy] Active steps: {config['steps']}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,71 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Background Google Sheets poller for auto-archiver cloud deployments.
|
||||
|
||||
When GSHEET_URL is set, periodically runs auto-archiver with gsheet_feeder
|
||||
to check for new URLs in the configured spreadsheet. Runs as a daemon thread
|
||||
alongside the web UI.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
|
||||
logger = logging.getLogger("gsheet_poller")
|
||||
|
||||
CONFIG_PATH = "/app/secrets/orchestration.yaml"
|
||||
|
||||
|
||||
def _poll_once():
|
||||
"""Run auto-archiver once to process any new rows in the Google Sheet."""
|
||||
logger.info("Polling Google Sheet for new URLs...")
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["python3", "-m", "auto_archiver", "--config", CONFIG_PATH],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
cwd="/app",
|
||||
timeout=600, # 10 minute timeout per poll
|
||||
)
|
||||
if result.returncode == 0:
|
||||
logger.info("Sheet poll completed successfully.")
|
||||
else:
|
||||
logger.warning("Sheet poll exited with code %d: %s", result.returncode, result.stderr[-500:])
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.error("Sheet poll timed out after 600s")
|
||||
except Exception:
|
||||
logger.exception("Sheet poll failed")
|
||||
|
||||
|
||||
def _poll_loop(interval: int):
|
||||
"""Run the poll loop at the given interval (seconds)."""
|
||||
logger.info("Google Sheets poller started (interval=%ds)", interval)
|
||||
while True:
|
||||
_poll_once()
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def start_poller():
|
||||
"""
|
||||
Start the Google Sheets poller as a daemon thread if GSHEET_URL is set.
|
||||
Call this once at application startup.
|
||||
"""
|
||||
gsheet_url = os.environ.get("GSHEET_URL", "")
|
||||
if not gsheet_url:
|
||||
logger.info("GSHEET_URL not set – Sheet poller disabled.")
|
||||
return
|
||||
|
||||
interval = int(os.environ.get("POLL_INTERVAL", "300"))
|
||||
if interval < 60:
|
||||
interval = 60 # minimum 1 minute
|
||||
|
||||
thread = threading.Thread(
|
||||
target=_poll_loop,
|
||||
args=(interval,),
|
||||
daemon=True,
|
||||
name="gsheet-poller",
|
||||
)
|
||||
thread.start()
|
||||
logger.info("Google Sheets poller thread started.")
|
||||
@@ -1,2 +0,0 @@
|
||||
[pytest]
|
||||
testpaths = tests
|
||||
@@ -1,37 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Startup entrypoint for cloud deployments.
|
||||
|
||||
1. Generates orchestration.yaml from environment variables
|
||||
2. Starts the Google Sheets poller (if GSHEET_URL is set)
|
||||
3. Starts the FastAPI web UI
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
||||
)
|
||||
|
||||
# Generate config from env vars
|
||||
from deploy.generate_config import main as generate_config # noqa: E402
|
||||
|
||||
generate_config()
|
||||
|
||||
# Start gsheet poller (no-op if GSHEET_URL not set)
|
||||
from deploy.gsheet_poller import start_poller # noqa: E402
|
||||
|
||||
start_poller()
|
||||
|
||||
# Start web server
|
||||
import uvicorn # noqa: E402
|
||||
|
||||
port = int(os.environ.get("PORT", "8080"))
|
||||
uvicorn.run(
|
||||
"deploy.web_ui:app",
|
||||
host="0.0.0.0",
|
||||
port=port,
|
||||
log_level="info",
|
||||
)
|
||||
@@ -1,354 +0,0 @@
|
||||
"""Tests for deploy/generate_config.py – config generation from env vars."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
import yaml
|
||||
|
||||
from deploy.generate_config import build_config, main
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _env(**overrides):
|
||||
"""Return a clean env dict with only the given overrides (no leak from host)."""
|
||||
# Clear all deploy-relevant env vars, then apply overrides
|
||||
deploy_vars = [
|
||||
"LOG_LEVEL",
|
||||
"SUBTITLES",
|
||||
"GSHEET_URL",
|
||||
"GOOGLE_SERVICE_ACCOUNT_JSON",
|
||||
"S3_BUCKET",
|
||||
"S3_KEY",
|
||||
"S3_SECRET",
|
||||
"S3_REGION",
|
||||
"S3_ENDPOINT",
|
||||
"S3_CDN_URL",
|
||||
"S3_PRIVATE",
|
||||
"TELEGRAM_API_ID",
|
||||
"TELEGRAM_API_HASH",
|
||||
"TELEGRAM_BOT_TOKEN",
|
||||
"ENABLE_SCREENSHOTS",
|
||||
"ENABLE_THUMBNAILS",
|
||||
"ENABLE_CSV_DB",
|
||||
]
|
||||
clean = {k: v for k, v in os.environ.items() if k not in deploy_vars}
|
||||
clean.update(overrides)
|
||||
return clean
|
||||
|
||||
|
||||
# ── Base config (no optional env vars) ────────────────────────────────
|
||||
|
||||
|
||||
class TestBaseConfig:
|
||||
"""When no optional env vars are set, build_config returns a minimal working config."""
|
||||
|
||||
def test_base_steps(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
steps = cfg["steps"]
|
||||
assert steps["feeders"] == ["cli_feeder"]
|
||||
assert steps["extractors"] == ["generic_extractor"]
|
||||
assert steps["enrichers"] == ["hash_enricher"]
|
||||
assert steps["databases"] == ["console_db"]
|
||||
assert steps["storages"] == ["local_storage"]
|
||||
assert steps["formatters"] == ["html_formatter"]
|
||||
|
||||
def test_base_has_required_module_configs(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert "local_storage" in cfg
|
||||
assert "generic_extractor" in cfg
|
||||
assert "hash_enricher" in cfg
|
||||
assert "html_formatter" in cfg
|
||||
|
||||
def test_default_log_level_is_info(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["logging"]["level"] == "INFO"
|
||||
|
||||
def test_custom_log_level(self):
|
||||
with patch.dict(os.environ, _env(LOG_LEVEL="DEBUG"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["logging"]["level"] == "DEBUG"
|
||||
|
||||
def test_authentication_present_and_empty(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["authentication"] == {}
|
||||
|
||||
def test_local_storage_defaults(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
ls = cfg["local_storage"]
|
||||
assert ls["save_to"] == "/app/local_archive"
|
||||
assert ls["path_generator"] == "flat"
|
||||
assert ls["filename_generator"] == "static"
|
||||
|
||||
def test_subtitles_default_false(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["generic_extractor"]["subtitles"] is False
|
||||
|
||||
def test_subtitles_enabled(self):
|
||||
with patch.dict(os.environ, _env(SUBTITLES="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["generic_extractor"]["subtitles"] is True
|
||||
|
||||
def test_subtitles_case_insensitive(self):
|
||||
with patch.dict(os.environ, _env(SUBTITLES="True"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["generic_extractor"]["subtitles"] is True
|
||||
|
||||
def test_no_optional_modules_present(self):
|
||||
"""Ensure optional modules don't appear when their env vars are absent."""
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert "gsheet_feeder" not in cfg
|
||||
assert "s3_storage" not in cfg
|
||||
assert "telegram_extractor" not in cfg
|
||||
assert "screenshot_enricher" not in cfg
|
||||
assert "thumbnail_enricher" not in cfg
|
||||
assert "csv_db" not in cfg
|
||||
|
||||
def test_config_is_valid_yaml(self):
|
||||
"""The output dict should round-trip through YAML cleanly."""
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
dumped = yaml.dump(cfg)
|
||||
reloaded = yaml.safe_load(dumped)
|
||||
assert reloaded == cfg
|
||||
|
||||
|
||||
# ── Google Sheets ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGSheetConfig:
|
||||
def test_gsheet_adds_feeder_and_db(self):
|
||||
with patch.dict(os.environ, _env(GSHEET_URL="https://docs.google.com/spreadsheets/d/abc"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "gsheet_feeder" in cfg["steps"]["feeders"]
|
||||
assert "gsheet_db" in cfg["steps"]["databases"]
|
||||
|
||||
def test_gsheet_feeder_config(self):
|
||||
url = "https://docs.google.com/spreadsheets/d/abc123"
|
||||
with patch.dict(os.environ, _env(GSHEET_URL=url), clear=True):
|
||||
cfg = build_config()
|
||||
gf = cfg["gsheet_feeder"]
|
||||
assert gf["sheet"] == url
|
||||
assert gf["header"] == 1
|
||||
assert "service_account" in gf
|
||||
assert gf["columns"]["url"] == "link"
|
||||
assert gf["columns"]["status"] == "archive status"
|
||||
|
||||
def test_gsheet_preserves_cli_feeder(self):
|
||||
"""cli_feeder should still be present even when gsheet is added."""
|
||||
with patch.dict(os.environ, _env(GSHEET_URL="https://example.com/sheet"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "cli_feeder" in cfg["steps"]["feeders"]
|
||||
|
||||
def test_service_account_json_written(self, tmp_path):
|
||||
"""When GOOGLE_SERVICE_ACCOUNT_JSON is set, it writes the file."""
|
||||
sa_data = json.dumps({"type": "service_account", "project_id": "test"})
|
||||
secrets_dir = tmp_path / "secrets"
|
||||
with (
|
||||
patch.dict(os.environ, _env(GOOGLE_SERVICE_ACCOUNT_JSON=sa_data), clear=True),
|
||||
patch("deploy.generate_config.SECRETS_DIR", secrets_dir),
|
||||
):
|
||||
build_config()
|
||||
sa_path = secrets_dir / "service_account.json"
|
||||
assert sa_path.exists()
|
||||
assert json.loads(sa_path.read_text())["project_id"] == "test"
|
||||
|
||||
|
||||
# ── S3 storage ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestS3Config:
|
||||
def test_s3_adds_storage(self):
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="my-bucket"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "s3_storage" in cfg["steps"]["storages"]
|
||||
assert "local_storage" in cfg["steps"]["storages"] # local still there
|
||||
|
||||
def test_s3_config_values(self):
|
||||
env = _env(
|
||||
S3_BUCKET="my-bucket",
|
||||
S3_KEY="AKID",
|
||||
S3_SECRET="shhh",
|
||||
S3_REGION="eu-west-1",
|
||||
)
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
s3 = cfg["s3_storage"]
|
||||
assert s3["bucket"] == "my-bucket"
|
||||
assert s3["key"] == "AKID"
|
||||
assert s3["secret"] == "shhh"
|
||||
assert s3["region"] == "eu-west-1"
|
||||
assert s3["private"] is False
|
||||
assert s3["random_no_duplicate"] is True
|
||||
|
||||
def test_s3_defaults(self):
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="b"), clear=True):
|
||||
cfg = build_config()
|
||||
s3 = cfg["s3_storage"]
|
||||
assert s3["region"] == "us-east-1"
|
||||
assert "{region}" in s3["endpoint_url"]
|
||||
|
||||
def test_s3_private_flag(self):
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_PRIVATE="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["s3_storage"]["private"] is True
|
||||
|
||||
def test_s3_custom_endpoint(self):
|
||||
endpoint = "https://nyc3.digitaloceanspaces.com"
|
||||
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_ENDPOINT=endpoint), clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["s3_storage"]["endpoint_url"] == endpoint
|
||||
|
||||
|
||||
# ── Telegram ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestTelegramConfig:
|
||||
def test_telegram_added_when_both_set(self):
|
||||
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
assert "telegram_extractor" in cfg["steps"]["extractors"]
|
||||
assert cfg["telegram_extractor"]["api_id"] == "12345"
|
||||
assert cfg["telegram_extractor"]["api_hash"] == "abc"
|
||||
|
||||
def test_telegram_not_added_if_only_id(self):
|
||||
with patch.dict(os.environ, _env(TELEGRAM_API_ID="12345"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "telegram_extractor" not in cfg["steps"]["extractors"]
|
||||
|
||||
def test_telegram_not_added_if_only_hash(self):
|
||||
with patch.dict(os.environ, _env(TELEGRAM_API_HASH="abc"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "telegram_extractor" not in cfg["steps"]["extractors"]
|
||||
|
||||
def test_telegram_bot_token_optional(self):
|
||||
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc", TELEGRAM_BOT_TOKEN="bot:tok")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
assert cfg["telegram_extractor"]["bot_token"] == "bot:tok"
|
||||
|
||||
def test_telegram_no_bot_token(self):
|
||||
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
assert "bot_token" not in cfg["telegram_extractor"]
|
||||
|
||||
|
||||
# ── Optional enrichers / databases ────────────────────────────────────
|
||||
|
||||
|
||||
class TestOptionalModules:
|
||||
def test_screenshots_disabled_by_default(self):
|
||||
with patch.dict(os.environ, _env(), clear=True):
|
||||
cfg = build_config()
|
||||
assert "screenshot_enricher" not in cfg["steps"]["enrichers"]
|
||||
|
||||
def test_screenshots_enabled(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
|
||||
assert cfg["screenshot_enricher"]["width"] == 1280
|
||||
|
||||
def test_thumbnails_enabled(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_THUMBNAILS="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "thumbnail_enricher" in cfg["steps"]["enrichers"]
|
||||
assert cfg["thumbnail_enricher"]["max_thumbnails"] == 16
|
||||
|
||||
def test_csv_db_enabled(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_CSV_DB="true"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "csv_db" in cfg["steps"]["databases"]
|
||||
assert cfg["csv_db"]["csv_file"] == "/app/local_archive/db.csv"
|
||||
|
||||
def test_case_insensitive_boolean(self):
|
||||
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="TRUE"), clear=True):
|
||||
cfg = build_config()
|
||||
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
|
||||
|
||||
|
||||
# ── Combined / full config ────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestCombinedConfig:
|
||||
def test_all_optional_modules_together(self):
|
||||
"""Enable everything at once and verify no conflicts."""
|
||||
env = _env(
|
||||
GSHEET_URL="https://example.com/sheet",
|
||||
S3_BUCKET="bucket",
|
||||
S3_KEY="key",
|
||||
S3_SECRET="secret",
|
||||
TELEGRAM_API_ID="123",
|
||||
TELEGRAM_API_HASH="abc",
|
||||
TELEGRAM_BOT_TOKEN="tok",
|
||||
ENABLE_SCREENSHOTS="true",
|
||||
ENABLE_THUMBNAILS="true",
|
||||
ENABLE_CSV_DB="true",
|
||||
)
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
|
||||
steps = cfg["steps"]
|
||||
assert "gsheet_feeder" in steps["feeders"]
|
||||
assert "telegram_extractor" in steps["extractors"]
|
||||
assert "screenshot_enricher" in steps["enrichers"]
|
||||
assert "thumbnail_enricher" in steps["enrichers"]
|
||||
assert "csv_db" in steps["databases"]
|
||||
assert "gsheet_db" in steps["databases"]
|
||||
assert "s3_storage" in steps["storages"]
|
||||
assert "local_storage" in steps["storages"]
|
||||
|
||||
# All module configs present
|
||||
for key in [
|
||||
"gsheet_feeder",
|
||||
"s3_storage",
|
||||
"telegram_extractor",
|
||||
"screenshot_enricher",
|
||||
"thumbnail_enricher",
|
||||
"csv_db",
|
||||
]:
|
||||
assert key in cfg, f"{key} config missing"
|
||||
|
||||
def test_full_config_valid_yaml(self):
|
||||
env = _env(
|
||||
GSHEET_URL="https://example.com/sheet",
|
||||
S3_BUCKET="bucket",
|
||||
TELEGRAM_API_ID="123",
|
||||
TELEGRAM_API_HASH="abc",
|
||||
ENABLE_SCREENSHOTS="true",
|
||||
ENABLE_CSV_DB="true",
|
||||
)
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
cfg = build_config()
|
||||
dumped = yaml.dump(cfg)
|
||||
reloaded = yaml.safe_load(dumped)
|
||||
assert reloaded == cfg
|
||||
|
||||
|
||||
# ── main() writes file ───────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMainFunction:
|
||||
def test_main_writes_config_file(self, tmp_path):
|
||||
config_path = tmp_path / "orchestration.yaml"
|
||||
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
|
||||
main()
|
||||
assert config_path.exists()
|
||||
cfg = yaml.safe_load(config_path.read_text())
|
||||
assert cfg["steps"]["feeders"] == ["cli_feeder"]
|
||||
|
||||
def test_main_creates_parent_dirs(self, tmp_path):
|
||||
config_path = tmp_path / "nested" / "dir" / "orchestration.yaml"
|
||||
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
|
||||
main()
|
||||
assert config_path.exists()
|
||||
@@ -1,124 +0,0 @@
|
||||
"""Tests for deploy/gsheet_poller.py – background Google Sheets polling."""
|
||||
|
||||
import os
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
from deploy.gsheet_poller import start_poller, _poll_once
|
||||
|
||||
|
||||
# ── start_poller ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStartPoller:
|
||||
def test_disabled_when_no_gsheet_url(self):
|
||||
"""No thread should be started when GSHEET_URL is empty."""
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": ""}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
start_poller()
|
||||
mock_thread.assert_not_called()
|
||||
|
||||
def test_disabled_when_gsheet_url_absent(self):
|
||||
env = {k: v for k, v in os.environ.items() if k != "GSHEET_URL"}
|
||||
with patch.dict(os.environ, env, clear=True), patch("deploy.gsheet_poller.threading.Thread") as mock_thread:
|
||||
start_poller()
|
||||
mock_thread.assert_not_called()
|
||||
|
||||
def test_starts_thread_when_gsheet_url_set(self):
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": "https://example.com/sheet"}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_instance = MagicMock()
|
||||
mock_thread.return_value = mock_instance
|
||||
start_poller()
|
||||
mock_thread.assert_called_once()
|
||||
assert mock_thread.call_args.kwargs["daemon"] is True
|
||||
assert mock_thread.call_args.kwargs["name"] == "gsheet-poller"
|
||||
mock_instance.start.assert_called_once()
|
||||
|
||||
def test_default_interval_300(self):
|
||||
env = {"GSHEET_URL": "https://example.com/sheet"}
|
||||
# Remove POLL_INTERVAL if present
|
||||
clean_env = {k: v for k, v in os.environ.items() if k != "POLL_INTERVAL"}
|
||||
clean_env.update(env)
|
||||
with (
|
||||
patch.dict(os.environ, clean_env, clear=True),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_thread.return_value = MagicMock()
|
||||
start_poller()
|
||||
# interval should be passed as arg to _poll_loop
|
||||
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
|
||||
assert args == (300,)
|
||||
|
||||
def test_custom_interval(self):
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "600"}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_thread.return_value = MagicMock()
|
||||
start_poller()
|
||||
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
|
||||
assert args == (600,)
|
||||
|
||||
def test_interval_minimum_enforced(self):
|
||||
"""Intervals below 60 should be clamped to 60."""
|
||||
with (
|
||||
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "10"}, clear=False),
|
||||
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
|
||||
):
|
||||
mock_thread.return_value = MagicMock()
|
||||
start_poller()
|
||||
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
|
||||
assert args == (60,)
|
||||
|
||||
|
||||
# ── _poll_once ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestPollOnce:
|
||||
def test_calls_subprocess_with_config(self):
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0, stderr="")
|
||||
_poll_once()
|
||||
mock_run.assert_called_once()
|
||||
cmd = mock_run.call_args[0][0]
|
||||
assert "auto_archiver" in " ".join(cmd)
|
||||
assert "--config" in cmd
|
||||
|
||||
def test_handles_nonzero_exit(self):
|
||||
"""Should not raise on non-zero exit, just log a warning."""
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=1, stderr="some error")
|
||||
_poll_once() # should not raise
|
||||
|
||||
def test_handles_timeout(self):
|
||||
"""Should not raise on timeout, just log."""
|
||||
import subprocess
|
||||
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=600)
|
||||
_poll_once() # should not raise
|
||||
|
||||
def test_handles_exception(self):
|
||||
"""Should not raise on arbitrary exceptions."""
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.side_effect = OSError("broken")
|
||||
_poll_once() # should not raise
|
||||
|
||||
def test_uses_correct_config_path(self):
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0, stderr="")
|
||||
_poll_once()
|
||||
cmd = mock_run.call_args[0][0]
|
||||
config_idx = cmd.index("--config")
|
||||
assert cmd[config_idx + 1] == "/app/secrets/orchestration.yaml"
|
||||
|
||||
def test_timeout_set(self):
|
||||
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0, stderr="")
|
||||
_poll_once()
|
||||
assert mock_run.call_args[1]["timeout"] == 600
|
||||
@@ -1,310 +0,0 @@
|
||||
"""Tests for deploy/web_ui.py – FastAPI web interface."""
|
||||
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_state():
|
||||
"""Reset in-memory state between tests."""
|
||||
import deploy.web_ui as mod
|
||||
|
||||
mod._valid_sessions.clear()
|
||||
mod._jobs.clear()
|
||||
yield
|
||||
mod._valid_sessions.clear()
|
||||
mod._jobs.clear()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_no_auth():
|
||||
"""Test client with auth disabled (no AUTH_PASSWORD)."""
|
||||
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", ""):
|
||||
from deploy.web_ui import app
|
||||
|
||||
yield TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_with_auth():
|
||||
"""Test client with auth enabled."""
|
||||
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", "secret123"):
|
||||
from deploy.web_ui import app
|
||||
|
||||
yield TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
def _login(client, password="secret123"):
|
||||
"""Helper: log in and return the session cookie."""
|
||||
resp = client.post("/login", data={"password": password}, follow_redirects=False)
|
||||
return resp.cookies.get("aa_session")
|
||||
|
||||
|
||||
# ── Health check ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestHealthCheck:
|
||||
def test_status_returns_ok(self, client_no_auth):
|
||||
resp = client_no_auth.get("/status")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json() == {"status": "ok"}
|
||||
|
||||
def test_status_no_auth_required(self, client_with_auth):
|
||||
resp = client_with_auth.get("/status")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json() == {"status": "ok"}
|
||||
|
||||
|
||||
# ── Auth disabled ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestNoAuth:
|
||||
def test_index_accessible(self, client_no_auth):
|
||||
resp = client_no_auth.get("/")
|
||||
assert resp.status_code == 200
|
||||
assert "Auto Archiver" in resp.text
|
||||
|
||||
def test_login_page_redirects_to_index(self, client_no_auth):
|
||||
resp = client_no_auth.get("/login", follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
assert resp.headers["location"] == "/"
|
||||
|
||||
def test_login_post_redirects_to_index(self, client_no_auth):
|
||||
resp = client_no_auth.post("/login", data={"password": "anything"}, follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
|
||||
def test_no_logout_link_shown(self, client_no_auth):
|
||||
resp = client_no_auth.get("/")
|
||||
assert "Logout" not in resp.text
|
||||
|
||||
|
||||
# ── Auth enabled ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestAuth:
|
||||
def test_index_redirects_to_login(self, client_with_auth):
|
||||
resp = client_with_auth.get("/", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
assert resp.headers["location"] == "/login"
|
||||
|
||||
def test_login_page_renders(self, client_with_auth):
|
||||
resp = client_with_auth.get("/login")
|
||||
assert resp.status_code == 200
|
||||
assert "Password" in resp.text
|
||||
|
||||
def test_wrong_password_returns_401(self, client_with_auth):
|
||||
resp = client_with_auth.post("/login", data={"password": "wrong"})
|
||||
assert resp.status_code == 401
|
||||
assert "Wrong password" in resp.text
|
||||
|
||||
def test_correct_password_sets_cookie(self, client_with_auth):
|
||||
resp = client_with_auth.post("/login", data={"password": "secret123"}, follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
assert "aa_session" in resp.cookies
|
||||
|
||||
def test_authenticated_access(self, client_with_auth):
|
||||
cookie = _login(client_with_auth)
|
||||
client_with_auth.cookies.set("aa_session", cookie)
|
||||
resp = client_with_auth.get("/")
|
||||
assert resp.status_code == 200
|
||||
assert "Auto Archiver" in resp.text
|
||||
|
||||
def test_logout_clears_session(self, client_with_auth):
|
||||
cookie = _login(client_with_auth)
|
||||
client_with_auth.cookies.set("aa_session", cookie)
|
||||
resp = client_with_auth.get("/logout", follow_redirects=False)
|
||||
assert resp.status_code == 302
|
||||
# After logout, index should redirect to login again
|
||||
client_with_auth.cookies.clear()
|
||||
resp = client_with_auth.get("/", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
|
||||
def test_logout_link_shown_when_auth_enabled(self, client_with_auth):
|
||||
cookie = _login(client_with_auth)
|
||||
client_with_auth.cookies.set("aa_session", cookie)
|
||||
resp = client_with_auth.get("/")
|
||||
assert "Logout" in resp.text
|
||||
|
||||
def test_results_requires_auth(self, client_with_auth):
|
||||
resp = client_with_auth.get("/results", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
|
||||
def test_invalid_session_rejected(self, client_with_auth):
|
||||
client_with_auth.cookies.set("aa_session", "bogus-token")
|
||||
resp = client_with_auth.get("/", follow_redirects=False)
|
||||
assert resp.status_code == 307
|
||||
|
||||
|
||||
# ── Archive submission ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestArchive:
|
||||
def test_archive_creates_job(self, client_no_auth):
|
||||
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
|
||||
resp = client_no_auth.post(
|
||||
"/archive",
|
||||
data={"urls": "https://example.com\nhttps://example.org"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert resp.status_code == 303
|
||||
assert resp.headers["location"] == "/"
|
||||
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
assert len(_jobs) == 1
|
||||
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
|
||||
assert _jobs[0]["status"] == "running"
|
||||
|
||||
def test_archive_empty_urls_returns_400(self, client_no_auth):
|
||||
resp = client_no_auth.post("/archive", data={"urls": " \n \n"})
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_archive_strips_whitespace(self, client_no_auth):
|
||||
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
|
||||
client_no_auth.post(
|
||||
"/archive",
|
||||
data={"urls": " https://example.com \n\n https://example.org \n"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
|
||||
|
||||
def test_archive_requires_auth(self, client_with_auth):
|
||||
resp = client_with_auth.post(
|
||||
"/archive",
|
||||
data={"urls": "https://example.com"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert resp.status_code == 307
|
||||
|
||||
|
||||
# ── Results page ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestResults:
|
||||
def test_results_empty(self, client_no_auth, tmp_path):
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/results")
|
||||
assert resp.status_code == 200
|
||||
assert "No archived files yet" in resp.text
|
||||
|
||||
def test_results_lists_files(self, client_no_auth, tmp_path):
|
||||
(tmp_path / "test.html").write_text("<html>archived</html>")
|
||||
(tmp_path / "video.mp4").write_bytes(b"\x00" * 10)
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/results")
|
||||
assert resp.status_code == 200
|
||||
assert "test.html" in resp.text
|
||||
assert "video.mp4" in resp.text
|
||||
|
||||
def test_results_nonexistent_dir(self, client_no_auth, tmp_path):
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path / "nonexistent"):
|
||||
resp = client_no_auth.get("/results")
|
||||
assert resp.status_code == 200
|
||||
assert "No archived files yet" in resp.text
|
||||
|
||||
|
||||
# ── File serving ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestFileServing:
|
||||
def test_serve_existing_file(self, client_no_auth, tmp_path):
|
||||
(tmp_path / "report.html").write_text("<html>done</html>")
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/files/report.html")
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_serve_nonexistent_file(self, client_no_auth, tmp_path):
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
|
||||
resp = client_no_auth.get("/files/nope.txt")
|
||||
assert resp.status_code == 404
|
||||
|
||||
def test_path_traversal_blocked(self, client_no_auth, tmp_path):
|
||||
# Create a file outside the archive dir
|
||||
outside = tmp_path / "outside"
|
||||
outside.mkdir()
|
||||
(outside / "secret.txt").write_text("secret")
|
||||
archive = tmp_path / "archive"
|
||||
archive.mkdir()
|
||||
# Symlink into archive pointing outside
|
||||
(archive / "escape").symlink_to(outside / "secret.txt")
|
||||
with patch("deploy.web_ui.ARCHIVE_DIR", archive):
|
||||
resp = client_no_auth.get("/files/escape")
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ── Job rendering ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestJobRendering:
|
||||
def test_no_jobs_shows_message(self, client_no_auth):
|
||||
resp = client_no_auth.get("/")
|
||||
assert "No archiving jobs yet" in resp.text
|
||||
|
||||
def test_jobs_shown_in_table(self, client_no_auth):
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
_jobs.append(
|
||||
{
|
||||
"id": 1,
|
||||
"urls": ["https://example.com"],
|
||||
"status": "done",
|
||||
"started": "2026-01-01 00:00 UTC",
|
||||
"output": "",
|
||||
}
|
||||
)
|
||||
resp = client_no_auth.get("/")
|
||||
assert "example.com" in resp.text
|
||||
assert "done" in resp.text
|
||||
|
||||
def test_many_urls_truncated(self, client_no_auth):
|
||||
from deploy.web_ui import _jobs
|
||||
|
||||
_jobs.append(
|
||||
{
|
||||
"id": 1,
|
||||
"urls": [f"https://example.com/{i}" for i in range(10)],
|
||||
"status": "running",
|
||||
"started": "2026-01-01 00:00 UTC",
|
||||
"output": "",
|
||||
}
|
||||
)
|
||||
resp = client_no_auth.get("/")
|
||||
assert "+7 more" in resp.text
|
||||
|
||||
|
||||
# ── HTML template rendering ──────────────────────────────────────────
|
||||
|
||||
|
||||
class TestTemplates:
|
||||
"""Verify HTML templates can be .format()-ed without KeyError."""
|
||||
|
||||
def test_login_html_renders(self):
|
||||
from deploy.web_ui import LOGIN_HTML
|
||||
|
||||
result = LOGIN_HTML.format(error="")
|
||||
assert "Auto Archiver" in result
|
||||
|
||||
def test_login_html_renders_with_error(self):
|
||||
from deploy.web_ui import LOGIN_HTML
|
||||
|
||||
result = LOGIN_HTML.format(error='<p class="err">Nope</p>')
|
||||
assert "Nope" in result
|
||||
|
||||
def test_main_html_renders(self):
|
||||
from deploy.web_ui import MAIN_HTML
|
||||
|
||||
result = MAIN_HTML.format(logout="", jobs_html="")
|
||||
assert "Auto Archiver" in result
|
||||
|
||||
def test_results_html_renders(self):
|
||||
from deploy.web_ui import RESULTS_HTML
|
||||
|
||||
result = RESULTS_HTML.format(file_list="<p>empty</p>")
|
||||
assert "Archived Files" in result
|
||||
269
deploy/web_ui.py
269
deploy/web_ui.py
@@ -1,269 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Minimal web UI for auto-archiver cloud deployments.
|
||||
|
||||
Provides:
|
||||
- GET / → HTML form to submit URLs for archiving
|
||||
- POST /archive → Runs auto-archiver on submitted URLs
|
||||
- GET /results → Lists archived files available for download
|
||||
- GET /files/{path} → Serves archived files
|
||||
- GET /status → Health check
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import html
|
||||
import os
|
||||
import secrets
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import Depends, FastAPI, Form, HTTPException, Request, status
|
||||
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
|
||||
|
||||
AUTH_PASSWORD = os.environ.get("AUTH_PASSWORD", "")
|
||||
ARCHIVE_DIR = Path("/app/local_archive")
|
||||
CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
|
||||
COOKIE_NAME = "aa_session"
|
||||
|
||||
# In-memory session tokens (reset on restart, which is fine for this use case)
|
||||
_valid_sessions: set[str] = set()
|
||||
# In-memory job log
|
||||
_jobs: list[dict] = []
|
||||
|
||||
app = FastAPI(title="Auto Archiver", docs_url=None, redoc_url=None)
|
||||
|
||||
|
||||
# ── Auth helpers ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _check_auth(request: Request):
|
||||
"""Dependency: redirect to /login if auth is enabled and session is missing."""
|
||||
if not AUTH_PASSWORD:
|
||||
return # auth disabled
|
||||
token = request.cookies.get(COOKIE_NAME, "")
|
||||
if token not in _valid_sessions:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_307_TEMPORARY_REDIRECT,
|
||||
headers={"Location": "/login"},
|
||||
)
|
||||
|
||||
|
||||
# ── Pages ─────────────────────────────────────────────────────────────
|
||||
|
||||
LOGIN_HTML = """<!DOCTYPE html>
|
||||
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
|
||||
<title>Auto Archiver – Login</title>
|
||||
<style>
|
||||
body {{ font-family: system-ui, sans-serif; max-width: 420px; margin: 80px auto; padding: 0 1rem; }}
|
||||
h1 {{ font-size: 1.4rem; }}
|
||||
input[type=password], button {{ font-size: 1rem; padding: .5rem .8rem; }}
|
||||
input[type=password] {{ width: 100%; box-sizing: border-box; margin: .5rem 0; }}
|
||||
button {{ cursor: pointer; background: #2563eb; color: #fff; border: none; border-radius: 4px; }}
|
||||
.err {{ color: #dc2626; }}
|
||||
</style></head><body>
|
||||
<h1>🔐 Auto Archiver</h1>
|
||||
<form method="POST" action="/login">
|
||||
<label>Password<br><input type="password" name="password" autofocus required></label><br>
|
||||
<button type="submit">Log in</button>
|
||||
{error}
|
||||
</form></body></html>"""
|
||||
|
||||
|
||||
MAIN_HTML = """<!DOCTYPE html>
|
||||
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
|
||||
<title>Auto Archiver</title>
|
||||
<style>
|
||||
body {{ font-family: system-ui, sans-serif; max-width: 700px; margin: 2rem auto; padding: 0 1rem; line-height: 1.6; }}
|
||||
h1 {{ font-size: 1.5rem; }}
|
||||
textarea {{ width: 100%; box-sizing: border-box; font-size: .95rem; font-family: monospace; }}
|
||||
button {{ font-size: 1rem; padding: .5rem 1.2rem; cursor: pointer; background: #2563eb; color: #fff; border: none; border-radius: 4px; margin-top: .5rem; }}
|
||||
table {{ border-collapse: collapse; width: 100%; margin-top: 1rem; }}
|
||||
th, td {{ border: 1px solid #e5e7eb; padding: .4rem .6rem; text-align: left; font-size: .9rem; }}
|
||||
th {{ background: #f9fafb; }}
|
||||
.status {{ padding: 2px 8px; border-radius: 4px; font-size: .85rem; }}
|
||||
.running {{ background: #fef3c7; color: #92400e; }}
|
||||
.done {{ background: #d1fae5; color: #065f46; }}
|
||||
.failed {{ background: #fee2e2; color: #991b1b; }}
|
||||
a {{ color: #2563eb; }}
|
||||
.info {{ color: #6b7280; font-size: .9rem; }}
|
||||
nav {{ display: flex; gap: 1rem; align-items: center; }}
|
||||
nav a {{ text-decoration: none; }}
|
||||
</style></head><body>
|
||||
<nav>
|
||||
<h1>📦 Auto Archiver</h1>
|
||||
<a href="/results">Browse files</a>
|
||||
{logout}
|
||||
</nav>
|
||||
<form method="POST" action="/archive">
|
||||
<label for="urls"><strong>URLs to archive</strong> (one per line)</label><br>
|
||||
<textarea id="urls" name="urls" rows="5" placeholder="https://example.com/post https://youtube.com/watch?v=..." required></textarea><br>
|
||||
<button type="submit">Archive</button>
|
||||
</form>
|
||||
{jobs_html}
|
||||
</body></html>"""
|
||||
|
||||
|
||||
RESULTS_HTML = """<!DOCTYPE html>
|
||||
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
|
||||
<title>Auto Archiver – Files</title>
|
||||
<style>
|
||||
body {{ font-family: system-ui, sans-serif; max-width: 700px; margin: 2rem auto; padding: 0 1rem; }}
|
||||
h1 {{ font-size: 1.4rem; }}
|
||||
a {{ color: #2563eb; }}
|
||||
li {{ margin: .3rem 0; font-family: monospace; font-size: .9rem; }}
|
||||
</style></head><body>
|
||||
<h1>📁 Archived Files</h1>
|
||||
<p><a href="/">← Back</a></p>
|
||||
{file_list}
|
||||
</body></html>"""
|
||||
|
||||
|
||||
# ── Routes ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@app.get("/login", response_class=HTMLResponse)
|
||||
async def login_page():
|
||||
if not AUTH_PASSWORD:
|
||||
return RedirectResponse("/", status_code=302)
|
||||
return LOGIN_HTML.format(error="")
|
||||
|
||||
|
||||
@app.post("/login")
|
||||
async def login_submit(password: str = Form(...)):
|
||||
if not AUTH_PASSWORD:
|
||||
return RedirectResponse("/", status_code=302)
|
||||
if password != AUTH_PASSWORD:
|
||||
return HTMLResponse(
|
||||
LOGIN_HTML.format(error='<p class="err">Wrong password.</p>'),
|
||||
status_code=401,
|
||||
)
|
||||
token = secrets.token_urlsafe(32)
|
||||
_valid_sessions.add(token)
|
||||
resp = RedirectResponse("/", status_code=302)
|
||||
resp.set_cookie(COOKIE_NAME, token, httponly=True, samesite="lax", max_age=86400 * 30)
|
||||
return resp
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def index(request: Request, _=Depends(_check_auth)):
|
||||
logout = '<a href="/logout">Logout</a>' if AUTH_PASSWORD else ""
|
||||
jobs_html = _render_jobs()
|
||||
return MAIN_HTML.format(logout=logout, jobs_html=jobs_html)
|
||||
|
||||
|
||||
@app.post("/archive")
|
||||
async def archive(request: Request, urls: str = Form(...), _=Depends(_check_auth)):
|
||||
url_list = [u.strip() for u in urls.strip().splitlines() if u.strip()]
|
||||
if not url_list:
|
||||
raise HTTPException(400, "No URLs provided")
|
||||
|
||||
job = {
|
||||
"id": len(_jobs) + 1,
|
||||
"urls": url_list,
|
||||
"status": "running",
|
||||
"started": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"),
|
||||
"output": "",
|
||||
}
|
||||
_jobs.insert(0, job)
|
||||
|
||||
# Run in background so the user sees the page immediately
|
||||
asyncio.create_task(_run_archive(job))
|
||||
return RedirectResponse("/", status_code=303)
|
||||
|
||||
|
||||
@app.get("/results", response_class=HTMLResponse)
|
||||
async def results(request: Request, _=Depends(_check_auth)):
|
||||
if not ARCHIVE_DIR.exists():
|
||||
return RESULTS_HTML.format(file_list="<p>No archived files yet.</p>")
|
||||
|
||||
files = sorted(ARCHIVE_DIR.rglob("*"), key=lambda p: p.stat().st_mtime, reverse=True)
|
||||
files = [f for f in files if f.is_file()]
|
||||
|
||||
if not files:
|
||||
return RESULTS_HTML.format(file_list="<p>No archived files yet.</p>")
|
||||
|
||||
items = []
|
||||
for f in files[:200]: # cap listing
|
||||
rel = f.relative_to(ARCHIVE_DIR)
|
||||
items.append(f'<li><a href="/files/{rel}">{html.escape(str(rel))}</a></li>')
|
||||
|
||||
return RESULTS_HTML.format(file_list="<ul>" + "\n".join(items) + "</ul>")
|
||||
|
||||
|
||||
@app.get("/files/{path:path}")
|
||||
async def serve_file(path: str, request: Request, _=Depends(_check_auth)):
|
||||
full = ARCHIVE_DIR / path
|
||||
if not full.exists() or not full.is_file():
|
||||
raise HTTPException(404, "File not found")
|
||||
# Security: ensure the resolved path is within ARCHIVE_DIR
|
||||
try:
|
||||
full.resolve().relative_to(ARCHIVE_DIR.resolve())
|
||||
except ValueError:
|
||||
raise HTTPException(403, "Forbidden")
|
||||
return FileResponse(full)
|
||||
|
||||
|
||||
@app.get("/status")
|
||||
async def health():
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@app.get("/logout")
|
||||
async def logout(request: Request):
|
||||
token = request.cookies.get(COOKIE_NAME, "")
|
||||
_valid_sessions.discard(token)
|
||||
resp = RedirectResponse("/login", status_code=302)
|
||||
resp.delete_cookie(COOKIE_NAME)
|
||||
return resp
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _run_archive(job: dict):
|
||||
"""Run auto-archiver as a subprocess for the given URLs."""
|
||||
cmd = [
|
||||
"python3",
|
||||
"-m",
|
||||
"auto_archiver",
|
||||
"--config",
|
||||
str(CONFIG_PATH),
|
||||
] + job["urls"]
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.STDOUT,
|
||||
cwd="/app",
|
||||
)
|
||||
stdout, _ = await proc.communicate()
|
||||
job["output"] = stdout.decode(errors="replace")[-5000:] # keep last 5k chars
|
||||
job["status"] = "done" if proc.returncode == 0 else "failed"
|
||||
except Exception as e:
|
||||
job["output"] = str(e)
|
||||
job["status"] = "failed"
|
||||
|
||||
|
||||
def _render_jobs() -> str:
|
||||
if not _jobs:
|
||||
return '<p class="info">No archiving jobs yet. Submit URLs above to get started.</p>'
|
||||
|
||||
rows = []
|
||||
for j in _jobs[:50]:
|
||||
urls_str = html.escape(", ".join(j["urls"][:3]))
|
||||
if len(j["urls"]) > 3:
|
||||
urls_str += f" (+{len(j['urls']) - 3} more)"
|
||||
status_cls = j["status"]
|
||||
rows.append(
|
||||
f"<tr><td>{j['id']}</td>"
|
||||
f"<td>{urls_str}</td>"
|
||||
f'<td><span class="status {status_cls}">{j["status"]}</span></td>'
|
||||
f"<td>{j['started']}</td></tr>"
|
||||
)
|
||||
|
||||
return (
|
||||
"<h2>Recent Jobs</h2>"
|
||||
"<table><thead><tr><th>#</th><th>URLs</th><th>Status</th><th>Started</th></tr></thead>"
|
||||
"<tbody>" + "\n".join(rows) + "</tbody></table>"
|
||||
)
|
||||
1260
poetry.lock
generated
1260
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[project]
|
||||
name = "auto-archiver"
|
||||
version = "1.2.5"
|
||||
version = "1.2.7"
|
||||
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
||||
|
||||
requires-python = ">=3.10,<3.13"
|
||||
|
||||
99
railway.json
99
railway.json
@@ -1,99 +0,0 @@
|
||||
{
|
||||
"$schema": "https://railway.app/railway.schema.json",
|
||||
"build": {
|
||||
"dockerfilePath": "deploy/Dockerfile"
|
||||
},
|
||||
"deploy": {
|
||||
"startCommand": "python3 -m deploy.start",
|
||||
"healthcheckPath": "/status",
|
||||
"healthcheckTimeout": 30,
|
||||
"restartPolicyType": "ON_FAILURE",
|
||||
"restartPolicyMaxRetries": 5
|
||||
},
|
||||
"variables": {
|
||||
"AUTH_PASSWORD": {
|
||||
"description": "Password to access your archiver web interface",
|
||||
"required": true
|
||||
},
|
||||
"GSHEET_URL": {
|
||||
"description": "Google Sheet URL to monitor for new URLs (leave empty to disable)",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"GOOGLE_SERVICE_ACCOUNT_JSON": {
|
||||
"description": "Full JSON contents of your Google service account key (required for Sheets)",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"POLL_INTERVAL": {
|
||||
"description": "Seconds between Google Sheet checks (min 60)",
|
||||
"required": false,
|
||||
"default": "300"
|
||||
},
|
||||
"S3_BUCKET": {
|
||||
"description": "S3 bucket name for storage (leave empty for local-only)",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"S3_KEY": {
|
||||
"description": "S3 access key ID",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"S3_SECRET": {
|
||||
"description": "S3 secret access key",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"S3_REGION": {
|
||||
"description": "S3 region (e.g. us-east-1, nyc3 for DO Spaces)",
|
||||
"required": false,
|
||||
"default": "us-east-1"
|
||||
},
|
||||
"S3_ENDPOINT": {
|
||||
"description": "S3 endpoint URL template",
|
||||
"required": false,
|
||||
"default": "https://s3.{region}.amazonaws.com"
|
||||
},
|
||||
"S3_CDN_URL": {
|
||||
"description": "Public CDN URL template for archived files",
|
||||
"required": false,
|
||||
"default": "https://{bucket}.s3.{region}.amazonaws.com/{key}"
|
||||
},
|
||||
"TELEGRAM_API_ID": {
|
||||
"description": "Telegram API ID from https://my.telegram.org",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"TELEGRAM_API_HASH": {
|
||||
"description": "Telegram API hash from https://my.telegram.org",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"TELEGRAM_BOT_TOKEN": {
|
||||
"description": "Telegram bot token from @BotFather",
|
||||
"required": false,
|
||||
"default": ""
|
||||
},
|
||||
"ENABLE_SCREENSHOTS": {
|
||||
"description": "Set to true to capture full-page screenshots",
|
||||
"required": false,
|
||||
"default": "false"
|
||||
},
|
||||
"ENABLE_THUMBNAILS": {
|
||||
"description": "Set to true to generate video thumbnails",
|
||||
"required": false,
|
||||
"default": "false"
|
||||
},
|
||||
"ENABLE_CSV_DB": {
|
||||
"description": "Set to true to save a CSV log of archived items",
|
||||
"required": false,
|
||||
"default": "false"
|
||||
},
|
||||
"LOG_LEVEL": {
|
||||
"description": "Logging level: DEBUG, INFO, WARNING, ERROR",
|
||||
"required": false,
|
||||
"default": "INFO"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ Key Functionalities:
|
||||
|
||||
from __future__ import annotations
|
||||
import hashlib
|
||||
import os
|
||||
from typing import Any, List, Union, Dict
|
||||
from dataclasses import dataclass, field
|
||||
from dataclasses_json import dataclass_json
|
||||
@@ -186,6 +187,9 @@ class Metadata:
|
||||
continue
|
||||
h = m.get("hash")
|
||||
if not h:
|
||||
if not os.path.exists(m.filename):
|
||||
logger.warning(f"Skipping missing media file: {m.filename}")
|
||||
continue
|
||||
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)
|
||||
if len(h) and h in media_hashes:
|
||||
continue
|
||||
|
||||
@@ -467,7 +467,11 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
|
||||
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
|
||||
|
||||
def check_for_updates(self):
|
||||
response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
|
||||
try:
|
||||
response = requests.get("https://pypi.org/pypi/auto-archiver/json", timeout=10).json()
|
||||
except Exception as e:
|
||||
logger.debug(f"Unable to check for updates: {e}")
|
||||
return
|
||||
latest_version = version.parse(response["info"]["version"])
|
||||
current_version = version.parse(__version__)
|
||||
# check version compared to current version
|
||||
|
||||
@@ -575,6 +575,8 @@ class GenericExtractor(Extractor):
|
||||
"--live-from-start" if self.live_from_start else "--no-live-from-start",
|
||||
"--postprocessor-args",
|
||||
"ffmpeg:-bitexact", # ensure bitexact output to avoid mismatching hashes for same video
|
||||
"--js-runtimes",
|
||||
"node", # yt-dlp defaults to deno-only; node is available in the base image
|
||||
]
|
||||
|
||||
# proxy handling
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from .ghostarchive_enricher import GhostarchiveEnricher
|
||||
@@ -0,0 +1,58 @@
|
||||
{
|
||||
"name": "Ghost Archive Enricher",
|
||||
"type": ["enricher"],
|
||||
"entry_point": "ghostarchive_enricher::GhostarchiveEnricher",
|
||||
"requires_setup": False,
|
||||
"dependencies": {
|
||||
"python": ["loguru", "requests", "bs4", "seleniumbase"],
|
||||
},
|
||||
"configs": {
|
||||
"timeout": {
|
||||
"default": 120,
|
||||
"type": "int",
|
||||
"help": "seconds to wait for successful archive confirmation from Ghost Archive.",
|
||||
},
|
||||
"check_existing": {
|
||||
"default": True,
|
||||
"type": "bool",
|
||||
"help": "whether to search for an existing archive before submitting a new one.",
|
||||
},
|
||||
"proxy_http": {
|
||||
"default": None,
|
||||
"help": "http proxy to use for requests, eg http://proxy-user:password@proxy-ip:port",
|
||||
},
|
||||
"proxy_https": {
|
||||
"default": None,
|
||||
"help": "https proxy to use for requests, eg https://proxy-user:password@proxy-ip:port",
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
Submits the current URL to [Ghost Archive](https://ghostarchive.org/) for archiving and returns the archived page URL.
|
||||
|
||||
Used as an **enricher** to add a Ghost Archive URL to items already extracted by other modules.
|
||||
|
||||
### Features
|
||||
- Archives any public URL using the Ghost Archive service.
|
||||
- Optionally checks for existing archives before submitting a new one.
|
||||
- Supports HTTP and HTTPS proxies for requests.
|
||||
- Parses HTML responses to extract archive URLs (Ghost Archive has no JSON API).
|
||||
|
||||
### Important
|
||||
- This module confirms that Ghost Archive accepted the URL submission and returned an archive link.
|
||||
It does **not** verify the contents or completeness of the archived page.
|
||||
|
||||
### Notes
|
||||
- Ghost Archive is a free service with no authentication required.
|
||||
- Archived pages must be smaller than 50 MB (including CSS, fonts, images, etc.).
|
||||
- Videos are archived up to 360p and must be under 100 MB and shorter than 30 minutes.
|
||||
- Archival may take up to 5 minutes depending on the queue and page complexity.
|
||||
- Archived content is stored indefinitely.
|
||||
- Ghost Archive does not archive pages that require authentication or form submission.
|
||||
|
||||
### Limitations
|
||||
- No official API — this module interacts with the Ghost Archive web interface.
|
||||
- The submission endpoint is protected by Cloudflare, so a headless browser (SeleniumBase) is used for new submissions.
|
||||
- Searching for existing archives uses plain HTTP requests and does not require a browser.
|
||||
- Rate limiting may apply; consider using a delay between requests if archiving many URLs.
|
||||
""",
|
||||
}
|
||||
@@ -0,0 +1,153 @@
|
||||
import time
|
||||
import re
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from seleniumbase import SB
|
||||
from auto_archiver.utils.custom_logger import logger
|
||||
from auto_archiver.utils import url as UrlUtil
|
||||
from auto_archiver.core import Enricher, Metadata
|
||||
|
||||
|
||||
class GhostarchiveEnricher(Enricher):
|
||||
"""
|
||||
Submits the current URL to Ghost Archive (ghostarchive.org) for archiving
|
||||
and stores the archived page URL as enrichment metadata.
|
||||
|
||||
Ghost Archive has no official API — this module interacts with the web form
|
||||
and parses HTML responses. The submission endpoint is protected by Cloudflare,
|
||||
so a headless browser (SeleniumBase) is used for archival submissions, while
|
||||
plain HTTP requests are used for searching existing archives.
|
||||
|
||||
Note: this module only confirms that Ghost Archive accepted the submission
|
||||
and returned an archive URL. It does not verify that the archived page
|
||||
content is complete or correctly rendered.
|
||||
"""
|
||||
|
||||
GHOSTARCHIVE_BASE = "https://ghostarchive.org"
|
||||
ARCHIVE_ENDPOINT = f"{GHOSTARCHIVE_BASE}/archive2"
|
||||
SEARCH_ENDPOINT = f"{GHOSTARCHIVE_BASE}/search"
|
||||
ARCHIVE_URL_PATTERN = re.compile(r"/archive/([A-Za-z0-9]+)")
|
||||
|
||||
def _get_proxies(self) -> dict:
|
||||
proxies = {}
|
||||
if self.proxy_http:
|
||||
proxies["http"] = self.proxy_http
|
||||
if self.proxy_https:
|
||||
proxies["https"] = self.proxy_https
|
||||
return proxies
|
||||
|
||||
def _get_headers(self) -> dict:
|
||||
return {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
}
|
||||
|
||||
def _normalize_archive_href(self, href: str) -> str | None:
|
||||
"""Normalize an archive link href to a full HTTPS URL, filtering out replay links."""
|
||||
if "/archive/" not in href or "/replay/" in href:
|
||||
return None
|
||||
if href.startswith("/"):
|
||||
return f"{self.GHOSTARCHIVE_BASE}{href}"
|
||||
if href.startswith("http://ghostarchive.org"):
|
||||
return href.replace("http://", "https://")
|
||||
if href.startswith("https://ghostarchive.org"):
|
||||
return href
|
||||
return None
|
||||
|
||||
def _search_existing(self, url: str) -> str | None:
|
||||
"""
|
||||
Search Ghost Archive for an existing archive of the given URL.
|
||||
Returns the archive URL if found, otherwise None.
|
||||
"""
|
||||
try:
|
||||
r = requests.get(
|
||||
self.SEARCH_ENDPOINT,
|
||||
params={"term": url},
|
||||
headers=self._get_headers(),
|
||||
proxies=self._get_proxies(),
|
||||
timeout=30,
|
||||
)
|
||||
if r.status_code != 200:
|
||||
logger.warning(f"Ghost Archive search returned status {r.status_code}")
|
||||
return None
|
||||
|
||||
soup = BeautifulSoup(r.text, "html.parser")
|
||||
for link in soup.find_all("a", href=True):
|
||||
archive_url = self._normalize_archive_href(link["href"])
|
||||
if archive_url:
|
||||
logger.info(f"Found existing Ghost Archive: {archive_url}")
|
||||
return archive_url
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Ghost Archive search failed: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def _submit_url(self, url: str) -> str | None:
|
||||
"""
|
||||
Submit a URL to Ghost Archive for archiving using a headless browser.
|
||||
The /archive2 endpoint is Cloudflare-protected, requiring JS execution.
|
||||
Returns the archive URL if successful, otherwise None.
|
||||
"""
|
||||
try:
|
||||
with SB(uc=True, headless=True) as sb:
|
||||
logger.debug("Opening Ghost Archive homepage in headless browser")
|
||||
sb.open(self.GHOSTARCHIVE_BASE)
|
||||
|
||||
# fill in the archive form and submit
|
||||
sb.type('input[name="archive"]', url)
|
||||
sb.click('input[type="submit"][value="Submit for archival"]')
|
||||
|
||||
# wait for navigation to /archive/{id} or timeout
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < self.timeout:
|
||||
current_url = sb.get_current_url()
|
||||
if self.ARCHIVE_URL_PATTERN.search(current_url):
|
||||
archive_url = current_url.split("?")[0]
|
||||
logger.info(f"Ghost Archive saved: {archive_url}")
|
||||
return archive_url
|
||||
time.sleep(2)
|
||||
|
||||
# if we didn't redirect, try parsing the page source
|
||||
page_source = sb.get_page_source()
|
||||
return self._parse_archive_url(page_source)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Ghost Archive submission failed: {e}")
|
||||
return None
|
||||
|
||||
def _parse_archive_url(self, html: str) -> str | None:
|
||||
"""Parse HTML response to find an archive URL."""
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
for link in soup.find_all("a", href=True):
|
||||
archive_url = self._normalize_archive_href(link["href"])
|
||||
if archive_url:
|
||||
return archive_url
|
||||
return None
|
||||
|
||||
def enrich(self, to_enrich: Metadata) -> bool:
|
||||
url = to_enrich.get_url()
|
||||
if UrlUtil.is_auth_wall(url):
|
||||
logger.debug("[SKIP] Ghost Archive since url is behind AUTH WALL")
|
||||
return False
|
||||
|
||||
if to_enrich.get("ghostarchive"):
|
||||
logger.info(f"Ghost Archive enricher had already been executed: {to_enrich.get('ghostarchive')}")
|
||||
return True
|
||||
|
||||
# optionally check for existing archive first
|
||||
archive_url = None
|
||||
if self.check_existing:
|
||||
logger.debug(f"Searching Ghost Archive for existing archive of {url}")
|
||||
archive_url = self._search_existing(url)
|
||||
|
||||
if not archive_url:
|
||||
logger.debug(f"Submitting {url} to Ghost Archive")
|
||||
archive_url = self._submit_url(url)
|
||||
|
||||
if archive_url:
|
||||
to_enrich.set("ghostarchive", archive_url)
|
||||
return True
|
||||
|
||||
logger.warning(f"Ghost Archive failed to archive {url}")
|
||||
return False
|
||||
@@ -64,7 +64,6 @@ class DeletionIndicators:
|
||||
# YouTube deletion indicators
|
||||
YOUTUBE = [
|
||||
"This video isn't available anymore",
|
||||
"Video unavailable",
|
||||
"This video has been removed",
|
||||
"This video is no longer available",
|
||||
"This video is private",
|
||||
|
||||
@@ -120,6 +120,9 @@ def ydl_entry_to_filename(ydl, entry: dict) -> str:
|
||||
directory = os.path.dirname(base_filename) # '/get/path/to'
|
||||
basename = os.path.basename(base_filename) # 'file'
|
||||
for f in os.listdir(directory):
|
||||
# skip incomplete downloads left behind by yt-dlp
|
||||
if f.endswith(".part"):
|
||||
continue
|
||||
if (
|
||||
f.startswith(basename)
|
||||
or (entry_url and os.path.splitext(f)[0] in entry_url)
|
||||
|
||||
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
@@ -0,0 +1,277 @@
|
||||
import pytest
|
||||
import requests
|
||||
import os
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher import GhostarchiveEnricher
|
||||
|
||||
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
|
||||
|
||||
# sample HTML responses for mocking
|
||||
SEARCH_HTML_FOUND = """
|
||||
<html><body>
|
||||
<h1>Archives for https://example.com</h1>
|
||||
<table>
|
||||
<tr><td><a href="http://ghostarchive.org/archive/Abc12">https://example.com</a></td></tr>
|
||||
</table>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
SEARCH_HTML_NOT_FOUND = """
|
||||
<html><body>
|
||||
<h1>Archives for https://example.com</h1>
|
||||
<p>Page 0 out of 0</p>
|
||||
<p>No archives for that site.</p>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
SAVE_RESPONSE_HTML_WITH_LINK = """
|
||||
<html><body>
|
||||
<h1>Archive saved</h1>
|
||||
<a href="/archive/Xyz99">View archive</a>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
ENRICHER_CONFIG = {
|
||||
"timeout": 120,
|
||||
"check_existing": True,
|
||||
"proxy_http": None,
|
||||
"proxy_https": None,
|
||||
}
|
||||
|
||||
|
||||
class TestGhostarchiveEnricher:
|
||||
"""Tests for Ghost Archive Enricher"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_enricher(self, setup_module):
|
||||
self.enricher: GhostarchiveEnricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
|
||||
def test_search_existing_found(self, mocker):
|
||||
"""When an existing archive is found, it should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_FOUND
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/Abc12"
|
||||
|
||||
def test_search_existing_not_found(self, mocker):
|
||||
"""When no existing archive is found, None should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_NOT_FOUND
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_search_existing_request_error(self, mocker):
|
||||
"""When search request fails, None should be returned."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||
side_effect=requests.exceptions.ConnectionError("connection failed"),
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_search_existing_non_200(self, mocker):
|
||||
"""When search returns non-200, None should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 503
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_submit_url_success_redirect(self, mocker):
|
||||
"""Successful submission via headless browser should return archive URL."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1"
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||
mock_sb.type.assert_called_once()
|
||||
mock_sb.click.assert_called_once()
|
||||
|
||||
def test_submit_url_success_redirect_strips_query(self, mocker):
|
||||
"""Redirect URL query params should be stripped."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1?wr=false"
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||
|
||||
def test_submit_url_success_html_fallback(self, mocker):
|
||||
"""When browser doesn't redirect, should parse page source for archive link."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive2"
|
||||
mock_sb.get_page_source.return_value = SAVE_RESPONSE_HTML_WITH_LINK
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
# make timeout=0 so the polling loop exits immediately and falls through to HTML parsing
|
||||
self.enricher.timeout = 0
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/Xyz99"
|
||||
|
||||
def test_submit_url_browser_error(self, mocker):
|
||||
"""Browser error during submission should return None."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB",
|
||||
side_effect=Exception("browser failed to start"),
|
||||
)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_proxy_configuration(self, mocker):
|
||||
"""Proxies should be passed to search requests when configured."""
|
||||
self.enricher.proxy_http = "http://proxy:8080"
|
||||
self.enricher.proxy_https = "https://proxy:8443"
|
||||
|
||||
mock_get = mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||
)
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_FOUND
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
|
||||
call_kwargs = mock_get.call_args
|
||||
assert call_kwargs.kwargs.get("proxies") == {"http": "http://proxy:8080", "https": "https://proxy:8443"}
|
||||
assert result is not None
|
||||
|
||||
def test_parse_archive_url_with_replay_links(self):
|
||||
"""Parser should ignore /replay/ links and only return /archive/ links."""
|
||||
html = """
|
||||
<html><body>
|
||||
<a href="/archive/replay/w/id-abc/mp_/https://example.com">replay</a>
|
||||
<a href="/archive/Valid1">valid</a>
|
||||
</body></html>
|
||||
"""
|
||||
result = self.enricher._parse_archive_url(html)
|
||||
assert result == "https://ghostarchive.org/archive/Valid1"
|
||||
|
||||
def test_parse_archive_url_no_links(self):
|
||||
"""Parser should return None when no archive links found."""
|
||||
html = "<html><body><p>No archive here</p></body></html>"
|
||||
result = self.enricher._parse_archive_url(html)
|
||||
assert result is None
|
||||
|
||||
def test_enrich_sets_ghostarchive_on_metadata(self, mocker, make_item):
|
||||
"""enrich() should set 'ghostarchive' key on the metadata object."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Enr1")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Enr1"
|
||||
|
||||
def test_enrich_skips_if_already_enriched(self, mocker, make_item):
|
||||
"""enrich() should skip if ghostarchive key is already set."""
|
||||
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||
|
||||
item = make_item("https://example.com", ghostarchive="https://ghostarchive.org/archive/Old1")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
mock_search.assert_not_called()
|
||||
|
||||
def test_enrich_returns_false_on_failure(self, mocker, make_item):
|
||||
"""enrich() should return False when both search and submit fail."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value=None)
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is False
|
||||
|
||||
def test_enrich_skips_auth_wall(self, mocker, make_item):
|
||||
"""enrich() should skip URLs behind auth walls."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.UrlUtil.is_auth_wall", return_value=True
|
||||
)
|
||||
|
||||
item = make_item("https://example.com/login")
|
||||
result = self.enricher.enrich(item)
|
||||
assert result is False
|
||||
|
||||
def test_enrich_with_existing_archive(self, mocker, make_item):
|
||||
"""enrich() should use existing archive when check_existing is True."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Exist1")
|
||||
mock_submit = mocker.patch.object(self.enricher, "_submit_url")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Exist1"
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
def test_enrich_submits_when_no_existing(self, mocker, make_item):
|
||||
"""enrich() should submit URL when no existing archive found."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/New42")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/New42"
|
||||
|
||||
def test_enrich_skips_check_existing_when_disabled(self, mocker, make_item):
|
||||
"""enrich() should skip search when check_existing is False."""
|
||||
self.enricher.check_existing = False
|
||||
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/Direct1")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
mock_search.assert_not_called()
|
||||
|
||||
@pytest.mark.download
|
||||
def test_real_search_existing(self, setup_module):
|
||||
"""Integration test: search for an existing archive on Ghost Archive."""
|
||||
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
# example.com is commonly archived
|
||||
result = enricher._search_existing("https://example.com")
|
||||
# we just check it doesn't crash; result may or may not be found
|
||||
assert result is None or result.startswith("https://ghostarchive.org/archive/")
|
||||
|
||||
@pytest.mark.download
|
||||
@pytest.mark.skipif(CI, reason="Avoid submitting a real task on every CI run")
|
||||
def test_real_submit_example_com(self, setup_module, make_item):
|
||||
"""Integration test: submit example.com to Ghost Archive and verify enrichment."""
|
||||
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
item = make_item("https://example.com")
|
||||
result = enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
archive_url = item.get("ghostarchive")
|
||||
assert archive_url is not None
|
||||
assert archive_url.startswith("https://ghostarchive.org/archive/")
|
||||
@@ -86,6 +86,22 @@ def test_media_management(basic_metadata, media_file):
|
||||
assert basic_metadata.get_media_by_id("m1") == media1
|
||||
|
||||
|
||||
def test_remove_duplicate_skips_missing_files(basic_metadata, media_file, tmp_path):
|
||||
"""Missing files should be dropped instead of crashing with FileNotFoundError."""
|
||||
real_file = tmp_path / "exists.txt"
|
||||
real_file.write_text("content")
|
||||
valid = media_file(filename=str(real_file), hash_value="abc")
|
||||
missing = media_file(filename="/nonexistent/path/gone.mp4")
|
||||
|
||||
basic_metadata.add_media(valid, "valid")
|
||||
basic_metadata.add_media(missing, "missing")
|
||||
|
||||
assert len(basic_metadata.media) == 2
|
||||
basic_metadata.remove_duplicate_media_by_hash()
|
||||
assert len(basic_metadata.media) == 1
|
||||
assert basic_metadata.get_media_by_id("valid") == valid
|
||||
|
||||
|
||||
def test_success():
|
||||
m = Metadata()
|
||||
assert not m.is_success()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
from argparse import ArgumentParser, ArgumentTypeError
|
||||
from requests.exceptions import SSLError
|
||||
from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
||||
from auto_archiver.version import __version__
|
||||
from auto_archiver.core.config import read_yaml, store_yaml
|
||||
@@ -256,3 +257,34 @@ def test_load_failed_extractor_cleanup(test_args, mocker, caplog):
|
||||
assert "Error during setup of modules: Test exception" in caplog.text
|
||||
# make sure the 'cleanup' is called
|
||||
assert "cleanup" in caplog.text
|
||||
|
||||
|
||||
def test_check_for_updates_ssl_error(orchestrator, mocker):
|
||||
"""check_for_updates should not raise when the HTTP request fails."""
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
side_effect=SSLError("SSL handshake failed"),
|
||||
)
|
||||
# should not raise
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
|
||||
def test_check_for_updates_timeout(orchestrator, mocker):
|
||||
"""check_for_updates should not raise on connection timeout."""
|
||||
from requests.exceptions import ConnectionError
|
||||
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
side_effect=ConnectionError("Connection refused"),
|
||||
)
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
|
||||
def test_check_for_updates_new_version_available(orchestrator, mocker):
|
||||
"""check_for_updates should not raise when a newer version exists."""
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
return_value=mocker.Mock(json=lambda: {"info": {"version": "99.0.0"}}),
|
||||
)
|
||||
# should complete without error
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
@@ -14,6 +14,7 @@ from auto_archiver.utils.misc import (
|
||||
calculate_file_hash,
|
||||
random_str,
|
||||
get_timestamp,
|
||||
ydl_entry_to_filename,
|
||||
)
|
||||
|
||||
|
||||
@@ -139,3 +140,47 @@ class TestMiscUtils:
|
||||
|
||||
def test_invalid_timestamp_returns_none(self):
|
||||
assert get_timestamp("invalid-date") is None
|
||||
|
||||
|
||||
class TestYdlEntryToFilename:
|
||||
"""Tests for ydl_entry_to_filename, especially .part file filtering."""
|
||||
|
||||
def _make_mock_ydl(self, prepared_filename):
|
||||
class MockYDL:
|
||||
def prepare_filename(self, entry):
|
||||
return prepared_filename
|
||||
|
||||
return MockYDL()
|
||||
|
||||
def test_returns_exact_file_if_exists(self, tmp_path):
|
||||
video = tmp_path / "video.mp4"
|
||||
video.write_bytes(b"data")
|
||||
ydl = self._make_mock_ydl(str(video))
|
||||
assert ydl_entry_to_filename(ydl, {}) == str(video)
|
||||
|
||||
def test_skips_part_file_returns_complete(self, tmp_path):
|
||||
"""Simulates yt-dlp leaving a .part file from a failed format
|
||||
while a complete .webm exists."""
|
||||
(tmp_path / "f5U3IKfoSYs.f399.mp4.part").write_bytes(b"incomplete")
|
||||
webm = tmp_path / "f5U3IKfoSYs.webm"
|
||||
webm.write_bytes(b"complete video")
|
||||
|
||||
# ydl.prepare_filename returns the expected .mp4 which doesn't exist
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "f5U3IKfoSYs.mp4"))
|
||||
result = ydl_entry_to_filename(ydl, {})
|
||||
|
||||
assert result == str(webm)
|
||||
assert not result.endswith(".part")
|
||||
|
||||
def test_skips_part_file_returns_false_if_no_other_match(self, tmp_path):
|
||||
"""Only a .part file exists — should return False."""
|
||||
(tmp_path / "video.f399.mp4.part").write_bytes(b"incomplete")
|
||||
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||
assert ydl_entry_to_filename(ydl, {}) is False
|
||||
|
||||
def test_returns_false_when_no_files_match(self, tmp_path):
|
||||
(tmp_path / "unrelated.txt").write_bytes(b"data")
|
||||
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||
assert ydl_entry_to_filename(ydl, {}) is False
|
||||
|
||||
Reference in New Issue
Block a user