Compare commits

..

22 Commits

Author SHA1 Message Date
Miguel Sozinho Ramalho
9e651bb849 Merge pull request #434 from PeterUpfold/video-unavailable-fix
Removes 'video unavailable' from YouTube deletion indicators
2026-05-01 10:21:30 +01:00
Peter Upfold
6581bbe139 Remove 'video unavailable' from YouTube deletion indicators 2026-04-28 21:02:38 +01:00
msramalho
e633be1721 version bump 2026-04-27 12:35:54 +01:00
msramalho
bc06de8e5c fixes incomplete yt-dlp parts download 2026-04-27 12:34:47 +01:00
Miguel Sozinho Ramalho
20fddce3a3 Merge pull request #427 from PeterUpfold/deno-container
Fix missing JS runtime config for bguils_po_token_method
2026-04-24 11:08:28 +01:00
msramalho
6efa439cdb dependencies bump 2026-04-23 17:20:54 +01:00
Miguel Sozinho Ramalho
ef77d1fc86 Merge branch 'main' into dev 2026-04-23 14:21:01 +01:00
msramalho
a57a5ee005 adds an extra check when calling pypi as it's led to uncaught ssl errors 2026-04-23 14:20:07 +01:00
msramalho
2582f567ac removes curl/unzip from dockerfile 2026-04-23 14:04:46 +01:00
msramalho
4e5c1a6218 suggested alternative change to deno install 2026-04-23 14:02:51 +01:00
Peter Upfold
12d9c469b2 Add Deno to Dockerfile 2026-04-13 18:19:23 +01:00
Miguel Sozinho Ramalho
792838f1a1 Merge pull request #419 from bellingcat/dev
Dependencies bump, new ghostarchive enricher
2026-04-07 14:44:35 +01:00
Miguel Sozinho Ramalho
17c4ae15eb Merge branch 'main' into dev 2026-04-07 10:51:10 +01:00
msramalho
a08af07348 version bump 2026-04-06 18:34:20 +01:00
Miguel Sozinho Ramalho
e54077f4e8 Merge pull request #418 from bellingcat/feat/ghostarchive
Feat/ghostarchive
2026-04-06 18:33:15 +01:00
msramalho
319c0528da dependencies bump 2026-04-06 18:27:47 +01:00
msramalho
ae0e53e434 adds tests for new ghostarchive enricher feature 2026-04-06 17:15:32 +01:00
msramalho
82fc786d56 implements new enricher to submit URLs to ghostarchive 2026-04-06 17:13:48 +01:00
Miguel Sozinho Ramalho
aa65299844 Merge pull request #408 from bellingcat/dev
telethon compatibility with celery workers, dependency bumps
2026-03-16 11:28:21 +00:00
msramalho
1b69ec1f00 dependencies bump 2026-03-16 11:11:57 +00:00
Miguel Sozinho Ramalho
304e5d40b1 Merge branch 'main' into dev 2026-03-16 11:10:26 +00:00
Miguel Sozinho Ramalho
63cfe34e23 Merge pull request #407 from bellingcat/dev
minor bug fix: handles failed get downloads
2026-03-02 17:10:46 +00:00
29 changed files with 1247 additions and 2148 deletions

View File

@@ -1,29 +0,0 @@
name: Deploy Tests
on:
push:
branches: [ main ]
paths:
- deploy/**
pull_request:
paths:
- deploy/**
jobs:
tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Set up Python 3.12
uses: actions/setup-python@v6
with:
python-version: "3.12"
- name: Install dependencies
run: pip install pytest fastapi httpx python-multipart pyyaml
- name: Run Deploy Tests
working-directory: deploy
run: python -m pytest tests/ -v

View File

@@ -1,18 +1,17 @@
FROM webrecorder/browsertrix-crawler:1.11.4 AS base
FROM webrecorder/browsertrix-crawler:1.12.4 AS base
ENV RUNNING_IN_DOCKER=1 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONFAULTHANDLER=1 \
PATH="/root/.local/bin:$PATH"
PYTHONFAULTHANDLER=1
ARG TARGETARCH
# Installing system dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
# Poetry and runtime
FROM base AS runtime

View File

@@ -22,40 +22,7 @@ Auto Archiver is a Python tool to automatically archive content on the web in a
Read the [article about Auto Archiver on bellingcat.com](https://www.bellingcat.com/resources/2022/09/22/preserve-vital-online-content-with-bellingcats-auto-archiver-tool/).
## One-Click Cloud Deploy
Deploy your own Auto Archiver instance to the cloud — no coding required:
[![Deploy on Railway](https://railway.app/button.svg)](https://railway.app/new/template?template=https://github.com/bellingcat/auto-archiver&envs=AUTH_PASSWORD,GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&optionalEnvs=GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&AUTH_PASSWORDDesc=Password+to+access+your+archiver+web+interface&GSHEET_URLDesc=Google+Sheet+URL+to+monitor+for+new+URLs+(leave+empty+to+disable)&POLL_INTERVALDesc=Seconds+between+Google+Sheet+checks+(min+60)&POLL_INTERVALDefault=300&S3_BUCKETDesc=S3+bucket+name+for+storage+(leave+empty+for+local+only)&S3_REGIONDefault=us-east-1&LOG_LEVELDefault=INFO)
**What you get:** A web interface where you can paste URLs and archive them instantly. Optionally connect a Google Sheet for automated monitoring, S3 for cloud storage, and Telegram for archiving channels.
**Only required setting:** `AUTH_PASSWORD` — everything else is optional and can be configured later via the Railway dashboard.
<details>
<summary>📋 Environment variables reference</summary>
| Variable | Required | Description |
|----------|----------|-------------|
| `AUTH_PASSWORD` | **Yes** | Password to access the web interface |
| `GSHEET_URL` | No | Google Sheet URL to monitor for new URLs [use this template](https://docs.google.com/spreadsheets/d/1NJZo_XZUBKTI1Ghlgi4nTPVvCfb0HXAs6j5tNGas72k/edit?gid=0#gid=0) |
| `GOOGLE_SERVICE_ACCOUNT_JSON` | No | Google service account JSON (required with Sheets) [follow these instructions](https://auto-archiver.readthedocs.io/en/v1.0.1/how_to/gsheets_setup.html) |
| `POLL_INTERVAL` | No | Seconds between Sheet checks (default: 300) |
| `S3_BUCKET` | No | S3 bucket name for archived content, ideal for cloud hosting your archives but not mandatory, any S3-compatible storage works |
| `S3_KEY` / `S3_SECRET` | No | S3 credentials |
| `S3_REGION` | No | S3 region (default: us-east-1) |
| `S3_ENDPOINT` | No | S3 endpoint URL |
| `TELEGRAM_API_ID` / `TELEGRAM_API_HASH` | No | Telegram API credentials |
| `TELEGRAM_BOT_TOKEN` | No | Telegram bot token |
| `ENABLE_SCREENSHOTS` | No | Set to `true` for full-page screenshots |
| `ENABLE_THUMBNAILS` | No | Set to `true` for video thumbnails |
| `ENABLE_CSV_DB` | No | Set to `true` for CSV logging |
| `LOG_LEVEL` | No | DEBUG, INFO, WARNING, ERROR (default: INFO) |
</details>
## Traditional Installation
## Installation
View the [Installation Guide](https://auto-archiver.readthedocs.io/en/latest/installation/installation.html) for full instructions

View File

@@ -1,34 +0,0 @@
# ── Cloud Deploy ──────────────────────────────────────────────────────
# Thin web UI + config generator layer on top of the published
# auto-archiver Docker image. Used by the Railway one-click deploy.
#
# Build:
# docker build -f deploy/Dockerfile -t auto-archiver-deploy .
#
# Run:
# docker run -p 8080:8080 -e PORT=8080 -e AUTH_PASSWORD=secret auto-archiver-deploy
# ──────────────────────────────────────────────────────────────────────
FROM bellingcat/auto-archiver:latest
USER root
# Install the lightweight web layer dependencies
RUN pip install --no-cache-dir fastapi uvicorn[standard] python-multipart pyyaml
# Copy deploy scripts into the image
COPY deploy/ /app/deploy/
# Ensure writable dirs exist
RUN mkdir -p /app/local_archive /app/secrets && \
chown -R 1000:1000 /app/local_archive /app/secrets /app/deploy
USER 1000
# Railway sets PORT; default to 8080
ENV PORT=8080
EXPOSE ${PORT}
# Override the CLI entrypoint with the web server
ENTRYPOINT ["python3", "-m", "deploy.start"]

View File

@@ -1 +0,0 @@
# Cloud deployment layer for auto-archiver

View File

@@ -1,163 +0,0 @@
#!/usr/bin/env python3
"""
Generates orchestration.yaml from environment variables.
This script bridges Railway's env-var-based configuration with
auto-archiver's YAML-based configuration system. It runs at container
startup before the web UI server starts.
"""
import os
from pathlib import Path
import yaml
CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
SECRETS_DIR = Path("/app/secrets")
def build_config() -> dict:
"""Build an orchestration config dict from environment variables."""
# -- Base config: always present ------------------------------------
config = {
"steps": {
"feeders": ["cli_feeder"],
"extractors": ["generic_extractor"],
"enrichers": ["hash_enricher"],
"databases": ["console_db"],
"storages": ["local_storage"],
"formatters": ["html_formatter"],
},
"logging": {
"level": os.environ.get("LOG_LEVEL", "INFO"),
},
"local_storage": {
"save_to": "/app/local_archive",
"path_generator": "flat",
"filename_generator": "static",
},
"generic_extractor": {
"subtitles": os.environ.get("SUBTITLES", "false").lower() == "true",
"comments": False,
"livestreams": False,
"live_from_start": False,
"end_means_success": True,
"allow_playlist": False,
},
"hash_enricher": {
"algorithm": "SHA-256",
},
"html_formatter": {
"detect_thumbnails": True,
},
"authentication": {},
}
# -- Google Sheets feeder (optional) --------------------------------
gsheet_url = os.environ.get("GSHEET_URL", "")
if gsheet_url:
config["steps"]["feeders"].append("gsheet_feeder")
config["steps"]["databases"].append("gsheet_db")
config["gsheet_feeder"] = {
"sheet": gsheet_url,
"header": 1,
"service_account": str(SECRETS_DIR / "service_account.json"),
"use_sheet_names_in_stored_paths": False,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "textual content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
},
}
# -- Google service account JSON (optional) -------------------------
sa_json = os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "")
if sa_json:
SECRETS_DIR.mkdir(parents=True, exist_ok=True)
sa_path = SECRETS_DIR / "service_account.json"
sa_path.write_text(sa_json)
print(f"[deploy] Wrote Google service account to {sa_path}")
# -- S3 storage (optional) ------------------------------------------
s3_bucket = os.environ.get("S3_BUCKET", "")
if s3_bucket:
config["steps"]["storages"].append("s3_storage")
config["s3_storage"] = {
"bucket": s3_bucket,
"region": os.environ.get("S3_REGION", "us-east-1"),
"key": os.environ.get("S3_KEY", ""),
"secret": os.environ.get("S3_SECRET", ""),
"endpoint_url": os.environ.get("S3_ENDPOINT", "https://s3.{region}.amazonaws.com"),
"cdn_url": os.environ.get(
"S3_CDN_URL",
"https://{bucket}.s3.{region}.amazonaws.com/{key}",
),
"private": os.environ.get("S3_PRIVATE", "false").lower() == "true",
"random_no_duplicate": True,
"key_path": "random",
}
# -- Telegram extractor (optional) ----------------------------------
tg_api_id = os.environ.get("TELEGRAM_API_ID", "")
tg_api_hash = os.environ.get("TELEGRAM_API_HASH", "")
if tg_api_id and tg_api_hash:
config["steps"]["extractors"].append("telegram_extractor")
config["telegram_extractor"] = {
"api_id": tg_api_id,
"api_hash": tg_api_hash,
}
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
if bot_token:
config["telegram_extractor"]["bot_token"] = bot_token
# -- Screenshot enricher (optional) ---------------------------------
if os.environ.get("ENABLE_SCREENSHOTS", "").lower() == "true":
config["steps"]["enrichers"].append("screenshot_enricher")
config["screenshot_enricher"] = {
"width": 1280,
"height": 7200,
"save_to_pdf": True,
}
# -- Thumbnail enricher (optional) ----------------------------------
if os.environ.get("ENABLE_THUMBNAILS", "").lower() == "true":
config["steps"]["enrichers"].append("thumbnail_enricher")
config["thumbnail_enricher"] = {
"thumbnails_per_minute": 60,
"max_thumbnails": 16,
}
# -- CSV database (optional) ----------------------------------------
if os.environ.get("ENABLE_CSV_DB", "").lower() == "true":
config["steps"]["databases"].append("csv_db")
config["csv_db"] = {
"csv_file": "/app/local_archive/db.csv",
}
return config
def main():
config = build_config()
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(CONFIG_PATH, "w") as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
print(f"[deploy] Generated config at {CONFIG_PATH}")
print(f"[deploy] Active steps: {config['steps']}")
if __name__ == "__main__":
main()

View File

@@ -1,71 +0,0 @@
#!/usr/bin/env python3
"""
Background Google Sheets poller for auto-archiver cloud deployments.
When GSHEET_URL is set, periodically runs auto-archiver with gsheet_feeder
to check for new URLs in the configured spreadsheet. Runs as a daemon thread
alongside the web UI.
"""
import logging
import os
import subprocess
import threading
import time
logger = logging.getLogger("gsheet_poller")
CONFIG_PATH = "/app/secrets/orchestration.yaml"
def _poll_once():
"""Run auto-archiver once to process any new rows in the Google Sheet."""
logger.info("Polling Google Sheet for new URLs...")
try:
result = subprocess.run(
["python3", "-m", "auto_archiver", "--config", CONFIG_PATH],
capture_output=True,
text=True,
cwd="/app",
timeout=600, # 10 minute timeout per poll
)
if result.returncode == 0:
logger.info("Sheet poll completed successfully.")
else:
logger.warning("Sheet poll exited with code %d: %s", result.returncode, result.stderr[-500:])
except subprocess.TimeoutExpired:
logger.error("Sheet poll timed out after 600s")
except Exception:
logger.exception("Sheet poll failed")
def _poll_loop(interval: int):
"""Run the poll loop at the given interval (seconds)."""
logger.info("Google Sheets poller started (interval=%ds)", interval)
while True:
_poll_once()
time.sleep(interval)
def start_poller():
"""
Start the Google Sheets poller as a daemon thread if GSHEET_URL is set.
Call this once at application startup.
"""
gsheet_url = os.environ.get("GSHEET_URL", "")
if not gsheet_url:
logger.info("GSHEET_URL not set Sheet poller disabled.")
return
interval = int(os.environ.get("POLL_INTERVAL", "300"))
if interval < 60:
interval = 60 # minimum 1 minute
thread = threading.Thread(
target=_poll_loop,
args=(interval,),
daemon=True,
name="gsheet-poller",
)
thread.start()
logger.info("Google Sheets poller thread started.")

View File

@@ -1,2 +0,0 @@
[pytest]
testpaths = tests

View File

@@ -1,37 +0,0 @@
#!/usr/bin/env python3
"""
Startup entrypoint for cloud deployments.
1. Generates orchestration.yaml from environment variables
2. Starts the Google Sheets poller (if GSHEET_URL is set)
3. Starts the FastAPI web UI
"""
import os
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
# Generate config from env vars
from deploy.generate_config import main as generate_config # noqa: E402
generate_config()
# Start gsheet poller (no-op if GSHEET_URL not set)
from deploy.gsheet_poller import start_poller # noqa: E402
start_poller()
# Start web server
import uvicorn # noqa: E402
port = int(os.environ.get("PORT", "8080"))
uvicorn.run(
"deploy.web_ui:app",
host="0.0.0.0",
port=port,
log_level="info",
)

View File

@@ -1,354 +0,0 @@
"""Tests for deploy/generate_config.py config generation from env vars."""
import json
import os
from unittest.mock import patch
import yaml
from deploy.generate_config import build_config, main
# ── Helpers ───────────────────────────────────────────────────────────
def _env(**overrides):
"""Return a clean env dict with only the given overrides (no leak from host)."""
# Clear all deploy-relevant env vars, then apply overrides
deploy_vars = [
"LOG_LEVEL",
"SUBTITLES",
"GSHEET_URL",
"GOOGLE_SERVICE_ACCOUNT_JSON",
"S3_BUCKET",
"S3_KEY",
"S3_SECRET",
"S3_REGION",
"S3_ENDPOINT",
"S3_CDN_URL",
"S3_PRIVATE",
"TELEGRAM_API_ID",
"TELEGRAM_API_HASH",
"TELEGRAM_BOT_TOKEN",
"ENABLE_SCREENSHOTS",
"ENABLE_THUMBNAILS",
"ENABLE_CSV_DB",
]
clean = {k: v for k, v in os.environ.items() if k not in deploy_vars}
clean.update(overrides)
return clean
# ── Base config (no optional env vars) ────────────────────────────────
class TestBaseConfig:
"""When no optional env vars are set, build_config returns a minimal working config."""
def test_base_steps(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
steps = cfg["steps"]
assert steps["feeders"] == ["cli_feeder"]
assert steps["extractors"] == ["generic_extractor"]
assert steps["enrichers"] == ["hash_enricher"]
assert steps["databases"] == ["console_db"]
assert steps["storages"] == ["local_storage"]
assert steps["formatters"] == ["html_formatter"]
def test_base_has_required_module_configs(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert "local_storage" in cfg
assert "generic_extractor" in cfg
assert "hash_enricher" in cfg
assert "html_formatter" in cfg
def test_default_log_level_is_info(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert cfg["logging"]["level"] == "INFO"
def test_custom_log_level(self):
with patch.dict(os.environ, _env(LOG_LEVEL="DEBUG"), clear=True):
cfg = build_config()
assert cfg["logging"]["level"] == "DEBUG"
def test_authentication_present_and_empty(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert cfg["authentication"] == {}
def test_local_storage_defaults(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
ls = cfg["local_storage"]
assert ls["save_to"] == "/app/local_archive"
assert ls["path_generator"] == "flat"
assert ls["filename_generator"] == "static"
def test_subtitles_default_false(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert cfg["generic_extractor"]["subtitles"] is False
def test_subtitles_enabled(self):
with patch.dict(os.environ, _env(SUBTITLES="true"), clear=True):
cfg = build_config()
assert cfg["generic_extractor"]["subtitles"] is True
def test_subtitles_case_insensitive(self):
with patch.dict(os.environ, _env(SUBTITLES="True"), clear=True):
cfg = build_config()
assert cfg["generic_extractor"]["subtitles"] is True
def test_no_optional_modules_present(self):
"""Ensure optional modules don't appear when their env vars are absent."""
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert "gsheet_feeder" not in cfg
assert "s3_storage" not in cfg
assert "telegram_extractor" not in cfg
assert "screenshot_enricher" not in cfg
assert "thumbnail_enricher" not in cfg
assert "csv_db" not in cfg
def test_config_is_valid_yaml(self):
"""The output dict should round-trip through YAML cleanly."""
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
dumped = yaml.dump(cfg)
reloaded = yaml.safe_load(dumped)
assert reloaded == cfg
# ── Google Sheets ─────────────────────────────────────────────────────
class TestGSheetConfig:
def test_gsheet_adds_feeder_and_db(self):
with patch.dict(os.environ, _env(GSHEET_URL="https://docs.google.com/spreadsheets/d/abc"), clear=True):
cfg = build_config()
assert "gsheet_feeder" in cfg["steps"]["feeders"]
assert "gsheet_db" in cfg["steps"]["databases"]
def test_gsheet_feeder_config(self):
url = "https://docs.google.com/spreadsheets/d/abc123"
with patch.dict(os.environ, _env(GSHEET_URL=url), clear=True):
cfg = build_config()
gf = cfg["gsheet_feeder"]
assert gf["sheet"] == url
assert gf["header"] == 1
assert "service_account" in gf
assert gf["columns"]["url"] == "link"
assert gf["columns"]["status"] == "archive status"
def test_gsheet_preserves_cli_feeder(self):
"""cli_feeder should still be present even when gsheet is added."""
with patch.dict(os.environ, _env(GSHEET_URL="https://example.com/sheet"), clear=True):
cfg = build_config()
assert "cli_feeder" in cfg["steps"]["feeders"]
def test_service_account_json_written(self, tmp_path):
"""When GOOGLE_SERVICE_ACCOUNT_JSON is set, it writes the file."""
sa_data = json.dumps({"type": "service_account", "project_id": "test"})
secrets_dir = tmp_path / "secrets"
with (
patch.dict(os.environ, _env(GOOGLE_SERVICE_ACCOUNT_JSON=sa_data), clear=True),
patch("deploy.generate_config.SECRETS_DIR", secrets_dir),
):
build_config()
sa_path = secrets_dir / "service_account.json"
assert sa_path.exists()
assert json.loads(sa_path.read_text())["project_id"] == "test"
# ── S3 storage ────────────────────────────────────────────────────────
class TestS3Config:
def test_s3_adds_storage(self):
with patch.dict(os.environ, _env(S3_BUCKET="my-bucket"), clear=True):
cfg = build_config()
assert "s3_storage" in cfg["steps"]["storages"]
assert "local_storage" in cfg["steps"]["storages"] # local still there
def test_s3_config_values(self):
env = _env(
S3_BUCKET="my-bucket",
S3_KEY="AKID",
S3_SECRET="shhh",
S3_REGION="eu-west-1",
)
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
s3 = cfg["s3_storage"]
assert s3["bucket"] == "my-bucket"
assert s3["key"] == "AKID"
assert s3["secret"] == "shhh"
assert s3["region"] == "eu-west-1"
assert s3["private"] is False
assert s3["random_no_duplicate"] is True
def test_s3_defaults(self):
with patch.dict(os.environ, _env(S3_BUCKET="b"), clear=True):
cfg = build_config()
s3 = cfg["s3_storage"]
assert s3["region"] == "us-east-1"
assert "{region}" in s3["endpoint_url"]
def test_s3_private_flag(self):
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_PRIVATE="true"), clear=True):
cfg = build_config()
assert cfg["s3_storage"]["private"] is True
def test_s3_custom_endpoint(self):
endpoint = "https://nyc3.digitaloceanspaces.com"
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_ENDPOINT=endpoint), clear=True):
cfg = build_config()
assert cfg["s3_storage"]["endpoint_url"] == endpoint
# ── Telegram ──────────────────────────────────────────────────────────
class TestTelegramConfig:
def test_telegram_added_when_both_set(self):
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
assert "telegram_extractor" in cfg["steps"]["extractors"]
assert cfg["telegram_extractor"]["api_id"] == "12345"
assert cfg["telegram_extractor"]["api_hash"] == "abc"
def test_telegram_not_added_if_only_id(self):
with patch.dict(os.environ, _env(TELEGRAM_API_ID="12345"), clear=True):
cfg = build_config()
assert "telegram_extractor" not in cfg["steps"]["extractors"]
def test_telegram_not_added_if_only_hash(self):
with patch.dict(os.environ, _env(TELEGRAM_API_HASH="abc"), clear=True):
cfg = build_config()
assert "telegram_extractor" not in cfg["steps"]["extractors"]
def test_telegram_bot_token_optional(self):
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc", TELEGRAM_BOT_TOKEN="bot:tok")
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
assert cfg["telegram_extractor"]["bot_token"] == "bot:tok"
def test_telegram_no_bot_token(self):
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
assert "bot_token" not in cfg["telegram_extractor"]
# ── Optional enrichers / databases ────────────────────────────────────
class TestOptionalModules:
def test_screenshots_disabled_by_default(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert "screenshot_enricher" not in cfg["steps"]["enrichers"]
def test_screenshots_enabled(self):
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="true"), clear=True):
cfg = build_config()
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
assert cfg["screenshot_enricher"]["width"] == 1280
def test_thumbnails_enabled(self):
with patch.dict(os.environ, _env(ENABLE_THUMBNAILS="true"), clear=True):
cfg = build_config()
assert "thumbnail_enricher" in cfg["steps"]["enrichers"]
assert cfg["thumbnail_enricher"]["max_thumbnails"] == 16
def test_csv_db_enabled(self):
with patch.dict(os.environ, _env(ENABLE_CSV_DB="true"), clear=True):
cfg = build_config()
assert "csv_db" in cfg["steps"]["databases"]
assert cfg["csv_db"]["csv_file"] == "/app/local_archive/db.csv"
def test_case_insensitive_boolean(self):
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="TRUE"), clear=True):
cfg = build_config()
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
# ── Combined / full config ────────────────────────────────────────────
class TestCombinedConfig:
def test_all_optional_modules_together(self):
"""Enable everything at once and verify no conflicts."""
env = _env(
GSHEET_URL="https://example.com/sheet",
S3_BUCKET="bucket",
S3_KEY="key",
S3_SECRET="secret",
TELEGRAM_API_ID="123",
TELEGRAM_API_HASH="abc",
TELEGRAM_BOT_TOKEN="tok",
ENABLE_SCREENSHOTS="true",
ENABLE_THUMBNAILS="true",
ENABLE_CSV_DB="true",
)
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
steps = cfg["steps"]
assert "gsheet_feeder" in steps["feeders"]
assert "telegram_extractor" in steps["extractors"]
assert "screenshot_enricher" in steps["enrichers"]
assert "thumbnail_enricher" in steps["enrichers"]
assert "csv_db" in steps["databases"]
assert "gsheet_db" in steps["databases"]
assert "s3_storage" in steps["storages"]
assert "local_storage" in steps["storages"]
# All module configs present
for key in [
"gsheet_feeder",
"s3_storage",
"telegram_extractor",
"screenshot_enricher",
"thumbnail_enricher",
"csv_db",
]:
assert key in cfg, f"{key} config missing"
def test_full_config_valid_yaml(self):
env = _env(
GSHEET_URL="https://example.com/sheet",
S3_BUCKET="bucket",
TELEGRAM_API_ID="123",
TELEGRAM_API_HASH="abc",
ENABLE_SCREENSHOTS="true",
ENABLE_CSV_DB="true",
)
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
dumped = yaml.dump(cfg)
reloaded = yaml.safe_load(dumped)
assert reloaded == cfg
# ── main() writes file ───────────────────────────────────────────────
class TestMainFunction:
def test_main_writes_config_file(self, tmp_path):
config_path = tmp_path / "orchestration.yaml"
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
main()
assert config_path.exists()
cfg = yaml.safe_load(config_path.read_text())
assert cfg["steps"]["feeders"] == ["cli_feeder"]
def test_main_creates_parent_dirs(self, tmp_path):
config_path = tmp_path / "nested" / "dir" / "orchestration.yaml"
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
main()
assert config_path.exists()

View File

@@ -1,124 +0,0 @@
"""Tests for deploy/gsheet_poller.py background Google Sheets polling."""
import os
from unittest.mock import patch, MagicMock
from deploy.gsheet_poller import start_poller, _poll_once
# ── start_poller ──────────────────────────────────────────────────────
class TestStartPoller:
def test_disabled_when_no_gsheet_url(self):
"""No thread should be started when GSHEET_URL is empty."""
with (
patch.dict(os.environ, {"GSHEET_URL": ""}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
start_poller()
mock_thread.assert_not_called()
def test_disabled_when_gsheet_url_absent(self):
env = {k: v for k, v in os.environ.items() if k != "GSHEET_URL"}
with patch.dict(os.environ, env, clear=True), patch("deploy.gsheet_poller.threading.Thread") as mock_thread:
start_poller()
mock_thread.assert_not_called()
def test_starts_thread_when_gsheet_url_set(self):
with (
patch.dict(os.environ, {"GSHEET_URL": "https://example.com/sheet"}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_instance = MagicMock()
mock_thread.return_value = mock_instance
start_poller()
mock_thread.assert_called_once()
assert mock_thread.call_args.kwargs["daemon"] is True
assert mock_thread.call_args.kwargs["name"] == "gsheet-poller"
mock_instance.start.assert_called_once()
def test_default_interval_300(self):
env = {"GSHEET_URL": "https://example.com/sheet"}
# Remove POLL_INTERVAL if present
clean_env = {k: v for k, v in os.environ.items() if k != "POLL_INTERVAL"}
clean_env.update(env)
with (
patch.dict(os.environ, clean_env, clear=True),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_thread.return_value = MagicMock()
start_poller()
# interval should be passed as arg to _poll_loop
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
assert args == (300,)
def test_custom_interval(self):
with (
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "600"}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_thread.return_value = MagicMock()
start_poller()
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
assert args == (600,)
def test_interval_minimum_enforced(self):
"""Intervals below 60 should be clamped to 60."""
with (
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "10"}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_thread.return_value = MagicMock()
start_poller()
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
assert args == (60,)
# ── _poll_once ────────────────────────────────────────────────────────
class TestPollOnce:
def test_calls_subprocess_with_config(self):
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stderr="")
_poll_once()
mock_run.assert_called_once()
cmd = mock_run.call_args[0][0]
assert "auto_archiver" in " ".join(cmd)
assert "--config" in cmd
def test_handles_nonzero_exit(self):
"""Should not raise on non-zero exit, just log a warning."""
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stderr="some error")
_poll_once() # should not raise
def test_handles_timeout(self):
"""Should not raise on timeout, just log."""
import subprocess
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=600)
_poll_once() # should not raise
def test_handles_exception(self):
"""Should not raise on arbitrary exceptions."""
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.side_effect = OSError("broken")
_poll_once() # should not raise
def test_uses_correct_config_path(self):
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stderr="")
_poll_once()
cmd = mock_run.call_args[0][0]
config_idx = cmd.index("--config")
assert cmd[config_idx + 1] == "/app/secrets/orchestration.yaml"
def test_timeout_set(self):
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stderr="")
_poll_once()
assert mock_run.call_args[1]["timeout"] == 600

View File

@@ -1,310 +0,0 @@
"""Tests for deploy/web_ui.py FastAPI web interface."""
from unittest.mock import patch, AsyncMock
import pytest
from fastapi.testclient import TestClient
# ── Fixtures ──────────────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def _reset_state():
"""Reset in-memory state between tests."""
import deploy.web_ui as mod
mod._valid_sessions.clear()
mod._jobs.clear()
yield
mod._valid_sessions.clear()
mod._jobs.clear()
@pytest.fixture
def client_no_auth():
"""Test client with auth disabled (no AUTH_PASSWORD)."""
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", ""):
from deploy.web_ui import app
yield TestClient(app, raise_server_exceptions=False)
@pytest.fixture
def client_with_auth():
"""Test client with auth enabled."""
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", "secret123"):
from deploy.web_ui import app
yield TestClient(app, raise_server_exceptions=False)
def _login(client, password="secret123"):
"""Helper: log in and return the session cookie."""
resp = client.post("/login", data={"password": password}, follow_redirects=False)
return resp.cookies.get("aa_session")
# ── Health check ──────────────────────────────────────────────────────
class TestHealthCheck:
def test_status_returns_ok(self, client_no_auth):
resp = client_no_auth.get("/status")
assert resp.status_code == 200
assert resp.json() == {"status": "ok"}
def test_status_no_auth_required(self, client_with_auth):
resp = client_with_auth.get("/status")
assert resp.status_code == 200
assert resp.json() == {"status": "ok"}
# ── Auth disabled ─────────────────────────────────────────────────────
class TestNoAuth:
def test_index_accessible(self, client_no_auth):
resp = client_no_auth.get("/")
assert resp.status_code == 200
assert "Auto Archiver" in resp.text
def test_login_page_redirects_to_index(self, client_no_auth):
resp = client_no_auth.get("/login", follow_redirects=False)
assert resp.status_code == 302
assert resp.headers["location"] == "/"
def test_login_post_redirects_to_index(self, client_no_auth):
resp = client_no_auth.post("/login", data={"password": "anything"}, follow_redirects=False)
assert resp.status_code == 302
def test_no_logout_link_shown(self, client_no_auth):
resp = client_no_auth.get("/")
assert "Logout" not in resp.text
# ── Auth enabled ──────────────────────────────────────────────────────
class TestAuth:
def test_index_redirects_to_login(self, client_with_auth):
resp = client_with_auth.get("/", follow_redirects=False)
assert resp.status_code == 307
assert resp.headers["location"] == "/login"
def test_login_page_renders(self, client_with_auth):
resp = client_with_auth.get("/login")
assert resp.status_code == 200
assert "Password" in resp.text
def test_wrong_password_returns_401(self, client_with_auth):
resp = client_with_auth.post("/login", data={"password": "wrong"})
assert resp.status_code == 401
assert "Wrong password" in resp.text
def test_correct_password_sets_cookie(self, client_with_auth):
resp = client_with_auth.post("/login", data={"password": "secret123"}, follow_redirects=False)
assert resp.status_code == 302
assert "aa_session" in resp.cookies
def test_authenticated_access(self, client_with_auth):
cookie = _login(client_with_auth)
client_with_auth.cookies.set("aa_session", cookie)
resp = client_with_auth.get("/")
assert resp.status_code == 200
assert "Auto Archiver" in resp.text
def test_logout_clears_session(self, client_with_auth):
cookie = _login(client_with_auth)
client_with_auth.cookies.set("aa_session", cookie)
resp = client_with_auth.get("/logout", follow_redirects=False)
assert resp.status_code == 302
# After logout, index should redirect to login again
client_with_auth.cookies.clear()
resp = client_with_auth.get("/", follow_redirects=False)
assert resp.status_code == 307
def test_logout_link_shown_when_auth_enabled(self, client_with_auth):
cookie = _login(client_with_auth)
client_with_auth.cookies.set("aa_session", cookie)
resp = client_with_auth.get("/")
assert "Logout" in resp.text
def test_results_requires_auth(self, client_with_auth):
resp = client_with_auth.get("/results", follow_redirects=False)
assert resp.status_code == 307
def test_invalid_session_rejected(self, client_with_auth):
client_with_auth.cookies.set("aa_session", "bogus-token")
resp = client_with_auth.get("/", follow_redirects=False)
assert resp.status_code == 307
# ── Archive submission ────────────────────────────────────────────────
class TestArchive:
def test_archive_creates_job(self, client_no_auth):
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
resp = client_no_auth.post(
"/archive",
data={"urls": "https://example.com\nhttps://example.org"},
follow_redirects=False,
)
assert resp.status_code == 303
assert resp.headers["location"] == "/"
from deploy.web_ui import _jobs
assert len(_jobs) == 1
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
assert _jobs[0]["status"] == "running"
def test_archive_empty_urls_returns_400(self, client_no_auth):
resp = client_no_auth.post("/archive", data={"urls": " \n \n"})
assert resp.status_code == 400
def test_archive_strips_whitespace(self, client_no_auth):
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
client_no_auth.post(
"/archive",
data={"urls": " https://example.com \n\n https://example.org \n"},
follow_redirects=False,
)
from deploy.web_ui import _jobs
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
def test_archive_requires_auth(self, client_with_auth):
resp = client_with_auth.post(
"/archive",
data={"urls": "https://example.com"},
follow_redirects=False,
)
assert resp.status_code == 307
# ── Results page ──────────────────────────────────────────────────────
class TestResults:
def test_results_empty(self, client_no_auth, tmp_path):
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/results")
assert resp.status_code == 200
assert "No archived files yet" in resp.text
def test_results_lists_files(self, client_no_auth, tmp_path):
(tmp_path / "test.html").write_text("<html>archived</html>")
(tmp_path / "video.mp4").write_bytes(b"\x00" * 10)
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/results")
assert resp.status_code == 200
assert "test.html" in resp.text
assert "video.mp4" in resp.text
def test_results_nonexistent_dir(self, client_no_auth, tmp_path):
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path / "nonexistent"):
resp = client_no_auth.get("/results")
assert resp.status_code == 200
assert "No archived files yet" in resp.text
# ── File serving ──────────────────────────────────────────────────────
class TestFileServing:
def test_serve_existing_file(self, client_no_auth, tmp_path):
(tmp_path / "report.html").write_text("<html>done</html>")
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/files/report.html")
assert resp.status_code == 200
def test_serve_nonexistent_file(self, client_no_auth, tmp_path):
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/files/nope.txt")
assert resp.status_code == 404
def test_path_traversal_blocked(self, client_no_auth, tmp_path):
# Create a file outside the archive dir
outside = tmp_path / "outside"
outside.mkdir()
(outside / "secret.txt").write_text("secret")
archive = tmp_path / "archive"
archive.mkdir()
# Symlink into archive pointing outside
(archive / "escape").symlink_to(outside / "secret.txt")
with patch("deploy.web_ui.ARCHIVE_DIR", archive):
resp = client_no_auth.get("/files/escape")
assert resp.status_code == 403
# ── Job rendering ─────────────────────────────────────────────────────
class TestJobRendering:
def test_no_jobs_shows_message(self, client_no_auth):
resp = client_no_auth.get("/")
assert "No archiving jobs yet" in resp.text
def test_jobs_shown_in_table(self, client_no_auth):
from deploy.web_ui import _jobs
_jobs.append(
{
"id": 1,
"urls": ["https://example.com"],
"status": "done",
"started": "2026-01-01 00:00 UTC",
"output": "",
}
)
resp = client_no_auth.get("/")
assert "example.com" in resp.text
assert "done" in resp.text
def test_many_urls_truncated(self, client_no_auth):
from deploy.web_ui import _jobs
_jobs.append(
{
"id": 1,
"urls": [f"https://example.com/{i}" for i in range(10)],
"status": "running",
"started": "2026-01-01 00:00 UTC",
"output": "",
}
)
resp = client_no_auth.get("/")
assert "+7 more" in resp.text
# ── HTML template rendering ──────────────────────────────────────────
class TestTemplates:
"""Verify HTML templates can be .format()-ed without KeyError."""
def test_login_html_renders(self):
from deploy.web_ui import LOGIN_HTML
result = LOGIN_HTML.format(error="")
assert "Auto Archiver" in result
def test_login_html_renders_with_error(self):
from deploy.web_ui import LOGIN_HTML
result = LOGIN_HTML.format(error='<p class="err">Nope</p>')
assert "Nope" in result
def test_main_html_renders(self):
from deploy.web_ui import MAIN_HTML
result = MAIN_HTML.format(logout="", jobs_html="")
assert "Auto Archiver" in result
def test_results_html_renders(self):
from deploy.web_ui import RESULTS_HTML
result = RESULTS_HTML.format(file_list="<p>empty</p>")
assert "Archived Files" in result

View File

@@ -1,269 +0,0 @@
#!/usr/bin/env python3
"""
Minimal web UI for auto-archiver cloud deployments.
Provides:
- GET / → HTML form to submit URLs for archiving
- POST /archive → Runs auto-archiver on submitted URLs
- GET /results → Lists archived files available for download
- GET /files/{path} → Serves archived files
- GET /status → Health check
"""
import asyncio
import html
import os
import secrets
from datetime import datetime, timezone
from pathlib import Path
from fastapi import Depends, FastAPI, Form, HTTPException, Request, status
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
AUTH_PASSWORD = os.environ.get("AUTH_PASSWORD", "")
ARCHIVE_DIR = Path("/app/local_archive")
CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
COOKIE_NAME = "aa_session"
# In-memory session tokens (reset on restart, which is fine for this use case)
_valid_sessions: set[str] = set()
# In-memory job log
_jobs: list[dict] = []
app = FastAPI(title="Auto Archiver", docs_url=None, redoc_url=None)
# ── Auth helpers ──────────────────────────────────────────────────────
def _check_auth(request: Request):
"""Dependency: redirect to /login if auth is enabled and session is missing."""
if not AUTH_PASSWORD:
return # auth disabled
token = request.cookies.get(COOKIE_NAME, "")
if token not in _valid_sessions:
raise HTTPException(
status_code=status.HTTP_307_TEMPORARY_REDIRECT,
headers={"Location": "/login"},
)
# ── Pages ─────────────────────────────────────────────────────────────
LOGIN_HTML = """<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Auto Archiver Login</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 420px; margin: 80px auto; padding: 0 1rem; }}
h1 {{ font-size: 1.4rem; }}
input[type=password], button {{ font-size: 1rem; padding: .5rem .8rem; }}
input[type=password] {{ width: 100%; box-sizing: border-box; margin: .5rem 0; }}
button {{ cursor: pointer; background: #2563eb; color: #fff; border: none; border-radius: 4px; }}
.err {{ color: #dc2626; }}
</style></head><body>
<h1>🔐 Auto Archiver</h1>
<form method="POST" action="/login">
<label>Password<br><input type="password" name="password" autofocus required></label><br>
<button type="submit">Log in</button>
{error}
</form></body></html>"""
MAIN_HTML = """<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Auto Archiver</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 700px; margin: 2rem auto; padding: 0 1rem; line-height: 1.6; }}
h1 {{ font-size: 1.5rem; }}
textarea {{ width: 100%; box-sizing: border-box; font-size: .95rem; font-family: monospace; }}
button {{ font-size: 1rem; padding: .5rem 1.2rem; cursor: pointer; background: #2563eb; color: #fff; border: none; border-radius: 4px; margin-top: .5rem; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 1rem; }}
th, td {{ border: 1px solid #e5e7eb; padding: .4rem .6rem; text-align: left; font-size: .9rem; }}
th {{ background: #f9fafb; }}
.status {{ padding: 2px 8px; border-radius: 4px; font-size: .85rem; }}
.running {{ background: #fef3c7; color: #92400e; }}
.done {{ background: #d1fae5; color: #065f46; }}
.failed {{ background: #fee2e2; color: #991b1b; }}
a {{ color: #2563eb; }}
.info {{ color: #6b7280; font-size: .9rem; }}
nav {{ display: flex; gap: 1rem; align-items: center; }}
nav a {{ text-decoration: none; }}
</style></head><body>
<nav>
<h1>📦 Auto Archiver</h1>
<a href="/results">Browse files</a>
{logout}
</nav>
<form method="POST" action="/archive">
<label for="urls"><strong>URLs to archive</strong> (one per line)</label><br>
<textarea id="urls" name="urls" rows="5" placeholder="https://example.com/post&#10;https://youtube.com/watch?v=..." required></textarea><br>
<button type="submit">Archive</button>
</form>
{jobs_html}
</body></html>"""
RESULTS_HTML = """<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Auto Archiver Files</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 700px; margin: 2rem auto; padding: 0 1rem; }}
h1 {{ font-size: 1.4rem; }}
a {{ color: #2563eb; }}
li {{ margin: .3rem 0; font-family: monospace; font-size: .9rem; }}
</style></head><body>
<h1>📁 Archived Files</h1>
<p><a href="/">← Back</a></p>
{file_list}
</body></html>"""
# ── Routes ────────────────────────────────────────────────────────────
@app.get("/login", response_class=HTMLResponse)
async def login_page():
if not AUTH_PASSWORD:
return RedirectResponse("/", status_code=302)
return LOGIN_HTML.format(error="")
@app.post("/login")
async def login_submit(password: str = Form(...)):
if not AUTH_PASSWORD:
return RedirectResponse("/", status_code=302)
if password != AUTH_PASSWORD:
return HTMLResponse(
LOGIN_HTML.format(error='<p class="err">Wrong password.</p>'),
status_code=401,
)
token = secrets.token_urlsafe(32)
_valid_sessions.add(token)
resp = RedirectResponse("/", status_code=302)
resp.set_cookie(COOKIE_NAME, token, httponly=True, samesite="lax", max_age=86400 * 30)
return resp
@app.get("/", response_class=HTMLResponse)
async def index(request: Request, _=Depends(_check_auth)):
logout = '<a href="/logout">Logout</a>' if AUTH_PASSWORD else ""
jobs_html = _render_jobs()
return MAIN_HTML.format(logout=logout, jobs_html=jobs_html)
@app.post("/archive")
async def archive(request: Request, urls: str = Form(...), _=Depends(_check_auth)):
url_list = [u.strip() for u in urls.strip().splitlines() if u.strip()]
if not url_list:
raise HTTPException(400, "No URLs provided")
job = {
"id": len(_jobs) + 1,
"urls": url_list,
"status": "running",
"started": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"),
"output": "",
}
_jobs.insert(0, job)
# Run in background so the user sees the page immediately
asyncio.create_task(_run_archive(job))
return RedirectResponse("/", status_code=303)
@app.get("/results", response_class=HTMLResponse)
async def results(request: Request, _=Depends(_check_auth)):
if not ARCHIVE_DIR.exists():
return RESULTS_HTML.format(file_list="<p>No archived files yet.</p>")
files = sorted(ARCHIVE_DIR.rglob("*"), key=lambda p: p.stat().st_mtime, reverse=True)
files = [f for f in files if f.is_file()]
if not files:
return RESULTS_HTML.format(file_list="<p>No archived files yet.</p>")
items = []
for f in files[:200]: # cap listing
rel = f.relative_to(ARCHIVE_DIR)
items.append(f'<li><a href="/files/{rel}">{html.escape(str(rel))}</a></li>')
return RESULTS_HTML.format(file_list="<ul>" + "\n".join(items) + "</ul>")
@app.get("/files/{path:path}")
async def serve_file(path: str, request: Request, _=Depends(_check_auth)):
full = ARCHIVE_DIR / path
if not full.exists() or not full.is_file():
raise HTTPException(404, "File not found")
# Security: ensure the resolved path is within ARCHIVE_DIR
try:
full.resolve().relative_to(ARCHIVE_DIR.resolve())
except ValueError:
raise HTTPException(403, "Forbidden")
return FileResponse(full)
@app.get("/status")
async def health():
return {"status": "ok"}
@app.get("/logout")
async def logout(request: Request):
token = request.cookies.get(COOKIE_NAME, "")
_valid_sessions.discard(token)
resp = RedirectResponse("/login", status_code=302)
resp.delete_cookie(COOKIE_NAME)
return resp
# ── Helpers ───────────────────────────────────────────────────────────
async def _run_archive(job: dict):
"""Run auto-archiver as a subprocess for the given URLs."""
cmd = [
"python3",
"-m",
"auto_archiver",
"--config",
str(CONFIG_PATH),
] + job["urls"]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd="/app",
)
stdout, _ = await proc.communicate()
job["output"] = stdout.decode(errors="replace")[-5000:] # keep last 5k chars
job["status"] = "done" if proc.returncode == 0 else "failed"
except Exception as e:
job["output"] = str(e)
job["status"] = "failed"
def _render_jobs() -> str:
if not _jobs:
return '<p class="info">No archiving jobs yet. Submit URLs above to get started.</p>'
rows = []
for j in _jobs[:50]:
urls_str = html.escape(", ".join(j["urls"][:3]))
if len(j["urls"]) > 3:
urls_str += f" (+{len(j['urls']) - 3} more)"
status_cls = j["status"]
rows.append(
f"<tr><td>{j['id']}</td>"
f"<td>{urls_str}</td>"
f'<td><span class="status {status_cls}">{j["status"]}</span></td>'
f"<td>{j['started']}</td></tr>"
)
return (
"<h2>Recent Jobs</h2>"
"<table><thead><tr><th>#</th><th>URLs</th><th>Status</th><th>Started</th></tr></thead>"
"<tbody>" + "\n".join(rows) + "</tbody></table>"
)

1260
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "1.2.5"
version = "1.2.7"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"

View File

@@ -1,99 +0,0 @@
{
"$schema": "https://railway.app/railway.schema.json",
"build": {
"dockerfilePath": "deploy/Dockerfile"
},
"deploy": {
"startCommand": "python3 -m deploy.start",
"healthcheckPath": "/status",
"healthcheckTimeout": 30,
"restartPolicyType": "ON_FAILURE",
"restartPolicyMaxRetries": 5
},
"variables": {
"AUTH_PASSWORD": {
"description": "Password to access your archiver web interface",
"required": true
},
"GSHEET_URL": {
"description": "Google Sheet URL to monitor for new URLs (leave empty to disable)",
"required": false,
"default": ""
},
"GOOGLE_SERVICE_ACCOUNT_JSON": {
"description": "Full JSON contents of your Google service account key (required for Sheets)",
"required": false,
"default": ""
},
"POLL_INTERVAL": {
"description": "Seconds between Google Sheet checks (min 60)",
"required": false,
"default": "300"
},
"S3_BUCKET": {
"description": "S3 bucket name for storage (leave empty for local-only)",
"required": false,
"default": ""
},
"S3_KEY": {
"description": "S3 access key ID",
"required": false,
"default": ""
},
"S3_SECRET": {
"description": "S3 secret access key",
"required": false,
"default": ""
},
"S3_REGION": {
"description": "S3 region (e.g. us-east-1, nyc3 for DO Spaces)",
"required": false,
"default": "us-east-1"
},
"S3_ENDPOINT": {
"description": "S3 endpoint URL template",
"required": false,
"default": "https://s3.{region}.amazonaws.com"
},
"S3_CDN_URL": {
"description": "Public CDN URL template for archived files",
"required": false,
"default": "https://{bucket}.s3.{region}.amazonaws.com/{key}"
},
"TELEGRAM_API_ID": {
"description": "Telegram API ID from https://my.telegram.org",
"required": false,
"default": ""
},
"TELEGRAM_API_HASH": {
"description": "Telegram API hash from https://my.telegram.org",
"required": false,
"default": ""
},
"TELEGRAM_BOT_TOKEN": {
"description": "Telegram bot token from @BotFather",
"required": false,
"default": ""
},
"ENABLE_SCREENSHOTS": {
"description": "Set to true to capture full-page screenshots",
"required": false,
"default": "false"
},
"ENABLE_THUMBNAILS": {
"description": "Set to true to generate video thumbnails",
"required": false,
"default": "false"
},
"ENABLE_CSV_DB": {
"description": "Set to true to save a CSV log of archived items",
"required": false,
"default": "false"
},
"LOG_LEVEL": {
"description": "Logging level: DEBUG, INFO, WARNING, ERROR",
"required": false,
"default": "INFO"
}
}
}

View File

@@ -11,6 +11,7 @@ Key Functionalities:
from __future__ import annotations
import hashlib
import os
from typing import Any, List, Union, Dict
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
@@ -186,6 +187,9 @@ class Metadata:
continue
h = m.get("hash")
if not h:
if not os.path.exists(m.filename):
logger.warning(f"Skipping missing media file: {m.filename}")
continue
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)
if len(h) and h in media_hashes:
continue

View File

@@ -467,7 +467,11 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
def check_for_updates(self):
response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
try:
response = requests.get("https://pypi.org/pypi/auto-archiver/json", timeout=10).json()
except Exception as e:
logger.debug(f"Unable to check for updates: {e}")
return
latest_version = version.parse(response["info"]["version"])
current_version = version.parse(__version__)
# check version compared to current version

View File

@@ -575,6 +575,8 @@ class GenericExtractor(Extractor):
"--live-from-start" if self.live_from_start else "--no-live-from-start",
"--postprocessor-args",
"ffmpeg:-bitexact", # ensure bitexact output to avoid mismatching hashes for same video
"--js-runtimes",
"node", # yt-dlp defaults to deno-only; node is available in the base image
]
# proxy handling

View File

@@ -0,0 +1 @@
from .ghostarchive_enricher import GhostarchiveEnricher

View File

@@ -0,0 +1,58 @@
{
"name": "Ghost Archive Enricher",
"type": ["enricher"],
"entry_point": "ghostarchive_enricher::GhostarchiveEnricher",
"requires_setup": False,
"dependencies": {
"python": ["loguru", "requests", "bs4", "seleniumbase"],
},
"configs": {
"timeout": {
"default": 120,
"type": "int",
"help": "seconds to wait for successful archive confirmation from Ghost Archive.",
},
"check_existing": {
"default": True,
"type": "bool",
"help": "whether to search for an existing archive before submitting a new one.",
},
"proxy_http": {
"default": None,
"help": "http proxy to use for requests, eg http://proxy-user:password@proxy-ip:port",
},
"proxy_https": {
"default": None,
"help": "https proxy to use for requests, eg https://proxy-user:password@proxy-ip:port",
},
},
"description": """
Submits the current URL to [Ghost Archive](https://ghostarchive.org/) for archiving and returns the archived page URL.
Used as an **enricher** to add a Ghost Archive URL to items already extracted by other modules.
### Features
- Archives any public URL using the Ghost Archive service.
- Optionally checks for existing archives before submitting a new one.
- Supports HTTP and HTTPS proxies for requests.
- Parses HTML responses to extract archive URLs (Ghost Archive has no JSON API).
### Important
- This module confirms that Ghost Archive accepted the URL submission and returned an archive link.
It does **not** verify the contents or completeness of the archived page.
### Notes
- Ghost Archive is a free service with no authentication required.
- Archived pages must be smaller than 50 MB (including CSS, fonts, images, etc.).
- Videos are archived up to 360p and must be under 100 MB and shorter than 30 minutes.
- Archival may take up to 5 minutes depending on the queue and page complexity.
- Archived content is stored indefinitely.
- Ghost Archive does not archive pages that require authentication or form submission.
### Limitations
- No official API — this module interacts with the Ghost Archive web interface.
- The submission endpoint is protected by Cloudflare, so a headless browser (SeleniumBase) is used for new submissions.
- Searching for existing archives uses plain HTTP requests and does not require a browser.
- Rate limiting may apply; consider using a delay between requests if archiving many URLs.
""",
}

View File

@@ -0,0 +1,153 @@
import time
import re
import requests
from bs4 import BeautifulSoup
from seleniumbase import SB
from auto_archiver.utils.custom_logger import logger
from auto_archiver.utils import url as UrlUtil
from auto_archiver.core import Enricher, Metadata
class GhostarchiveEnricher(Enricher):
"""
Submits the current URL to Ghost Archive (ghostarchive.org) for archiving
and stores the archived page URL as enrichment metadata.
Ghost Archive has no official API — this module interacts with the web form
and parses HTML responses. The submission endpoint is protected by Cloudflare,
so a headless browser (SeleniumBase) is used for archival submissions, while
plain HTTP requests are used for searching existing archives.
Note: this module only confirms that Ghost Archive accepted the submission
and returned an archive URL. It does not verify that the archived page
content is complete or correctly rendered.
"""
GHOSTARCHIVE_BASE = "https://ghostarchive.org"
ARCHIVE_ENDPOINT = f"{GHOSTARCHIVE_BASE}/archive2"
SEARCH_ENDPOINT = f"{GHOSTARCHIVE_BASE}/search"
ARCHIVE_URL_PATTERN = re.compile(r"/archive/([A-Za-z0-9]+)")
def _get_proxies(self) -> dict:
proxies = {}
if self.proxy_http:
proxies["http"] = self.proxy_http
if self.proxy_https:
proxies["https"] = self.proxy_https
return proxies
def _get_headers(self) -> dict:
return {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}
def _normalize_archive_href(self, href: str) -> str | None:
"""Normalize an archive link href to a full HTTPS URL, filtering out replay links."""
if "/archive/" not in href or "/replay/" in href:
return None
if href.startswith("/"):
return f"{self.GHOSTARCHIVE_BASE}{href}"
if href.startswith("http://ghostarchive.org"):
return href.replace("http://", "https://")
if href.startswith("https://ghostarchive.org"):
return href
return None
def _search_existing(self, url: str) -> str | None:
"""
Search Ghost Archive for an existing archive of the given URL.
Returns the archive URL if found, otherwise None.
"""
try:
r = requests.get(
self.SEARCH_ENDPOINT,
params={"term": url},
headers=self._get_headers(),
proxies=self._get_proxies(),
timeout=30,
)
if r.status_code != 200:
logger.warning(f"Ghost Archive search returned status {r.status_code}")
return None
soup = BeautifulSoup(r.text, "html.parser")
for link in soup.find_all("a", href=True):
archive_url = self._normalize_archive_href(link["href"])
if archive_url:
logger.info(f"Found existing Ghost Archive: {archive_url}")
return archive_url
except requests.exceptions.RequestException as e:
logger.warning(f"Ghost Archive search failed: {e}")
return None
def _submit_url(self, url: str) -> str | None:
"""
Submit a URL to Ghost Archive for archiving using a headless browser.
The /archive2 endpoint is Cloudflare-protected, requiring JS execution.
Returns the archive URL if successful, otherwise None.
"""
try:
with SB(uc=True, headless=True) as sb:
logger.debug("Opening Ghost Archive homepage in headless browser")
sb.open(self.GHOSTARCHIVE_BASE)
# fill in the archive form and submit
sb.type('input[name="archive"]', url)
sb.click('input[type="submit"][value="Submit for archival"]')
# wait for navigation to /archive/{id} or timeout
start_time = time.time()
while time.time() - start_time < self.timeout:
current_url = sb.get_current_url()
if self.ARCHIVE_URL_PATTERN.search(current_url):
archive_url = current_url.split("?")[0]
logger.info(f"Ghost Archive saved: {archive_url}")
return archive_url
time.sleep(2)
# if we didn't redirect, try parsing the page source
page_source = sb.get_page_source()
return self._parse_archive_url(page_source)
except Exception as e:
logger.warning(f"Ghost Archive submission failed: {e}")
return None
def _parse_archive_url(self, html: str) -> str | None:
"""Parse HTML response to find an archive URL."""
soup = BeautifulSoup(html, "html.parser")
for link in soup.find_all("a", href=True):
archive_url = self._normalize_archive_href(link["href"])
if archive_url:
return archive_url
return None
def enrich(self, to_enrich: Metadata) -> bool:
url = to_enrich.get_url()
if UrlUtil.is_auth_wall(url):
logger.debug("[SKIP] Ghost Archive since url is behind AUTH WALL")
return False
if to_enrich.get("ghostarchive"):
logger.info(f"Ghost Archive enricher had already been executed: {to_enrich.get('ghostarchive')}")
return True
# optionally check for existing archive first
archive_url = None
if self.check_existing:
logger.debug(f"Searching Ghost Archive for existing archive of {url}")
archive_url = self._search_existing(url)
if not archive_url:
logger.debug(f"Submitting {url} to Ghost Archive")
archive_url = self._submit_url(url)
if archive_url:
to_enrich.set("ghostarchive", archive_url)
return True
logger.warning(f"Ghost Archive failed to archive {url}")
return False

View File

@@ -64,7 +64,6 @@ class DeletionIndicators:
# YouTube deletion indicators
YOUTUBE = [
"This video isn't available anymore",
"Video unavailable",
"This video has been removed",
"This video is no longer available",
"This video is private",

View File

@@ -120,6 +120,9 @@ def ydl_entry_to_filename(ydl, entry: dict) -> str:
directory = os.path.dirname(base_filename) # '/get/path/to'
basename = os.path.basename(base_filename) # 'file'
for f in os.listdir(directory):
# skip incomplete downloads left behind by yt-dlp
if f.endswith(".part"):
continue
if (
f.startswith(basename)
or (entry_url and os.path.splitext(f)[0] in entry_url)

View File

@@ -0,0 +1,277 @@
import pytest
import requests
import os
from unittest.mock import MagicMock
from auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher import GhostarchiveEnricher
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
# sample HTML responses for mocking
SEARCH_HTML_FOUND = """
<html><body>
<h1>Archives for https://example.com</h1>
<table>
<tr><td><a href="http://ghostarchive.org/archive/Abc12">https://example.com</a></td></tr>
</table>
</body></html>
"""
SEARCH_HTML_NOT_FOUND = """
<html><body>
<h1>Archives for https://example.com</h1>
<p>Page 0 out of 0</p>
<p>No archives for that site.</p>
</body></html>
"""
SAVE_RESPONSE_HTML_WITH_LINK = """
<html><body>
<h1>Archive saved</h1>
<a href="/archive/Xyz99">View archive</a>
</body></html>
"""
ENRICHER_CONFIG = {
"timeout": 120,
"check_existing": True,
"proxy_http": None,
"proxy_https": None,
}
class TestGhostarchiveEnricher:
"""Tests for Ghost Archive Enricher"""
@pytest.fixture(autouse=True)
def setup_enricher(self, setup_module):
self.enricher: GhostarchiveEnricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
def test_search_existing_found(self, mocker):
"""When an existing archive is found, it should be returned."""
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.text = SEARCH_HTML_FOUND
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
)
result = self.enricher._search_existing("https://example.com")
assert result == "https://ghostarchive.org/archive/Abc12"
def test_search_existing_not_found(self, mocker):
"""When no existing archive is found, None should be returned."""
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.text = SEARCH_HTML_NOT_FOUND
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
)
result = self.enricher._search_existing("https://example.com")
assert result is None
def test_search_existing_request_error(self, mocker):
"""When search request fails, None should be returned."""
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
side_effect=requests.exceptions.ConnectionError("connection failed"),
)
result = self.enricher._search_existing("https://example.com")
assert result is None
def test_search_existing_non_200(self, mocker):
"""When search returns non-200, None should be returned."""
mock_response = mocker.Mock()
mock_response.status_code = 503
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
)
result = self.enricher._search_existing("https://example.com")
assert result is None
def test_submit_url_success_redirect(self, mocker):
"""Successful submission via headless browser should return archive URL."""
mock_sb = MagicMock()
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1"
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
mock_sb.__exit__ = MagicMock(return_value=False)
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
result = self.enricher._submit_url("https://example.com")
assert result == "https://ghostarchive.org/archive/NewId1"
mock_sb.type.assert_called_once()
mock_sb.click.assert_called_once()
def test_submit_url_success_redirect_strips_query(self, mocker):
"""Redirect URL query params should be stripped."""
mock_sb = MagicMock()
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1?wr=false"
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
mock_sb.__exit__ = MagicMock(return_value=False)
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
result = self.enricher._submit_url("https://example.com")
assert result == "https://ghostarchive.org/archive/NewId1"
def test_submit_url_success_html_fallback(self, mocker):
"""When browser doesn't redirect, should parse page source for archive link."""
mock_sb = MagicMock()
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive2"
mock_sb.get_page_source.return_value = SAVE_RESPONSE_HTML_WITH_LINK
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
mock_sb.__exit__ = MagicMock(return_value=False)
# make timeout=0 so the polling loop exits immediately and falls through to HTML parsing
self.enricher.timeout = 0
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
result = self.enricher._submit_url("https://example.com")
assert result == "https://ghostarchive.org/archive/Xyz99"
def test_submit_url_browser_error(self, mocker):
"""Browser error during submission should return None."""
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB",
side_effect=Exception("browser failed to start"),
)
result = self.enricher._submit_url("https://example.com")
assert result is None
def test_proxy_configuration(self, mocker):
"""Proxies should be passed to search requests when configured."""
self.enricher.proxy_http = "http://proxy:8080"
self.enricher.proxy_https = "https://proxy:8443"
mock_get = mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
)
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.text = SEARCH_HTML_FOUND
mock_get.return_value = mock_response
result = self.enricher._search_existing("https://example.com")
call_kwargs = mock_get.call_args
assert call_kwargs.kwargs.get("proxies") == {"http": "http://proxy:8080", "https": "https://proxy:8443"}
assert result is not None
def test_parse_archive_url_with_replay_links(self):
"""Parser should ignore /replay/ links and only return /archive/ links."""
html = """
<html><body>
<a href="/archive/replay/w/id-abc/mp_/https://example.com">replay</a>
<a href="/archive/Valid1">valid</a>
</body></html>
"""
result = self.enricher._parse_archive_url(html)
assert result == "https://ghostarchive.org/archive/Valid1"
def test_parse_archive_url_no_links(self):
"""Parser should return None when no archive links found."""
html = "<html><body><p>No archive here</p></body></html>"
result = self.enricher._parse_archive_url(html)
assert result is None
def test_enrich_sets_ghostarchive_on_metadata(self, mocker, make_item):
"""enrich() should set 'ghostarchive' key on the metadata object."""
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Enr1")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Enr1"
def test_enrich_skips_if_already_enriched(self, mocker, make_item):
"""enrich() should skip if ghostarchive key is already set."""
mock_search = mocker.patch.object(self.enricher, "_search_existing")
item = make_item("https://example.com", ghostarchive="https://ghostarchive.org/archive/Old1")
result = self.enricher.enrich(item)
assert result is True
mock_search.assert_not_called()
def test_enrich_returns_false_on_failure(self, mocker, make_item):
"""enrich() should return False when both search and submit fail."""
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
mocker.patch.object(self.enricher, "_submit_url", return_value=None)
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is False
def test_enrich_skips_auth_wall(self, mocker, make_item):
"""enrich() should skip URLs behind auth walls."""
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.UrlUtil.is_auth_wall", return_value=True
)
item = make_item("https://example.com/login")
result = self.enricher.enrich(item)
assert result is False
def test_enrich_with_existing_archive(self, mocker, make_item):
"""enrich() should use existing archive when check_existing is True."""
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Exist1")
mock_submit = mocker.patch.object(self.enricher, "_submit_url")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Exist1"
mock_submit.assert_not_called()
def test_enrich_submits_when_no_existing(self, mocker, make_item):
"""enrich() should submit URL when no existing archive found."""
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/New42")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/New42"
def test_enrich_skips_check_existing_when_disabled(self, mocker, make_item):
"""enrich() should skip search when check_existing is False."""
self.enricher.check_existing = False
mock_search = mocker.patch.object(self.enricher, "_search_existing")
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/Direct1")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
mock_search.assert_not_called()
@pytest.mark.download
def test_real_search_existing(self, setup_module):
"""Integration test: search for an existing archive on Ghost Archive."""
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
# example.com is commonly archived
result = enricher._search_existing("https://example.com")
# we just check it doesn't crash; result may or may not be found
assert result is None or result.startswith("https://ghostarchive.org/archive/")
@pytest.mark.download
@pytest.mark.skipif(CI, reason="Avoid submitting a real task on every CI run")
def test_real_submit_example_com(self, setup_module, make_item):
"""Integration test: submit example.com to Ghost Archive and verify enrichment."""
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
item = make_item("https://example.com")
result = enricher.enrich(item)
assert result is True
archive_url = item.get("ghostarchive")
assert archive_url is not None
assert archive_url.startswith("https://ghostarchive.org/archive/")

View File

@@ -86,6 +86,22 @@ def test_media_management(basic_metadata, media_file):
assert basic_metadata.get_media_by_id("m1") == media1
def test_remove_duplicate_skips_missing_files(basic_metadata, media_file, tmp_path):
"""Missing files should be dropped instead of crashing with FileNotFoundError."""
real_file = tmp_path / "exists.txt"
real_file.write_text("content")
valid = media_file(filename=str(real_file), hash_value="abc")
missing = media_file(filename="/nonexistent/path/gone.mp4")
basic_metadata.add_media(valid, "valid")
basic_metadata.add_media(missing, "missing")
assert len(basic_metadata.media) == 2
basic_metadata.remove_duplicate_media_by_hash()
assert len(basic_metadata.media) == 1
assert basic_metadata.get_media_by_id("valid") == valid
def test_success():
m = Metadata()
assert not m.is_success()

View File

@@ -1,5 +1,6 @@
import pytest
from argparse import ArgumentParser, ArgumentTypeError
from requests.exceptions import SSLError
from auto_archiver.core.orchestrator import ArchivingOrchestrator
from auto_archiver.version import __version__
from auto_archiver.core.config import read_yaml, store_yaml
@@ -256,3 +257,34 @@ def test_load_failed_extractor_cleanup(test_args, mocker, caplog):
assert "Error during setup of modules: Test exception" in caplog.text
# make sure the 'cleanup' is called
assert "cleanup" in caplog.text
def test_check_for_updates_ssl_error(orchestrator, mocker):
"""check_for_updates should not raise when the HTTP request fails."""
mocker.patch(
"auto_archiver.core.orchestrator.requests.get",
side_effect=SSLError("SSL handshake failed"),
)
# should not raise
orchestrator.check_for_updates()
def test_check_for_updates_timeout(orchestrator, mocker):
"""check_for_updates should not raise on connection timeout."""
from requests.exceptions import ConnectionError
mocker.patch(
"auto_archiver.core.orchestrator.requests.get",
side_effect=ConnectionError("Connection refused"),
)
orchestrator.check_for_updates()
def test_check_for_updates_new_version_available(orchestrator, mocker):
"""check_for_updates should not raise when a newer version exists."""
mocker.patch(
"auto_archiver.core.orchestrator.requests.get",
return_value=mocker.Mock(json=lambda: {"info": {"version": "99.0.0"}}),
)
# should complete without error
orchestrator.check_for_updates()

View File

@@ -14,6 +14,7 @@ from auto_archiver.utils.misc import (
calculate_file_hash,
random_str,
get_timestamp,
ydl_entry_to_filename,
)
@@ -139,3 +140,47 @@ class TestMiscUtils:
def test_invalid_timestamp_returns_none(self):
assert get_timestamp("invalid-date") is None
class TestYdlEntryToFilename:
"""Tests for ydl_entry_to_filename, especially .part file filtering."""
def _make_mock_ydl(self, prepared_filename):
class MockYDL:
def prepare_filename(self, entry):
return prepared_filename
return MockYDL()
def test_returns_exact_file_if_exists(self, tmp_path):
video = tmp_path / "video.mp4"
video.write_bytes(b"data")
ydl = self._make_mock_ydl(str(video))
assert ydl_entry_to_filename(ydl, {}) == str(video)
def test_skips_part_file_returns_complete(self, tmp_path):
"""Simulates yt-dlp leaving a .part file from a failed format
while a complete .webm exists."""
(tmp_path / "f5U3IKfoSYs.f399.mp4.part").write_bytes(b"incomplete")
webm = tmp_path / "f5U3IKfoSYs.webm"
webm.write_bytes(b"complete video")
# ydl.prepare_filename returns the expected .mp4 which doesn't exist
ydl = self._make_mock_ydl(str(tmp_path / "f5U3IKfoSYs.mp4"))
result = ydl_entry_to_filename(ydl, {})
assert result == str(webm)
assert not result.endswith(".part")
def test_skips_part_file_returns_false_if_no_other_match(self, tmp_path):
"""Only a .part file exists — should return False."""
(tmp_path / "video.f399.mp4.part").write_bytes(b"incomplete")
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
assert ydl_entry_to_filename(ydl, {}) is False
def test_returns_false_when_no_files_match(self, tmp_path):
(tmp_path / "unrelated.txt").write_bytes(b"data")
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
assert ydl_entry_to_filename(ydl, {}) is False