Compare commits

..

18 Commits

Author SHA1 Message Date
msramalho
ac4c09810b experimental feature for one-click deployment 2026-03-12 11:47:20 +00:00
msramalho
3194fee95d fix telethon bug when running in celery workers that close the event loop 2026-03-12 10:20:11 +00:00
msramalho
0040810e2e dependencies bump 2026-03-10 14:33:25 +00:00
msramalho
23a88e3cf4 ci issues 2026-03-02 17:07:09 +00:00
msramalho
3cac160cc1 version bump 2026-03-02 17:01:33 +00:00
msramalho
e9a92272c5 bug fix: missing filename on url download 2026-03-02 17:01:16 +00:00
Miguel Sozinho Ramalho
5d6c5ac2b1 Merge pull request #406 from bellingcat/dev
1.2.3
2026-03-02 15:42:08 +00:00
msramalho
f1de07c9aa version bump 2026-03-02 15:41:03 +00:00
msramalho
1e1e060a77 closes #342 2026-03-02 15:37:55 +00:00
msramalho
b43d229326 closes #358 2026-03-02 14:27:48 +00:00
msramalho
077b03fc61 minor tests change to work in gh actions 2026-03-02 14:08:14 +00:00
Miguel Sozinho Ramalho
cf77cfa64d Merge pull request #405 from bellingcat/feat/nitter-alternative
closes #400 Feat twitter drop-in alternative
2026-03-02 12:33:34 +00:00
msramalho
bc66dd4f2a fxtwitter working instead of nitter 2026-03-02 12:31:28 +00:00
msramalho
139d647197 Merge branch 'dev' into feat/nitter-alternative 2026-03-02 12:16:22 +00:00
msramalho
f465b570cd adding missing tests (no download) 2026-03-02 12:14:47 +00:00
Miguel Sozinho Ramalho
52a7cabaf1 Merge pull request #402 from bellingcat/dev
bug fix: wacz screenshots leak in shared session
2026-02-25 10:39:54 +00:00
msramalho
a739361e12 bug fix: wacz screenshots leak in shared session 2026-02-23 16:26:36 +00:00
msramalho
b9ab26ed5a see #400 WIP nitter not working as of now 2026-02-23 12:20:10 +00:00
42 changed files with 3234 additions and 388 deletions

29
.github/workflows/tests-deploy.yaml vendored Normal file
View File

@@ -0,0 +1,29 @@
name: Deploy Tests
on:
push:
branches: [ main ]
paths:
- deploy/**
pull_request:
paths:
- deploy/**
jobs:
tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Set up Python 3.12
uses: actions/setup-python@v6
with:
python-version: "3.12"
- name: Install dependencies
run: pip install pytest fastapi httpx python-multipart pyyaml
- name: Run Deploy Tests
working-directory: deploy
run: python -m pytest tests/ -v

View File

@@ -41,11 +41,21 @@ COPY ./src/ .
RUN /poetry-venv/bin/poetry install --only main --no-cache
# Run as non-root user to avoid permission issues with mounted volumes (see #342)
# The base image already has an 'ubuntu' user at UID/GID 1000.
# Ensure directories that need write access at runtime are writable.
RUN chown 1000:1000 /app && \
chown -R 1000:1000 /app/.venv/lib/python3.12/site-packages/seleniumbase/drivers/ && \
mkdir -p /app/local_archive /app/secrets /tmp/archive && \
chown -R 1000:1000 /app/local_archive /app/secrets /tmp/archive
# Update PATH to include virtual environment binaries
# Allowing entry point to run the application directly with Python
ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH"
USER 1000
ENTRYPOINT ["python3", "-m", "auto_archiver"]
# should be executed with 2 volumes (3 if local_storage is used)

View File

@@ -22,7 +22,40 @@ Auto Archiver is a Python tool to automatically archive content on the web in a
Read the [article about Auto Archiver on bellingcat.com](https://www.bellingcat.com/resources/2022/09/22/preserve-vital-online-content-with-bellingcats-auto-archiver-tool/).
## Installation
## One-Click Cloud Deploy
Deploy your own Auto Archiver instance to the cloud — no coding required:
[![Deploy on Railway](https://railway.app/button.svg)](https://railway.app/new/template?template=https://github.com/bellingcat/auto-archiver&envs=AUTH_PASSWORD,GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&optionalEnvs=GSHEET_URL,GOOGLE_SERVICE_ACCOUNT_JSON,POLL_INTERVAL,S3_BUCKET,S3_KEY,S3_SECRET,S3_REGION,TELEGRAM_API_ID,TELEGRAM_API_HASH,TELEGRAM_BOT_TOKEN,ENABLE_SCREENSHOTS,LOG_LEVEL&AUTH_PASSWORDDesc=Password+to+access+your+archiver+web+interface&GSHEET_URLDesc=Google+Sheet+URL+to+monitor+for+new+URLs+(leave+empty+to+disable)&POLL_INTERVALDesc=Seconds+between+Google+Sheet+checks+(min+60)&POLL_INTERVALDefault=300&S3_BUCKETDesc=S3+bucket+name+for+storage+(leave+empty+for+local+only)&S3_REGIONDefault=us-east-1&LOG_LEVELDefault=INFO)
**What you get:** A web interface where you can paste URLs and archive them instantly. Optionally connect a Google Sheet for automated monitoring, S3 for cloud storage, and Telegram for archiving channels.
**Only required setting:** `AUTH_PASSWORD` — everything else is optional and can be configured later via the Railway dashboard.
<details>
<summary>📋 Environment variables reference</summary>
| Variable | Required | Description |
|----------|----------|-------------|
| `AUTH_PASSWORD` | **Yes** | Password to access the web interface |
| `GSHEET_URL` | No | Google Sheet URL to monitor for new URLs [use this template](https://docs.google.com/spreadsheets/d/1NJZo_XZUBKTI1Ghlgi4nTPVvCfb0HXAs6j5tNGas72k/edit?gid=0#gid=0) |
| `GOOGLE_SERVICE_ACCOUNT_JSON` | No | Google service account JSON (required with Sheets) [follow these instructions](https://auto-archiver.readthedocs.io/en/v1.0.1/how_to/gsheets_setup.html) |
| `POLL_INTERVAL` | No | Seconds between Sheet checks (default: 300) |
| `S3_BUCKET` | No | S3 bucket name for archived content, ideal for cloud hosting your archives but not mandatory, any S3-compatible storage works |
| `S3_KEY` / `S3_SECRET` | No | S3 credentials |
| `S3_REGION` | No | S3 region (default: us-east-1) |
| `S3_ENDPOINT` | No | S3 endpoint URL |
| `TELEGRAM_API_ID` / `TELEGRAM_API_HASH` | No | Telegram API credentials |
| `TELEGRAM_BOT_TOKEN` | No | Telegram bot token |
| `ENABLE_SCREENSHOTS` | No | Set to `true` for full-page screenshots |
| `ENABLE_THUMBNAILS` | No | Set to `true` for video thumbnails |
| `ENABLE_CSV_DB` | No | Set to `true` for CSV logging |
| `LOG_LEVEL` | No | DEBUG, INFO, WARNING, ERROR (default: INFO) |
</details>
## Traditional Installation
View the [Installation Guide](https://auto-archiver.readthedocs.io/en/latest/installation/installation.html) for full instructions

34
deploy/Dockerfile Normal file
View File

@@ -0,0 +1,34 @@
# ── Cloud Deploy ──────────────────────────────────────────────────────
# Thin web UI + config generator layer on top of the published
# auto-archiver Docker image. Used by the Railway one-click deploy.
#
# Build:
# docker build -f deploy/Dockerfile -t auto-archiver-deploy .
#
# Run:
# docker run -p 8080:8080 -e PORT=8080 -e AUTH_PASSWORD=secret auto-archiver-deploy
# ──────────────────────────────────────────────────────────────────────
FROM bellingcat/auto-archiver:latest
USER root
# Install the lightweight web layer dependencies
RUN pip install --no-cache-dir fastapi uvicorn[standard] python-multipart pyyaml
# Copy deploy scripts into the image
COPY deploy/ /app/deploy/
# Ensure writable dirs exist
RUN mkdir -p /app/local_archive /app/secrets && \
chown -R 1000:1000 /app/local_archive /app/secrets /app/deploy
USER 1000
# Railway sets PORT; default to 8080
ENV PORT=8080
EXPOSE ${PORT}
# Override the CLI entrypoint with the web server
ENTRYPOINT ["python3", "-m", "deploy.start"]

1
deploy/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Cloud deployment layer for auto-archiver

163
deploy/generate_config.py Normal file
View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
Generates orchestration.yaml from environment variables.
This script bridges Railway's env-var-based configuration with
auto-archiver's YAML-based configuration system. It runs at container
startup before the web UI server starts.
"""
import os
from pathlib import Path
import yaml
CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
SECRETS_DIR = Path("/app/secrets")
def build_config() -> dict:
"""Build an orchestration config dict from environment variables."""
# -- Base config: always present ------------------------------------
config = {
"steps": {
"feeders": ["cli_feeder"],
"extractors": ["generic_extractor"],
"enrichers": ["hash_enricher"],
"databases": ["console_db"],
"storages": ["local_storage"],
"formatters": ["html_formatter"],
},
"logging": {
"level": os.environ.get("LOG_LEVEL", "INFO"),
},
"local_storage": {
"save_to": "/app/local_archive",
"path_generator": "flat",
"filename_generator": "static",
},
"generic_extractor": {
"subtitles": os.environ.get("SUBTITLES", "false").lower() == "true",
"comments": False,
"livestreams": False,
"live_from_start": False,
"end_means_success": True,
"allow_playlist": False,
},
"hash_enricher": {
"algorithm": "SHA-256",
},
"html_formatter": {
"detect_thumbnails": True,
},
"authentication": {},
}
# -- Google Sheets feeder (optional) --------------------------------
gsheet_url = os.environ.get("GSHEET_URL", "")
if gsheet_url:
config["steps"]["feeders"].append("gsheet_feeder")
config["steps"]["databases"].append("gsheet_db")
config["gsheet_feeder"] = {
"sheet": gsheet_url,
"header": 1,
"service_account": str(SECRETS_DIR / "service_account.json"),
"use_sheet_names_in_stored_paths": False,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "textual content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
},
}
# -- Google service account JSON (optional) -------------------------
sa_json = os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "")
if sa_json:
SECRETS_DIR.mkdir(parents=True, exist_ok=True)
sa_path = SECRETS_DIR / "service_account.json"
sa_path.write_text(sa_json)
print(f"[deploy] Wrote Google service account to {sa_path}")
# -- S3 storage (optional) ------------------------------------------
s3_bucket = os.environ.get("S3_BUCKET", "")
if s3_bucket:
config["steps"]["storages"].append("s3_storage")
config["s3_storage"] = {
"bucket": s3_bucket,
"region": os.environ.get("S3_REGION", "us-east-1"),
"key": os.environ.get("S3_KEY", ""),
"secret": os.environ.get("S3_SECRET", ""),
"endpoint_url": os.environ.get("S3_ENDPOINT", "https://s3.{region}.amazonaws.com"),
"cdn_url": os.environ.get(
"S3_CDN_URL",
"https://{bucket}.s3.{region}.amazonaws.com/{key}",
),
"private": os.environ.get("S3_PRIVATE", "false").lower() == "true",
"random_no_duplicate": True,
"key_path": "random",
}
# -- Telegram extractor (optional) ----------------------------------
tg_api_id = os.environ.get("TELEGRAM_API_ID", "")
tg_api_hash = os.environ.get("TELEGRAM_API_HASH", "")
if tg_api_id and tg_api_hash:
config["steps"]["extractors"].append("telegram_extractor")
config["telegram_extractor"] = {
"api_id": tg_api_id,
"api_hash": tg_api_hash,
}
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
if bot_token:
config["telegram_extractor"]["bot_token"] = bot_token
# -- Screenshot enricher (optional) ---------------------------------
if os.environ.get("ENABLE_SCREENSHOTS", "").lower() == "true":
config["steps"]["enrichers"].append("screenshot_enricher")
config["screenshot_enricher"] = {
"width": 1280,
"height": 7200,
"save_to_pdf": True,
}
# -- Thumbnail enricher (optional) ----------------------------------
if os.environ.get("ENABLE_THUMBNAILS", "").lower() == "true":
config["steps"]["enrichers"].append("thumbnail_enricher")
config["thumbnail_enricher"] = {
"thumbnails_per_minute": 60,
"max_thumbnails": 16,
}
# -- CSV database (optional) ----------------------------------------
if os.environ.get("ENABLE_CSV_DB", "").lower() == "true":
config["steps"]["databases"].append("csv_db")
config["csv_db"] = {
"csv_file": "/app/local_archive/db.csv",
}
return config
def main():
config = build_config()
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(CONFIG_PATH, "w") as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
print(f"[deploy] Generated config at {CONFIG_PATH}")
print(f"[deploy] Active steps: {config['steps']}")
if __name__ == "__main__":
main()

71
deploy/gsheet_poller.py Normal file
View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
Background Google Sheets poller for auto-archiver cloud deployments.
When GSHEET_URL is set, periodically runs auto-archiver with gsheet_feeder
to check for new URLs in the configured spreadsheet. Runs as a daemon thread
alongside the web UI.
"""
import logging
import os
import subprocess
import threading
import time
logger = logging.getLogger("gsheet_poller")
CONFIG_PATH = "/app/secrets/orchestration.yaml"
def _poll_once():
"""Run auto-archiver once to process any new rows in the Google Sheet."""
logger.info("Polling Google Sheet for new URLs...")
try:
result = subprocess.run(
["python3", "-m", "auto_archiver", "--config", CONFIG_PATH],
capture_output=True,
text=True,
cwd="/app",
timeout=600, # 10 minute timeout per poll
)
if result.returncode == 0:
logger.info("Sheet poll completed successfully.")
else:
logger.warning("Sheet poll exited with code %d: %s", result.returncode, result.stderr[-500:])
except subprocess.TimeoutExpired:
logger.error("Sheet poll timed out after 600s")
except Exception:
logger.exception("Sheet poll failed")
def _poll_loop(interval: int):
"""Run the poll loop at the given interval (seconds)."""
logger.info("Google Sheets poller started (interval=%ds)", interval)
while True:
_poll_once()
time.sleep(interval)
def start_poller():
"""
Start the Google Sheets poller as a daemon thread if GSHEET_URL is set.
Call this once at application startup.
"""
gsheet_url = os.environ.get("GSHEET_URL", "")
if not gsheet_url:
logger.info("GSHEET_URL not set Sheet poller disabled.")
return
interval = int(os.environ.get("POLL_INTERVAL", "300"))
if interval < 60:
interval = 60 # minimum 1 minute
thread = threading.Thread(
target=_poll_loop,
args=(interval,),
daemon=True,
name="gsheet-poller",
)
thread.start()
logger.info("Google Sheets poller thread started.")

2
deploy/pytest.ini Normal file
View File

@@ -0,0 +1,2 @@
[pytest]
testpaths = tests

37
deploy/start.py Normal file
View File

@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
Startup entrypoint for cloud deployments.
1. Generates orchestration.yaml from environment variables
2. Starts the Google Sheets poller (if GSHEET_URL is set)
3. Starts the FastAPI web UI
"""
import os
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
# Generate config from env vars
from deploy.generate_config import main as generate_config # noqa: E402
generate_config()
# Start gsheet poller (no-op if GSHEET_URL not set)
from deploy.gsheet_poller import start_poller # noqa: E402
start_poller()
# Start web server
import uvicorn # noqa: E402
port = int(os.environ.get("PORT", "8080"))
uvicorn.run(
"deploy.web_ui:app",
host="0.0.0.0",
port=port,
log_level="info",
)

0
deploy/tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,354 @@
"""Tests for deploy/generate_config.py config generation from env vars."""
import json
import os
from unittest.mock import patch
import yaml
from deploy.generate_config import build_config, main
# ── Helpers ───────────────────────────────────────────────────────────
def _env(**overrides):
"""Return a clean env dict with only the given overrides (no leak from host)."""
# Clear all deploy-relevant env vars, then apply overrides
deploy_vars = [
"LOG_LEVEL",
"SUBTITLES",
"GSHEET_URL",
"GOOGLE_SERVICE_ACCOUNT_JSON",
"S3_BUCKET",
"S3_KEY",
"S3_SECRET",
"S3_REGION",
"S3_ENDPOINT",
"S3_CDN_URL",
"S3_PRIVATE",
"TELEGRAM_API_ID",
"TELEGRAM_API_HASH",
"TELEGRAM_BOT_TOKEN",
"ENABLE_SCREENSHOTS",
"ENABLE_THUMBNAILS",
"ENABLE_CSV_DB",
]
clean = {k: v for k, v in os.environ.items() if k not in deploy_vars}
clean.update(overrides)
return clean
# ── Base config (no optional env vars) ────────────────────────────────
class TestBaseConfig:
"""When no optional env vars are set, build_config returns a minimal working config."""
def test_base_steps(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
steps = cfg["steps"]
assert steps["feeders"] == ["cli_feeder"]
assert steps["extractors"] == ["generic_extractor"]
assert steps["enrichers"] == ["hash_enricher"]
assert steps["databases"] == ["console_db"]
assert steps["storages"] == ["local_storage"]
assert steps["formatters"] == ["html_formatter"]
def test_base_has_required_module_configs(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert "local_storage" in cfg
assert "generic_extractor" in cfg
assert "hash_enricher" in cfg
assert "html_formatter" in cfg
def test_default_log_level_is_info(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert cfg["logging"]["level"] == "INFO"
def test_custom_log_level(self):
with patch.dict(os.environ, _env(LOG_LEVEL="DEBUG"), clear=True):
cfg = build_config()
assert cfg["logging"]["level"] == "DEBUG"
def test_authentication_present_and_empty(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert cfg["authentication"] == {}
def test_local_storage_defaults(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
ls = cfg["local_storage"]
assert ls["save_to"] == "/app/local_archive"
assert ls["path_generator"] == "flat"
assert ls["filename_generator"] == "static"
def test_subtitles_default_false(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert cfg["generic_extractor"]["subtitles"] is False
def test_subtitles_enabled(self):
with patch.dict(os.environ, _env(SUBTITLES="true"), clear=True):
cfg = build_config()
assert cfg["generic_extractor"]["subtitles"] is True
def test_subtitles_case_insensitive(self):
with patch.dict(os.environ, _env(SUBTITLES="True"), clear=True):
cfg = build_config()
assert cfg["generic_extractor"]["subtitles"] is True
def test_no_optional_modules_present(self):
"""Ensure optional modules don't appear when their env vars are absent."""
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert "gsheet_feeder" not in cfg
assert "s3_storage" not in cfg
assert "telegram_extractor" not in cfg
assert "screenshot_enricher" not in cfg
assert "thumbnail_enricher" not in cfg
assert "csv_db" not in cfg
def test_config_is_valid_yaml(self):
"""The output dict should round-trip through YAML cleanly."""
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
dumped = yaml.dump(cfg)
reloaded = yaml.safe_load(dumped)
assert reloaded == cfg
# ── Google Sheets ─────────────────────────────────────────────────────
class TestGSheetConfig:
def test_gsheet_adds_feeder_and_db(self):
with patch.dict(os.environ, _env(GSHEET_URL="https://docs.google.com/spreadsheets/d/abc"), clear=True):
cfg = build_config()
assert "gsheet_feeder" in cfg["steps"]["feeders"]
assert "gsheet_db" in cfg["steps"]["databases"]
def test_gsheet_feeder_config(self):
url = "https://docs.google.com/spreadsheets/d/abc123"
with patch.dict(os.environ, _env(GSHEET_URL=url), clear=True):
cfg = build_config()
gf = cfg["gsheet_feeder"]
assert gf["sheet"] == url
assert gf["header"] == 1
assert "service_account" in gf
assert gf["columns"]["url"] == "link"
assert gf["columns"]["status"] == "archive status"
def test_gsheet_preserves_cli_feeder(self):
"""cli_feeder should still be present even when gsheet is added."""
with patch.dict(os.environ, _env(GSHEET_URL="https://example.com/sheet"), clear=True):
cfg = build_config()
assert "cli_feeder" in cfg["steps"]["feeders"]
def test_service_account_json_written(self, tmp_path):
"""When GOOGLE_SERVICE_ACCOUNT_JSON is set, it writes the file."""
sa_data = json.dumps({"type": "service_account", "project_id": "test"})
secrets_dir = tmp_path / "secrets"
with (
patch.dict(os.environ, _env(GOOGLE_SERVICE_ACCOUNT_JSON=sa_data), clear=True),
patch("deploy.generate_config.SECRETS_DIR", secrets_dir),
):
build_config()
sa_path = secrets_dir / "service_account.json"
assert sa_path.exists()
assert json.loads(sa_path.read_text())["project_id"] == "test"
# ── S3 storage ────────────────────────────────────────────────────────
class TestS3Config:
def test_s3_adds_storage(self):
with patch.dict(os.environ, _env(S3_BUCKET="my-bucket"), clear=True):
cfg = build_config()
assert "s3_storage" in cfg["steps"]["storages"]
assert "local_storage" in cfg["steps"]["storages"] # local still there
def test_s3_config_values(self):
env = _env(
S3_BUCKET="my-bucket",
S3_KEY="AKID",
S3_SECRET="shhh",
S3_REGION="eu-west-1",
)
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
s3 = cfg["s3_storage"]
assert s3["bucket"] == "my-bucket"
assert s3["key"] == "AKID"
assert s3["secret"] == "shhh"
assert s3["region"] == "eu-west-1"
assert s3["private"] is False
assert s3["random_no_duplicate"] is True
def test_s3_defaults(self):
with patch.dict(os.environ, _env(S3_BUCKET="b"), clear=True):
cfg = build_config()
s3 = cfg["s3_storage"]
assert s3["region"] == "us-east-1"
assert "{region}" in s3["endpoint_url"]
def test_s3_private_flag(self):
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_PRIVATE="true"), clear=True):
cfg = build_config()
assert cfg["s3_storage"]["private"] is True
def test_s3_custom_endpoint(self):
endpoint = "https://nyc3.digitaloceanspaces.com"
with patch.dict(os.environ, _env(S3_BUCKET="b", S3_ENDPOINT=endpoint), clear=True):
cfg = build_config()
assert cfg["s3_storage"]["endpoint_url"] == endpoint
# ── Telegram ──────────────────────────────────────────────────────────
class TestTelegramConfig:
def test_telegram_added_when_both_set(self):
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
assert "telegram_extractor" in cfg["steps"]["extractors"]
assert cfg["telegram_extractor"]["api_id"] == "12345"
assert cfg["telegram_extractor"]["api_hash"] == "abc"
def test_telegram_not_added_if_only_id(self):
with patch.dict(os.environ, _env(TELEGRAM_API_ID="12345"), clear=True):
cfg = build_config()
assert "telegram_extractor" not in cfg["steps"]["extractors"]
def test_telegram_not_added_if_only_hash(self):
with patch.dict(os.environ, _env(TELEGRAM_API_HASH="abc"), clear=True):
cfg = build_config()
assert "telegram_extractor" not in cfg["steps"]["extractors"]
def test_telegram_bot_token_optional(self):
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc", TELEGRAM_BOT_TOKEN="bot:tok")
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
assert cfg["telegram_extractor"]["bot_token"] == "bot:tok"
def test_telegram_no_bot_token(self):
env = _env(TELEGRAM_API_ID="12345", TELEGRAM_API_HASH="abc")
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
assert "bot_token" not in cfg["telegram_extractor"]
# ── Optional enrichers / databases ────────────────────────────────────
class TestOptionalModules:
def test_screenshots_disabled_by_default(self):
with patch.dict(os.environ, _env(), clear=True):
cfg = build_config()
assert "screenshot_enricher" not in cfg["steps"]["enrichers"]
def test_screenshots_enabled(self):
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="true"), clear=True):
cfg = build_config()
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
assert cfg["screenshot_enricher"]["width"] == 1280
def test_thumbnails_enabled(self):
with patch.dict(os.environ, _env(ENABLE_THUMBNAILS="true"), clear=True):
cfg = build_config()
assert "thumbnail_enricher" in cfg["steps"]["enrichers"]
assert cfg["thumbnail_enricher"]["max_thumbnails"] == 16
def test_csv_db_enabled(self):
with patch.dict(os.environ, _env(ENABLE_CSV_DB="true"), clear=True):
cfg = build_config()
assert "csv_db" in cfg["steps"]["databases"]
assert cfg["csv_db"]["csv_file"] == "/app/local_archive/db.csv"
def test_case_insensitive_boolean(self):
with patch.dict(os.environ, _env(ENABLE_SCREENSHOTS="TRUE"), clear=True):
cfg = build_config()
assert "screenshot_enricher" in cfg["steps"]["enrichers"]
# ── Combined / full config ────────────────────────────────────────────
class TestCombinedConfig:
def test_all_optional_modules_together(self):
"""Enable everything at once and verify no conflicts."""
env = _env(
GSHEET_URL="https://example.com/sheet",
S3_BUCKET="bucket",
S3_KEY="key",
S3_SECRET="secret",
TELEGRAM_API_ID="123",
TELEGRAM_API_HASH="abc",
TELEGRAM_BOT_TOKEN="tok",
ENABLE_SCREENSHOTS="true",
ENABLE_THUMBNAILS="true",
ENABLE_CSV_DB="true",
)
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
steps = cfg["steps"]
assert "gsheet_feeder" in steps["feeders"]
assert "telegram_extractor" in steps["extractors"]
assert "screenshot_enricher" in steps["enrichers"]
assert "thumbnail_enricher" in steps["enrichers"]
assert "csv_db" in steps["databases"]
assert "gsheet_db" in steps["databases"]
assert "s3_storage" in steps["storages"]
assert "local_storage" in steps["storages"]
# All module configs present
for key in [
"gsheet_feeder",
"s3_storage",
"telegram_extractor",
"screenshot_enricher",
"thumbnail_enricher",
"csv_db",
]:
assert key in cfg, f"{key} config missing"
def test_full_config_valid_yaml(self):
env = _env(
GSHEET_URL="https://example.com/sheet",
S3_BUCKET="bucket",
TELEGRAM_API_ID="123",
TELEGRAM_API_HASH="abc",
ENABLE_SCREENSHOTS="true",
ENABLE_CSV_DB="true",
)
with patch.dict(os.environ, env, clear=True):
cfg = build_config()
dumped = yaml.dump(cfg)
reloaded = yaml.safe_load(dumped)
assert reloaded == cfg
# ── main() writes file ───────────────────────────────────────────────
class TestMainFunction:
def test_main_writes_config_file(self, tmp_path):
config_path = tmp_path / "orchestration.yaml"
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
main()
assert config_path.exists()
cfg = yaml.safe_load(config_path.read_text())
assert cfg["steps"]["feeders"] == ["cli_feeder"]
def test_main_creates_parent_dirs(self, tmp_path):
config_path = tmp_path / "nested" / "dir" / "orchestration.yaml"
with patch.dict(os.environ, _env(), clear=True), patch("deploy.generate_config.CONFIG_PATH", config_path):
main()
assert config_path.exists()

View File

@@ -0,0 +1,124 @@
"""Tests for deploy/gsheet_poller.py background Google Sheets polling."""
import os
from unittest.mock import patch, MagicMock
from deploy.gsheet_poller import start_poller, _poll_once
# ── start_poller ──────────────────────────────────────────────────────
class TestStartPoller:
def test_disabled_when_no_gsheet_url(self):
"""No thread should be started when GSHEET_URL is empty."""
with (
patch.dict(os.environ, {"GSHEET_URL": ""}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
start_poller()
mock_thread.assert_not_called()
def test_disabled_when_gsheet_url_absent(self):
env = {k: v for k, v in os.environ.items() if k != "GSHEET_URL"}
with patch.dict(os.environ, env, clear=True), patch("deploy.gsheet_poller.threading.Thread") as mock_thread:
start_poller()
mock_thread.assert_not_called()
def test_starts_thread_when_gsheet_url_set(self):
with (
patch.dict(os.environ, {"GSHEET_URL": "https://example.com/sheet"}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_instance = MagicMock()
mock_thread.return_value = mock_instance
start_poller()
mock_thread.assert_called_once()
assert mock_thread.call_args.kwargs["daemon"] is True
assert mock_thread.call_args.kwargs["name"] == "gsheet-poller"
mock_instance.start.assert_called_once()
def test_default_interval_300(self):
env = {"GSHEET_URL": "https://example.com/sheet"}
# Remove POLL_INTERVAL if present
clean_env = {k: v for k, v in os.environ.items() if k != "POLL_INTERVAL"}
clean_env.update(env)
with (
patch.dict(os.environ, clean_env, clear=True),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_thread.return_value = MagicMock()
start_poller()
# interval should be passed as arg to _poll_loop
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
assert args == (300,)
def test_custom_interval(self):
with (
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "600"}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_thread.return_value = MagicMock()
start_poller()
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
assert args == (600,)
def test_interval_minimum_enforced(self):
"""Intervals below 60 should be clamped to 60."""
with (
patch.dict(os.environ, {"GSHEET_URL": "x", "POLL_INTERVAL": "10"}, clear=False),
patch("deploy.gsheet_poller.threading.Thread") as mock_thread,
):
mock_thread.return_value = MagicMock()
start_poller()
args = mock_thread.call_args.kwargs.get("args") or mock_thread.call_args[1].get("args")
assert args == (60,)
# ── _poll_once ────────────────────────────────────────────────────────
class TestPollOnce:
def test_calls_subprocess_with_config(self):
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stderr="")
_poll_once()
mock_run.assert_called_once()
cmd = mock_run.call_args[0][0]
assert "auto_archiver" in " ".join(cmd)
assert "--config" in cmd
def test_handles_nonzero_exit(self):
"""Should not raise on non-zero exit, just log a warning."""
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stderr="some error")
_poll_once() # should not raise
def test_handles_timeout(self):
"""Should not raise on timeout, just log."""
import subprocess
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=600)
_poll_once() # should not raise
def test_handles_exception(self):
"""Should not raise on arbitrary exceptions."""
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.side_effect = OSError("broken")
_poll_once() # should not raise
def test_uses_correct_config_path(self):
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stderr="")
_poll_once()
cmd = mock_run.call_args[0][0]
config_idx = cmd.index("--config")
assert cmd[config_idx + 1] == "/app/secrets/orchestration.yaml"
def test_timeout_set(self):
with patch("deploy.gsheet_poller.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stderr="")
_poll_once()
assert mock_run.call_args[1]["timeout"] == 600

310
deploy/tests/test_web_ui.py Normal file
View File

@@ -0,0 +1,310 @@
"""Tests for deploy/web_ui.py FastAPI web interface."""
from unittest.mock import patch, AsyncMock
import pytest
from fastapi.testclient import TestClient
# ── Fixtures ──────────────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def _reset_state():
"""Reset in-memory state between tests."""
import deploy.web_ui as mod
mod._valid_sessions.clear()
mod._jobs.clear()
yield
mod._valid_sessions.clear()
mod._jobs.clear()
@pytest.fixture
def client_no_auth():
"""Test client with auth disabled (no AUTH_PASSWORD)."""
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", ""):
from deploy.web_ui import app
yield TestClient(app, raise_server_exceptions=False)
@pytest.fixture
def client_with_auth():
"""Test client with auth enabled."""
with patch.object(__import__("deploy.web_ui", fromlist=["web_ui"]), "AUTH_PASSWORD", "secret123"):
from deploy.web_ui import app
yield TestClient(app, raise_server_exceptions=False)
def _login(client, password="secret123"):
"""Helper: log in and return the session cookie."""
resp = client.post("/login", data={"password": password}, follow_redirects=False)
return resp.cookies.get("aa_session")
# ── Health check ──────────────────────────────────────────────────────
class TestHealthCheck:
def test_status_returns_ok(self, client_no_auth):
resp = client_no_auth.get("/status")
assert resp.status_code == 200
assert resp.json() == {"status": "ok"}
def test_status_no_auth_required(self, client_with_auth):
resp = client_with_auth.get("/status")
assert resp.status_code == 200
assert resp.json() == {"status": "ok"}
# ── Auth disabled ─────────────────────────────────────────────────────
class TestNoAuth:
def test_index_accessible(self, client_no_auth):
resp = client_no_auth.get("/")
assert resp.status_code == 200
assert "Auto Archiver" in resp.text
def test_login_page_redirects_to_index(self, client_no_auth):
resp = client_no_auth.get("/login", follow_redirects=False)
assert resp.status_code == 302
assert resp.headers["location"] == "/"
def test_login_post_redirects_to_index(self, client_no_auth):
resp = client_no_auth.post("/login", data={"password": "anything"}, follow_redirects=False)
assert resp.status_code == 302
def test_no_logout_link_shown(self, client_no_auth):
resp = client_no_auth.get("/")
assert "Logout" not in resp.text
# ── Auth enabled ──────────────────────────────────────────────────────
class TestAuth:
def test_index_redirects_to_login(self, client_with_auth):
resp = client_with_auth.get("/", follow_redirects=False)
assert resp.status_code == 307
assert resp.headers["location"] == "/login"
def test_login_page_renders(self, client_with_auth):
resp = client_with_auth.get("/login")
assert resp.status_code == 200
assert "Password" in resp.text
def test_wrong_password_returns_401(self, client_with_auth):
resp = client_with_auth.post("/login", data={"password": "wrong"})
assert resp.status_code == 401
assert "Wrong password" in resp.text
def test_correct_password_sets_cookie(self, client_with_auth):
resp = client_with_auth.post("/login", data={"password": "secret123"}, follow_redirects=False)
assert resp.status_code == 302
assert "aa_session" in resp.cookies
def test_authenticated_access(self, client_with_auth):
cookie = _login(client_with_auth)
client_with_auth.cookies.set("aa_session", cookie)
resp = client_with_auth.get("/")
assert resp.status_code == 200
assert "Auto Archiver" in resp.text
def test_logout_clears_session(self, client_with_auth):
cookie = _login(client_with_auth)
client_with_auth.cookies.set("aa_session", cookie)
resp = client_with_auth.get("/logout", follow_redirects=False)
assert resp.status_code == 302
# After logout, index should redirect to login again
client_with_auth.cookies.clear()
resp = client_with_auth.get("/", follow_redirects=False)
assert resp.status_code == 307
def test_logout_link_shown_when_auth_enabled(self, client_with_auth):
cookie = _login(client_with_auth)
client_with_auth.cookies.set("aa_session", cookie)
resp = client_with_auth.get("/")
assert "Logout" in resp.text
def test_results_requires_auth(self, client_with_auth):
resp = client_with_auth.get("/results", follow_redirects=False)
assert resp.status_code == 307
def test_invalid_session_rejected(self, client_with_auth):
client_with_auth.cookies.set("aa_session", "bogus-token")
resp = client_with_auth.get("/", follow_redirects=False)
assert resp.status_code == 307
# ── Archive submission ────────────────────────────────────────────────
class TestArchive:
def test_archive_creates_job(self, client_no_auth):
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
resp = client_no_auth.post(
"/archive",
data={"urls": "https://example.com\nhttps://example.org"},
follow_redirects=False,
)
assert resp.status_code == 303
assert resp.headers["location"] == "/"
from deploy.web_ui import _jobs
assert len(_jobs) == 1
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
assert _jobs[0]["status"] == "running"
def test_archive_empty_urls_returns_400(self, client_no_auth):
resp = client_no_auth.post("/archive", data={"urls": " \n \n"})
assert resp.status_code == 400
def test_archive_strips_whitespace(self, client_no_auth):
with patch("deploy.web_ui._run_archive", new_callable=AsyncMock):
client_no_auth.post(
"/archive",
data={"urls": " https://example.com \n\n https://example.org \n"},
follow_redirects=False,
)
from deploy.web_ui import _jobs
assert _jobs[0]["urls"] == ["https://example.com", "https://example.org"]
def test_archive_requires_auth(self, client_with_auth):
resp = client_with_auth.post(
"/archive",
data={"urls": "https://example.com"},
follow_redirects=False,
)
assert resp.status_code == 307
# ── Results page ──────────────────────────────────────────────────────
class TestResults:
def test_results_empty(self, client_no_auth, tmp_path):
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/results")
assert resp.status_code == 200
assert "No archived files yet" in resp.text
def test_results_lists_files(self, client_no_auth, tmp_path):
(tmp_path / "test.html").write_text("<html>archived</html>")
(tmp_path / "video.mp4").write_bytes(b"\x00" * 10)
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/results")
assert resp.status_code == 200
assert "test.html" in resp.text
assert "video.mp4" in resp.text
def test_results_nonexistent_dir(self, client_no_auth, tmp_path):
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path / "nonexistent"):
resp = client_no_auth.get("/results")
assert resp.status_code == 200
assert "No archived files yet" in resp.text
# ── File serving ──────────────────────────────────────────────────────
class TestFileServing:
def test_serve_existing_file(self, client_no_auth, tmp_path):
(tmp_path / "report.html").write_text("<html>done</html>")
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/files/report.html")
assert resp.status_code == 200
def test_serve_nonexistent_file(self, client_no_auth, tmp_path):
with patch("deploy.web_ui.ARCHIVE_DIR", tmp_path):
resp = client_no_auth.get("/files/nope.txt")
assert resp.status_code == 404
def test_path_traversal_blocked(self, client_no_auth, tmp_path):
# Create a file outside the archive dir
outside = tmp_path / "outside"
outside.mkdir()
(outside / "secret.txt").write_text("secret")
archive = tmp_path / "archive"
archive.mkdir()
# Symlink into archive pointing outside
(archive / "escape").symlink_to(outside / "secret.txt")
with patch("deploy.web_ui.ARCHIVE_DIR", archive):
resp = client_no_auth.get("/files/escape")
assert resp.status_code == 403
# ── Job rendering ─────────────────────────────────────────────────────
class TestJobRendering:
def test_no_jobs_shows_message(self, client_no_auth):
resp = client_no_auth.get("/")
assert "No archiving jobs yet" in resp.text
def test_jobs_shown_in_table(self, client_no_auth):
from deploy.web_ui import _jobs
_jobs.append(
{
"id": 1,
"urls": ["https://example.com"],
"status": "done",
"started": "2026-01-01 00:00 UTC",
"output": "",
}
)
resp = client_no_auth.get("/")
assert "example.com" in resp.text
assert "done" in resp.text
def test_many_urls_truncated(self, client_no_auth):
from deploy.web_ui import _jobs
_jobs.append(
{
"id": 1,
"urls": [f"https://example.com/{i}" for i in range(10)],
"status": "running",
"started": "2026-01-01 00:00 UTC",
"output": "",
}
)
resp = client_no_auth.get("/")
assert "+7 more" in resp.text
# ── HTML template rendering ──────────────────────────────────────────
class TestTemplates:
"""Verify HTML templates can be .format()-ed without KeyError."""
def test_login_html_renders(self):
from deploy.web_ui import LOGIN_HTML
result = LOGIN_HTML.format(error="")
assert "Auto Archiver" in result
def test_login_html_renders_with_error(self):
from deploy.web_ui import LOGIN_HTML
result = LOGIN_HTML.format(error='<p class="err">Nope</p>')
assert "Nope" in result
def test_main_html_renders(self):
from deploy.web_ui import MAIN_HTML
result = MAIN_HTML.format(logout="", jobs_html="")
assert "Auto Archiver" in result
def test_results_html_renders(self):
from deploy.web_ui import RESULTS_HTML
result = RESULTS_HTML.format(file_list="<p>empty</p>")
assert "Archived Files" in result

269
deploy/web_ui.py Normal file
View File

@@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""
Minimal web UI for auto-archiver cloud deployments.
Provides:
- GET / → HTML form to submit URLs for archiving
- POST /archive → Runs auto-archiver on submitted URLs
- GET /results → Lists archived files available for download
- GET /files/{path} → Serves archived files
- GET /status → Health check
"""
import asyncio
import html
import os
import secrets
from datetime import datetime, timezone
from pathlib import Path
from fastapi import Depends, FastAPI, Form, HTTPException, Request, status
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
AUTH_PASSWORD = os.environ.get("AUTH_PASSWORD", "")
ARCHIVE_DIR = Path("/app/local_archive")
CONFIG_PATH = Path("/app/secrets/orchestration.yaml")
COOKIE_NAME = "aa_session"
# In-memory session tokens (reset on restart, which is fine for this use case)
_valid_sessions: set[str] = set()
# In-memory job log
_jobs: list[dict] = []
app = FastAPI(title="Auto Archiver", docs_url=None, redoc_url=None)
# ── Auth helpers ──────────────────────────────────────────────────────
def _check_auth(request: Request):
"""Dependency: redirect to /login if auth is enabled and session is missing."""
if not AUTH_PASSWORD:
return # auth disabled
token = request.cookies.get(COOKIE_NAME, "")
if token not in _valid_sessions:
raise HTTPException(
status_code=status.HTTP_307_TEMPORARY_REDIRECT,
headers={"Location": "/login"},
)
# ── Pages ─────────────────────────────────────────────────────────────
LOGIN_HTML = """<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Auto Archiver Login</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 420px; margin: 80px auto; padding: 0 1rem; }}
h1 {{ font-size: 1.4rem; }}
input[type=password], button {{ font-size: 1rem; padding: .5rem .8rem; }}
input[type=password] {{ width: 100%; box-sizing: border-box; margin: .5rem 0; }}
button {{ cursor: pointer; background: #2563eb; color: #fff; border: none; border-radius: 4px; }}
.err {{ color: #dc2626; }}
</style></head><body>
<h1>🔐 Auto Archiver</h1>
<form method="POST" action="/login">
<label>Password<br><input type="password" name="password" autofocus required></label><br>
<button type="submit">Log in</button>
{error}
</form></body></html>"""
MAIN_HTML = """<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Auto Archiver</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 700px; margin: 2rem auto; padding: 0 1rem; line-height: 1.6; }}
h1 {{ font-size: 1.5rem; }}
textarea {{ width: 100%; box-sizing: border-box; font-size: .95rem; font-family: monospace; }}
button {{ font-size: 1rem; padding: .5rem 1.2rem; cursor: pointer; background: #2563eb; color: #fff; border: none; border-radius: 4px; margin-top: .5rem; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 1rem; }}
th, td {{ border: 1px solid #e5e7eb; padding: .4rem .6rem; text-align: left; font-size: .9rem; }}
th {{ background: #f9fafb; }}
.status {{ padding: 2px 8px; border-radius: 4px; font-size: .85rem; }}
.running {{ background: #fef3c7; color: #92400e; }}
.done {{ background: #d1fae5; color: #065f46; }}
.failed {{ background: #fee2e2; color: #991b1b; }}
a {{ color: #2563eb; }}
.info {{ color: #6b7280; font-size: .9rem; }}
nav {{ display: flex; gap: 1rem; align-items: center; }}
nav a {{ text-decoration: none; }}
</style></head><body>
<nav>
<h1>📦 Auto Archiver</h1>
<a href="/results">Browse files</a>
{logout}
</nav>
<form method="POST" action="/archive">
<label for="urls"><strong>URLs to archive</strong> (one per line)</label><br>
<textarea id="urls" name="urls" rows="5" placeholder="https://example.com/post&#10;https://youtube.com/watch?v=..." required></textarea><br>
<button type="submit">Archive</button>
</form>
{jobs_html}
</body></html>"""
RESULTS_HTML = """<!DOCTYPE html>
<html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Auto Archiver Files</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 700px; margin: 2rem auto; padding: 0 1rem; }}
h1 {{ font-size: 1.4rem; }}
a {{ color: #2563eb; }}
li {{ margin: .3rem 0; font-family: monospace; font-size: .9rem; }}
</style></head><body>
<h1>📁 Archived Files</h1>
<p><a href="/">← Back</a></p>
{file_list}
</body></html>"""
# ── Routes ────────────────────────────────────────────────────────────
@app.get("/login", response_class=HTMLResponse)
async def login_page():
if not AUTH_PASSWORD:
return RedirectResponse("/", status_code=302)
return LOGIN_HTML.format(error="")
@app.post("/login")
async def login_submit(password: str = Form(...)):
if not AUTH_PASSWORD:
return RedirectResponse("/", status_code=302)
if password != AUTH_PASSWORD:
return HTMLResponse(
LOGIN_HTML.format(error='<p class="err">Wrong password.</p>'),
status_code=401,
)
token = secrets.token_urlsafe(32)
_valid_sessions.add(token)
resp = RedirectResponse("/", status_code=302)
resp.set_cookie(COOKIE_NAME, token, httponly=True, samesite="lax", max_age=86400 * 30)
return resp
@app.get("/", response_class=HTMLResponse)
async def index(request: Request, _=Depends(_check_auth)):
logout = '<a href="/logout">Logout</a>' if AUTH_PASSWORD else ""
jobs_html = _render_jobs()
return MAIN_HTML.format(logout=logout, jobs_html=jobs_html)
@app.post("/archive")
async def archive(request: Request, urls: str = Form(...), _=Depends(_check_auth)):
url_list = [u.strip() for u in urls.strip().splitlines() if u.strip()]
if not url_list:
raise HTTPException(400, "No URLs provided")
job = {
"id": len(_jobs) + 1,
"urls": url_list,
"status": "running",
"started": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"),
"output": "",
}
_jobs.insert(0, job)
# Run in background so the user sees the page immediately
asyncio.create_task(_run_archive(job))
return RedirectResponse("/", status_code=303)
@app.get("/results", response_class=HTMLResponse)
async def results(request: Request, _=Depends(_check_auth)):
if not ARCHIVE_DIR.exists():
return RESULTS_HTML.format(file_list="<p>No archived files yet.</p>")
files = sorted(ARCHIVE_DIR.rglob("*"), key=lambda p: p.stat().st_mtime, reverse=True)
files = [f for f in files if f.is_file()]
if not files:
return RESULTS_HTML.format(file_list="<p>No archived files yet.</p>")
items = []
for f in files[:200]: # cap listing
rel = f.relative_to(ARCHIVE_DIR)
items.append(f'<li><a href="/files/{rel}">{html.escape(str(rel))}</a></li>')
return RESULTS_HTML.format(file_list="<ul>" + "\n".join(items) + "</ul>")
@app.get("/files/{path:path}")
async def serve_file(path: str, request: Request, _=Depends(_check_auth)):
full = ARCHIVE_DIR / path
if not full.exists() or not full.is_file():
raise HTTPException(404, "File not found")
# Security: ensure the resolved path is within ARCHIVE_DIR
try:
full.resolve().relative_to(ARCHIVE_DIR.resolve())
except ValueError:
raise HTTPException(403, "Forbidden")
return FileResponse(full)
@app.get("/status")
async def health():
return {"status": "ok"}
@app.get("/logout")
async def logout(request: Request):
token = request.cookies.get(COOKIE_NAME, "")
_valid_sessions.discard(token)
resp = RedirectResponse("/login", status_code=302)
resp.delete_cookie(COOKIE_NAME)
return resp
# ── Helpers ───────────────────────────────────────────────────────────
async def _run_archive(job: dict):
"""Run auto-archiver as a subprocess for the given URLs."""
cmd = [
"python3",
"-m",
"auto_archiver",
"--config",
str(CONFIG_PATH),
] + job["urls"]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd="/app",
)
stdout, _ = await proc.communicate()
job["output"] = stdout.decode(errors="replace")[-5000:] # keep last 5k chars
job["status"] = "done" if proc.returncode == 0 else "failed"
except Exception as e:
job["output"] = str(e)
job["status"] = "failed"
def _render_jobs() -> str:
if not _jobs:
return '<p class="info">No archiving jobs yet. Submit URLs above to get started.</p>'
rows = []
for j in _jobs[:50]:
urls_str = html.escape(", ".join(j["urls"][:3]))
if len(j["urls"]) > 3:
urls_str += f" (+{len(j['urls']) - 3} more)"
status_cls = j["status"]
rows.append(
f"<tr><td>{j['id']}</td>"
f"<td>{urls_str}</td>"
f'<td><span class="status {status_cls}">{j["status"]}</span></td>'
f"<td>{j['started']}</td></tr>"
)
return (
"<h2>Recent Jobs</h2>"
"<table><thead><tr><th>#</th><th>URLs</th><th>Status</th><th>Started</th></tr></thead>"
"<tbody>" + "\n".join(rows) + "</tbody></table>"
)

View File

@@ -6,6 +6,9 @@ services:
context: .
dockerfile: Dockerfile
container_name: auto-archiver
# Override user to match host UID/GID and avoid permission issues on volumes.
# Set USER_ID and GROUP_ID env vars, or defaults to 1000:1000.
user: "${USER_ID:-1000}:${GROUP_ID:-1000}"
volumes:
- ./secrets:/app/secrets
- ./local_archive:/app/local_archive

764
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "1.2.1"
version = "1.2.5"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"

99
railway.json Normal file
View File

@@ -0,0 +1,99 @@
{
"$schema": "https://railway.app/railway.schema.json",
"build": {
"dockerfilePath": "deploy/Dockerfile"
},
"deploy": {
"startCommand": "python3 -m deploy.start",
"healthcheckPath": "/status",
"healthcheckTimeout": 30,
"restartPolicyType": "ON_FAILURE",
"restartPolicyMaxRetries": 5
},
"variables": {
"AUTH_PASSWORD": {
"description": "Password to access your archiver web interface",
"required": true
},
"GSHEET_URL": {
"description": "Google Sheet URL to monitor for new URLs (leave empty to disable)",
"required": false,
"default": ""
},
"GOOGLE_SERVICE_ACCOUNT_JSON": {
"description": "Full JSON contents of your Google service account key (required for Sheets)",
"required": false,
"default": ""
},
"POLL_INTERVAL": {
"description": "Seconds between Google Sheet checks (min 60)",
"required": false,
"default": "300"
},
"S3_BUCKET": {
"description": "S3 bucket name for storage (leave empty for local-only)",
"required": false,
"default": ""
},
"S3_KEY": {
"description": "S3 access key ID",
"required": false,
"default": ""
},
"S3_SECRET": {
"description": "S3 secret access key",
"required": false,
"default": ""
},
"S3_REGION": {
"description": "S3 region (e.g. us-east-1, nyc3 for DO Spaces)",
"required": false,
"default": "us-east-1"
},
"S3_ENDPOINT": {
"description": "S3 endpoint URL template",
"required": false,
"default": "https://s3.{region}.amazonaws.com"
},
"S3_CDN_URL": {
"description": "Public CDN URL template for archived files",
"required": false,
"default": "https://{bucket}.s3.{region}.amazonaws.com/{key}"
},
"TELEGRAM_API_ID": {
"description": "Telegram API ID from https://my.telegram.org",
"required": false,
"default": ""
},
"TELEGRAM_API_HASH": {
"description": "Telegram API hash from https://my.telegram.org",
"required": false,
"default": ""
},
"TELEGRAM_BOT_TOKEN": {
"description": "Telegram bot token from @BotFather",
"required": false,
"default": ""
},
"ENABLE_SCREENSHOTS": {
"description": "Set to true to capture full-page screenshots",
"required": false,
"default": "false"
},
"ENABLE_THUMBNAILS": {
"description": "Set to true to generate video thumbnails",
"required": false,
"default": "false"
},
"ENABLE_CSV_DB": {
"description": "Set to true to save a CSV log of archived items",
"required": false,
"default": "false"
},
"LOG_LEVEL": {
"description": "Logging level: DEBUG, INFO, WARNING, ERROR",
"required": false,
"default": "INFO"
}
}
}

View File

@@ -181,6 +181,9 @@ class Metadata:
media_hashes = set()
new_media = []
for m in self.media:
if not m.filename:
new_media.append(m)
continue
h = m.get("hash")
if not h:
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)

View File

@@ -73,6 +73,7 @@ class AntibotExtractorEnricher(Extractor, Enricher):
if self.enrich(result):
result.status = "antibot"
return result
return False
def _prepare_user_data_dir(self):
if self.user_data_dir:
@@ -88,8 +89,18 @@ class AntibotExtractorEnricher(Extractor, Enricher):
using_user_data_dir = self.user_data_dir if custom_data_dir else None
url = to_enrich.get_url()
# Use xvfb in Docker environments where no display is available
use_xvfb = bool(os.environ.get("RUNNING_IN_DOCKER"))
try:
with SB(uc=True, agent=self.agent, headed=None, user_data_dir=using_user_data_dir, proxy=self.proxy) as sb:
with SB(
uc=True,
agent=self.agent,
headed=None,
user_data_dir=using_user_data_dir,
proxy=self.proxy,
xvfb=use_xvfb,
) as sb:
logger.info(f"Selenium browser is up with agent {self.agent}, opening url...")
sb.uc_open_with_reconnect(url, 4)

View File

@@ -39,12 +39,18 @@ class Bluesky(GenericDropin):
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
for image_media in image_medias:
url = media_url.format(image_media["image"]["ref"]["$link"], post["author"]["did"])
image_media = archiver.download_from_url(url)
media.append(Media(image_media))
filename = archiver.download_from_url(url)
if filename:
media.append(Media(filename))
else:
logger.warning(f"Failed to download Bluesky image from {url}")
for video_media in video_medias:
url = media_url.format(video_media["ref"]["$link"], post["author"]["did"])
video_media = archiver.download_from_url(url)
media.append(Media(video_media))
filename = archiver.download_from_url(url)
if filename:
media.append(Media(filename))
else:
logger.warning(f"Failed to download Bluesky video from {url}")
return media
def _get_post_data(self, post: dict) -> dict:

View File

@@ -204,8 +204,11 @@ class GenericExtractor(Extractor):
if thumbnail_url:
try:
cover_image_path = self.download_from_url(thumbnail_url)
media = Media(cover_image_path)
metadata.add_media(media, id="cover")
if cover_image_path:
media = Media(cover_image_path)
metadata.add_media(media, id="cover")
else:
logger.warning(f"Failed to download cover image from {thumbnail_url}")
except Exception as e:
logger.error(f"Could not download cover image {thumbnail_url}: {e}")

View File

@@ -1,6 +1,7 @@
from typing import Type
from auto_archiver.utils import traverse_obj
from auto_archiver.utils.custom_logger import logger
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.core.extractor import Extractor
from yt_dlp.extractor.common import InfoExtractor
@@ -58,6 +59,9 @@ class Truth(GenericDropin):
# add the media
for media in post.get("media_attachments", []):
filename = archiver.download_from_url(media["url"])
if not filename:
logger.warning(f"Failed to download media from {media['url']}")
continue
result.add_media(Media(filename), id=media.get("id"))
return result

View File

@@ -9,6 +9,8 @@ from auto_archiver.utils import url as UrlUtil, get_datetime_from_str
from auto_archiver.core.extractor import Extractor
from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted
from auto_archiver.modules.generic_extractor.dropin import GenericDropin, InfoExtractor
import requests
from retrying import retry
class Twitter(GenericDropin):
@@ -29,7 +31,85 @@ class Twitter(GenericDropin):
def extract_post(self, url: str, ie_instance: InfoExtractor):
twid = ie_instance._match_valid_url(url).group("id")
return ie_instance._extract_status(twid=twid)
try:
post_data = ie_instance._extract_status(twid=twid)
if not post_data or not post_data.get("user") or not post_data.get("created_at"):
raise ValueError("Error retrieving post with twitter dropin")
return post_data
except Exception as e:
logger.debug(f"yt-dlp twitter extraction failed: {e}")
# try fxtwitter API as fallback
return self._fetch_fxtwitter(twid)
def _fetch_fxtwitter(self, twid: str) -> dict:
"""Fetch tweet data from fxtwitter API and convert to expected format."""
fxtwitter_url = f"https://api.fxtwitter.com/status/{twid}"
logger.info(f"Falling back to fxtwitter API for tweet extraction: {fxtwitter_url}")
@retry(wait_random_min=500, wait_random_max=2000, stop_max_attempt_number=3)
def fetch_fxtwitter_data(url):
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/112.0"}
resp = requests.get(url, headers=headers, timeout=15)
if resp.status_code != 200:
raise ValueError(f"Failed to retrieve tweet from fxtwitter API: {resp.status_code}")
data = resp.json()
if "tweet" not in data:
raise ValueError(f"No tweet data in fxtwitter response: {data.get('message', 'Unknown error')}")
return data["tweet"]
tweet = fetch_fxtwitter_data(fxtwitter_url)
# Convert fxtwitter format to expected format
author = tweet.get("author", {}).get("name", "")
created_at = tweet.get("created_at", "") # Format: "Sun Feb 08 18:45:00 +0000 2026"
full_text = tweet.get("text", "") or tweet.get("raw_text", "")
# Convert media format
media = []
fx_media = tweet.get("media", {})
# Handle photos
for photo in fx_media.get("photos", []):
media.append({"type": "photo", "media_url_https": photo.get("url", "")})
# Handle videos
for video in fx_media.get("videos", []):
variants = video.get("variants", [])
# Convert to expected variant format
converted_variants = []
for var in variants:
converted_variants.append(
{
"url": var.get("url", ""),
"content_type": var.get("content_type", "video/mp4"),
"bitrate": var.get("bitrate", 0),
}
)
if converted_variants:
media.append({"type": "video", "video_info": {"variants": converted_variants}})
# Handle animated gifs (fxtwitter may include these in videos)
for item in fx_media.get("all", []):
if item.get("type") == "gif":
variants = item.get("variants", [])
converted_variants = []
for var in variants:
converted_variants.append(
{
"url": var.get("url", ""),
"content_type": var.get("content_type", "video/mp4"),
"bitrate": var.get("bitrate", 0),
}
)
if converted_variants:
media.append({"type": "animated_gif", "video_info": {"variants": converted_variants}})
return {
"user": {"name": author},
"created_at": created_at,
"full_text": full_text,
"entities": {"media": media},
}
def keys_to_clean(self, video_data, info_extractor):
return ["user", "created_at", "entities", "favorited", "translator_type"]
@@ -77,5 +157,8 @@ class Twitter(GenericDropin):
mimetype = variant["content_type"]
ext = mimetypes.guess_extension(mimetype)
media.filename = archiver.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
if not media.filename:
logger.warning(f"Failed to download media from {media.get('src')}")
continue
result.add_media(media)
return result

View File

@@ -25,6 +25,9 @@ class HashEnricher(Enricher):
logger.debug(f"Calculating media hashes with algo={self.algorithm}")
for i, m in enumerate(to_enrich.media):
if not m.filename:
logger.warning(f"Skipping hash for media without filename: {m}")
continue
if len(hd := self.calculate_hash(m.filename)):
to_enrich.media[i].set("hash", f"{self.algorithm}:{hd}")

View File

@@ -99,7 +99,10 @@ class InstagramAPIExtractor(Extractor):
result.set_title(user.get("full_name", username)).set("data", user)
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
filename = self.download_from_url(pic_url)
result.add_media(Media(filename=filename), id="profile_picture")
if filename:
result.add_media(Media(filename=filename), id="profile_picture")
else:
logger.warning(f"Failed to download profile picture from {pic_url}")
count_posts = 0
if self.full_profile:
@@ -202,7 +205,10 @@ class InstagramAPIExtractor(Extractor):
if cover_media := h_info.get("cover_media", {}).get("cropped_image_version", {}).get("url"):
filename = self.download_from_url(cover_media)
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
if filename:
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
else:
logger.warning(f"Failed to download cover media from {cover_media}")
items = h_info.get("items", [])[::-1] # newest to oldest
items = items[: min(max_to_download, len(items))]
@@ -345,7 +351,10 @@ class InstagramAPIExtractor(Extractor):
image_media = None
if image_url := item.get("thumbnail_url"):
filename = self.download_from_url(image_url, verbose=False)
image_media = Media(filename=filename)
if filename:
image_media = Media(filename=filename)
else:
logger.warning(f"Failed to download thumbnail from {image_url}")
# retrieve video info
best_id = item.get("id", item.get("pk"))
@@ -357,16 +366,19 @@ class InstagramAPIExtractor(Extractor):
if video_url := item.get("video_url"):
filename = self.download_from_url(video_url, verbose=False)
video_media = Media(filename=filename)
if taken_at:
video_media.set("date", taken_at)
if code:
video_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text:
video_media.set("text", caption_text)
video_media.set("preview", [image_media])
video_media.set("data", [item])
return item, video_media, f"{context or 'video'} {best_id}"
if filename:
video_media = Media(filename=filename)
if taken_at:
video_media.set("date", taken_at)
if code:
video_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text:
video_media.set("text", caption_text)
video_media.set("preview", [image_media])
video_media.set("data", [item])
return item, video_media, f"{context or 'video'} {best_id}"
else:
logger.warning(f"Failed to download video from {video_url}")
elif image_media:
if taken_at:
image_media.set("date", taken_at)

View File

@@ -25,6 +25,9 @@ class MetaEnricher(Enricher):
logger.debug(f"Calculating archive file sizes for {len(to_enrich.media)} media files")
total_size = 0
for media in to_enrich.get_all_media():
if not media.filename:
logger.warning(f"Skipping file size for media without filename: {media}")
continue
file_stats = os.stat(media.filename)
media.set("bytes", file_stats.st_size)
media.set("size", self.human_readable_bytes(file_stats.st_size))

View File

@@ -49,10 +49,18 @@ class TelegramExtractor(Extractor):
if not len(image_urls):
return False
for img_url in image_urls:
result.add_media(Media(self.download_from_url(img_url)))
filename = self.download_from_url(img_url)
if not filename:
logger.warning(f"Failed to download image from {img_url}")
continue
result.add_media(Media(filename))
else:
video_url = video.get("src")
m_video = Media(self.download_from_url(video_url))
video_filename = self.download_from_url(video_url)
if not video_filename:
logger.warning(f"Failed to download video from {video_url}")
return False
m_video = Media(video_filename)
# extract duration from HTML
try:
duration = s.find_all("time")[0].contents[0]

View File

@@ -1,3 +1,4 @@
import asyncio
import os
import shutil
import re
@@ -53,6 +54,16 @@ class TelethonExtractor(Extractor):
logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session")
shutil.copy(base_session_filepath, f"{self.session_file}.session")
# ensure a running event loop exists (Needed when used by Celery workers which may close the default one)
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# initiate the client
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
@@ -190,6 +201,9 @@ class TelethonExtractor(Extractor):
)
for i, om_url in enumerate(other_media_urls):
filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}")
if not filename:
logger.warning(f"Failed to download media from {om_url}")
continue
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id))

View File

@@ -114,6 +114,9 @@ class TwitterApiExtractor(Extractor):
logger.info(f"Found media {media}")
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
if not media.filename:
logger.warning(f"Failed to download media from {media.get('src')}")
continue
result.add_media(media)
result.set_content(

View File

@@ -24,8 +24,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
self.use_docker = os.environ.get("WACZ_ENABLE_DOCKER") or not os.environ.get("RUNNING_IN_DOCKER")
self.docker_in_docker = os.environ.get("WACZ_ENABLE_DOCKER") and os.environ.get("RUNNING_IN_DOCKER")
self.crawl_id = random_str(8)
self.cwd_dind = f"/crawls/crawls{self.crawl_id}"
self.cwd_dind = f"/crawls/crawls{random_str(8)}"
self.browsertrix_home_host = os.environ.get("BROWSERTRIX_HOME_HOST")
self.browsertrix_home_container = os.environ.get("BROWSERTRIX_HOME_CONTAINER") or self.browsertrix_home_host
# create crawls folder if not exists, so it can be safely removed in cleanup
@@ -51,7 +50,8 @@ class WaczExtractorEnricher(Enricher, Extractor):
url = to_enrich.get_url()
collection = self.crawl_id
crawl_id = random_str(8)
collection = crawl_id
browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(self.tmp_dir)
browsertrix_home_container = self.browsertrix_home_container or browsertrix_home_host
@@ -83,8 +83,10 @@ class WaczExtractorEnricher(Enricher, Extractor):
# "--blockAds" # note: this has been known to cause issues on cloudflare protected sites
]
crawl_cwd_dind = os.path.join(self.cwd_dind, crawl_id)
if self.docker_in_docker:
cmd.extend(["--cwd", self.cwd_dind])
os.makedirs(crawl_cwd_dind, exist_ok=True)
cmd.extend(["--cwd", crawl_cwd_dind])
if self.auth_for_site(url):
# there's an auth for this site, but browsertrix only supports username/password auth
@@ -109,7 +111,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
] + cmd
if self.profile:
profile_file = f"profile-{self.crawl_id}.tar.gz"
profile_file = f"profile-{crawl_id}.tar.gz"
profile_fn = os.path.join(browsertrix_home_container, profile_file)
logger.debug(f"Copying {self.profile} to {profile_fn}")
shutil.copyfile(self.profile, profile_fn)
@@ -137,7 +139,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
return False
if self.docker_in_docker:
wacz_fn = os.path.join(self.cwd_dind, "collections", collection, f"{collection}.wacz")
wacz_fn = os.path.join(crawl_cwd_dind, "collections", collection, f"{collection}.wacz")
elif self.use_docker:
wacz_fn = os.path.join(browsertrix_home_container, "collections", collection, f"{collection}.wacz")
else:
@@ -152,7 +154,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
self.extract_media_from_wacz(to_enrich, wacz_fn)
if self.docker_in_docker:
jsonl_fn = os.path.join(self.cwd_dind, "collections", collection, "pages", "pages.jsonl")
jsonl_fn = os.path.join(crawl_cwd_dind, "collections", collection, "pages", "pages.jsonl")
elif self.use_docker:
jsonl_fn = os.path.join(browsertrix_home_container, "collections", collection, "pages", "pages.jsonl")
else:

1
tests/core/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Core module tests

198
tests/core/test_media.py Normal file
View File

@@ -0,0 +1,198 @@
"""
Tests for the Media class from auto_archiver.core.media
"""
import pytest
from unittest.mock import Mock, patch
from auto_archiver.core.media import Media
class TestMediaBasics:
"""Test basic Media properties and methods."""
def test_media_creation_with_filename(self):
media = Media(filename="test.mp4")
assert media.filename == "test.mp4"
assert media.urls == []
assert media.properties == {}
def test_media_key_property(self):
media = Media(filename="test.mp4", _key="my_key")
assert media.key == "my_key"
def test_media_set_get_properties(self):
media = Media(filename="test.mp4")
result = media.set("author", "John Doe")
assert result is media # returns self for chaining
assert media.get("author") == "John Doe"
assert media.get("nonexistent") is None
assert media.get("nonexistent", "default") == "default"
def test_media_add_url(self):
media = Media(filename="test.mp4")
media.add_url("https://example.com/test.mp4")
assert "https://example.com/test.mp4" in media.urls
media.add_url("https://cdn.example.com/test.mp4")
assert len(media.urls) == 2
class TestMediaMimetype:
"""Test mimetype detection and handling."""
@pytest.mark.parametrize(
"filename,expected_mimetype",
[
("video.mp4", "video/mp4"),
("image.jpg", "image/jpeg"),
("image.png", "image/png"),
("audio.mp3", "audio/mpeg"),
("document.pdf", "application/pdf"),
("text.txt", "text/plain"),
],
)
def test_mimetype_detection(self, filename, expected_mimetype):
media = Media(filename=filename)
assert media.mimetype == expected_mimetype
def test_mimetype_setter(self):
media = Media(filename="file.unknown")
media.mimetype = "custom/type"
assert media.mimetype == "custom/type"
def test_mimetype_empty_filename(self):
media = Media(filename="")
assert media.mimetype == ""
class TestMediaTypeChecks:
"""Test media type checking methods."""
@pytest.mark.parametrize(
"filename,is_video,is_audio,is_image",
[
("video.mp4", True, False, False),
("video.avi", True, False, False),
("audio.mp3", False, True, False),
("audio.wav", False, True, False),
("image.jpg", False, False, True),
("image.png", False, False, True),
("document.pdf", False, False, False),
],
)
def test_type_checks(self, filename, is_video, is_audio, is_image):
media = Media(filename=filename)
assert media.is_video() == is_video
assert media.is_audio() == is_audio
assert media.is_image() == is_image
class TestMediaStore:
"""Test media storage functionality."""
def test_store_with_no_storages(self, caplog):
media = Media(filename="test.mp4")
metadata = Mock()
media.store(metadata, storages=[])
assert "No storages found" in caplog.text
def test_store_with_storage(self):
media = Media(filename="test.mp4")
metadata = Mock()
mock_storage = Mock()
media.store(metadata, url="https://example.com", storages=[mock_storage])
mock_storage.store.assert_called_once()
class TestMediaInnerMedia:
"""Test nested media retrieval."""
def test_all_inner_media_no_nested(self):
media = Media(filename="test.mp4")
inner = list(media.all_inner_media(include_self=False))
assert len(inner) == 0
inner_with_self = list(media.all_inner_media(include_self=True))
assert len(inner_with_self) == 1
assert inner_with_self[0] is media
def test_all_inner_media_with_nested(self):
parent = Media(filename="parent.mp4")
child = Media(filename="child.jpg")
grandchild = Media(filename="grandchild.png")
child.set("thumbnail", grandchild)
parent.set("preview", child)
inner = list(parent.all_inner_media(include_self=False))
assert len(inner) == 2
assert child in inner
assert grandchild in inner
def test_all_inner_media_with_list_property(self):
parent = Media(filename="parent.mp4")
child1 = Media(filename="frame1.jpg")
child2 = Media(filename="frame2.jpg")
parent.set("frames", [child1, child2])
inner = list(parent.all_inner_media(include_self=False))
assert len(inner) == 2
assert child1 in inner
assert child2 in inner
class TestMediaIsStored:
"""Test the is_stored method."""
def test_is_stored_no_urls(self):
media = Media(filename="test.mp4")
storage = Mock()
storage.config = {"steps": {"storages": ["s3", "local"]}}
assert media.is_stored(storage) is False
def test_is_stored_partial_urls(self):
media = Media(filename="test.mp4")
media.add_url("https://s3.example.com/test.mp4")
storage = Mock()
storage.config = {"steps": {"storages": ["s3", "local"]}}
assert media.is_stored(storage) is False
def test_is_stored_full_urls(self):
media = Media(filename="test.mp4")
media.add_url("https://s3.example.com/test.mp4")
media.add_url("file:///local/test.mp4")
storage = Mock()
storage.config = {"steps": {"storages": ["s3", "local"]}}
assert media.is_stored(storage) is True
class TestMediaValidVideo:
"""Test video validation functionality."""
def test_is_valid_video_with_valid_probe(self):
media = Media(filename="test.mp4")
mock_streams = {"streams": [{"duration_ts": 1000}]}
with patch("ffmpeg.probe", return_value=mock_streams):
assert media.is_valid_video() is True
def test_is_valid_video_with_no_duration(self):
media = Media(filename="test.mp4")
mock_streams = {"streams": [{"duration_ts": 0}]}
with patch("ffmpeg.probe", return_value=mock_streams):
assert media.is_valid_video() is False
def test_is_valid_video_with_ffmpeg_error(self):
media = Media(filename="test.mp4")
with patch("ffmpeg.probe", side_effect=Exception("ffmpeg error")):
with patch("os.path.getsize", return_value=100):
# Falls back to file size check, small file
assert media.is_valid_video() is False
with patch("os.path.getsize", return_value=30000):
# Falls back to file size check, larger file
assert media.is_valid_video() is True

View File

@@ -0,0 +1,98 @@
"""
Tests for validators module from auto_archiver.core.validators
"""
import argparse
import json
import pytest
from auto_archiver.core.validators import positive_number, valid_file, json_loader
class TestPositiveNumber:
"""Test the positive_number validator."""
@pytest.mark.parametrize(
"value,expected",
[
(0, 0),
(1, 1),
(100, 100),
(0.5, 0.5),
(999999, 999999),
],
)
def test_positive_values(self, value, expected):
assert positive_number(value) == expected
@pytest.mark.parametrize(
"value",
[
-1,
-100,
-0.5,
-999999,
],
)
def test_negative_values_raise_error(self, value):
with pytest.raises(argparse.ArgumentTypeError) as exc_info:
positive_number(value)
assert "not a positive number" in str(exc_info.value)
class TestValidFile:
"""Test the valid_file validator."""
def test_valid_file_exists(self, tmp_path):
test_file = tmp_path / "test.txt"
test_file.write_text("test content")
result = valid_file(str(test_file))
assert result == str(test_file)
def test_valid_file_not_exists(self):
with pytest.raises(argparse.ArgumentTypeError) as exc_info:
valid_file("/nonexistent/path/to/file.txt")
assert "does not exist" in str(exc_info.value)
def test_valid_file_directory_not_file(self, tmp_path):
# A directory is not a file
with pytest.raises(argparse.ArgumentTypeError) as exc_info:
valid_file(str(tmp_path))
assert "does not exist" in str(exc_info.value)
class TestJsonLoader:
"""Test the json_loader validator."""
@pytest.mark.parametrize(
"json_str,expected",
[
('{"key": "value"}', {"key": "value"}),
('{"number": 123}', {"number": 123}),
('{"list": [1, 2, 3]}', {"list": [1, 2, 3]}),
('{"nested": {"inner": "value"}}', {"nested": {"inner": "value"}}),
("[]", []),
("[1, 2, 3]", [1, 2, 3]),
('"string"', "string"),
("123", 123),
("true", True),
("false", False),
("null", None),
],
)
def test_valid_json(self, json_str, expected):
assert json_loader(json_str) == expected
@pytest.mark.parametrize(
"invalid_json",
[
"{invalid}",
"{'single': 'quotes'}",
"{missing: quotes}",
'{"unclosed": "brace"',
"",
],
)
def test_invalid_json_raises_error(self, invalid_json):
with pytest.raises(json.JSONDecodeError):
json_loader(invalid_json)

View File

@@ -0,0 +1,62 @@
"""
Tests for the ConsoleDb module
"""
import pytest
@pytest.fixture
def console_db(setup_module):
return setup_module("console_db")
class TestConsoleDb:
"""Test the ConsoleDb functionality."""
def test_started_logs_info(self, console_db, make_item, caplog):
"""Test that started() logs an info message."""
item = make_item("https://example.com/test")
with caplog.at_level("INFO"):
console_db.started(item)
assert "STARTED" in caplog.text
assert "example.com" in caplog.text
def test_failed_logs_error(self, console_db, make_item, caplog):
"""Test that failed() logs an error message with reason."""
item = make_item("https://example.com/test")
reason = "Connection timeout"
with caplog.at_level("ERROR"):
console_db.failed(item, reason)
assert "FAILED" in caplog.text
assert "Connection timeout" in caplog.text
def test_aborted_logs_warning(self, console_db, make_item, caplog):
"""Test that aborted() logs a warning message."""
item = make_item("https://example.com/test")
with caplog.at_level("WARNING"):
console_db.aborted(item)
assert "ABORTED" in caplog.text
def test_done_logs_success(self, console_db, make_item, caplog):
"""Test that done() logs a success message."""
item = make_item("https://example.com/test")
with caplog.at_level("INFO"):
console_db.done(item)
assert "DONE" in caplog.text
def test_done_cached(self, console_db, make_item, caplog):
"""Test done() with cached=True (should behave the same)."""
item = make_item("https://example.com/test")
with caplog.at_level("INFO"):
console_db.done(item, cached=True)
assert "DONE" in caplog.text

View File

@@ -0,0 +1,72 @@
"""
Tests for the JsonEnricher module
"""
import json
import os
import pytest
@pytest.fixture
def json_enricher(setup_module):
return setup_module("json_enricher")
class TestJsonEnricher:
"""Test the JsonEnricher functionality."""
def test_enrich_creates_json_file(self, json_enricher, make_item):
"""Test that enrich creates a metadata.json file."""
item = make_item("https://example.com/test")
item.set("title", "Test Title")
item.set("description", "Test description")
json_enricher.enrich(item)
# Check that a media with id 'metadata_json' was added
json_media = item.get_media_by_id("metadata_json")
assert json_media is not None
assert json_media.filename.endswith("metadata.json")
assert os.path.exists(json_media.filename)
def test_enrich_json_content(self, json_enricher, make_item):
"""Test that the JSON content is correct."""
item = make_item("https://example.com/test")
item.set("title", "Test Title")
item.set("custom_field", "custom_value")
json_enricher.enrich(item)
json_media = item.get_media_by_id("metadata_json")
with open(json_media.filename, "r", encoding="utf-8") as f:
content = json.load(f)
# The to_dict() returns nested structure: {status, metadata: {...}, media: [...]}
assert content["metadata"]["title"] == "Test Title"
assert content["metadata"]["custom_field"] == "custom_value"
assert content["metadata"]["url"] == "https://example.com/test"
def test_enrich_handles_special_characters(self, json_enricher, make_item):
"""Test that special characters are handled correctly."""
item = make_item("https://example.com/test")
item.set("title", "Test with émojis 🎉 and üñíçödé")
json_enricher.enrich(item)
json_media = item.get_media_by_id("metadata_json")
with open(json_media.filename, "r", encoding="utf-8") as f:
content = json.load(f)
# Access the nested metadata structure
assert "émojis 🎉" in content["metadata"]["title"]
assert "üñíçödé" in content["metadata"]["title"]
def test_enrich_empty_metadata(self, json_enricher, make_item):
"""Test enriching metadata with minimal content."""
item = make_item("https://example.com/minimal")
json_enricher.enrich(item)
json_media = item.get_media_by_id("metadata_json")
assert json_media is not None
assert os.path.exists(json_media.filename)

View File

@@ -53,6 +53,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
}
@pytest.mark.download
@pytest.mark.flaky(reruns=2, reruns_delay=5)
@pytest.mark.parametrize(
"url,in_title,in_text,image_count,video_count,skip_ci",
[
@@ -60,7 +61,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"https://en.wikipedia.org/wiki/Western_barn_owl",
"western barn owl",
"Tyto alba",
5,
3, # Reduced due to Wikipedia rate limiting (429 errors)
0,
False,
),
@@ -128,6 +129,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
item = make_item(url)
result = self.extractor.download(item)
assert result, f"download() returned {result!r} — Selenium may have failed (e.g., window close timeout)"
assert result.status == "antibot", "Expected status to be 'antibot'"
# Check title contains all required words (case-insensitive)
@@ -142,9 +144,9 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
)
image_media = [m for m in result.media if m.is_image() and not m.get("id") == "screenshot"]
assert len(image_media) == image_count, f"Expected {image_count} image items, got {len(image_media)}"
assert len(image_media) >= image_count, f"Expected at least {image_count} image items, got {len(image_media)}"
video_media = [m for m in result.media if m.is_video()]
assert len(video_media) == video_count, f"Expected {video_count} video items, got {len(video_media)}"
assert len(video_media) >= video_count, f"Expected at least {video_count} video items, got {len(video_media)}"
for expected_id in ["screenshot", "pdf", "html_source_code"]:
assert any(m.get("id") == expected_id for m in result.media), (

View File

@@ -1,3 +1,4 @@
import asyncio
import os
from datetime import date
@@ -60,3 +61,53 @@ def test_valid_url_regex(url, expected, get_lazy_module):
def test_invite_pattern_regex(invite, expected, get_lazy_module):
match = TelethonExtractor.invite_pattern.search(invite)
assert bool(match) == expected
def test_setup_with_closed_event_loop(get_lazy_module, tmp_path, mocker):
"""
Simulate the Celery worker scenario where the asyncio event loop is closed
before setup() runs. The fix should create a new event loop so that
TelegramClient.start() does not raise 'Event loop is closed'.
"""
# create a session file so setup doesn't fail on missing file
session_file = tmp_path / "test.session"
session_file.touch()
# close the current event loop to simulate a Celery worker environment
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.close()
lazy_module = get_lazy_module("telethon_extractor")
module = lazy_module.load(
{"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}}
)
# setup should have succeeded and a new open event loop should exist
new_loop = asyncio.get_event_loop()
assert not new_loop.is_closed()
assert module.client is not None
def test_setup_with_no_event_loop(get_lazy_module, tmp_path, mocker):
"""
Simulate the scenario where there is no current event loop at all
(e.g. running in a non-main thread). The fix should create one.
"""
session_file = tmp_path / "test.session"
session_file.touch()
# Remove the current event loop entirely
# In Python 3.12+, get_event_loop() in a non-main thread raises RuntimeError
mocker.patch("asyncio.get_event_loop", side_effect=RuntimeError("no current event loop"))
new_loop_mock = mocker.MagicMock()
new_loop_mock.is_closed.return_value = False
mocker.patch("asyncio.new_event_loop", return_value=new_loop_mock)
set_loop = mocker.patch("asyncio.set_event_loop")
lazy_module = get_lazy_module("telethon_extractor")
lazy_module.load({"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}})
# a new event loop should have been created and set
asyncio.new_event_loop.assert_called_once()
set_loop.assert_called_once_with(new_loop_mock)

View File

@@ -0,0 +1,238 @@
"""
Tests for the Twitter dropin extractor with fxtwitter fallback
"""
import pytest
from unittest.mock import Mock, patch
from auto_archiver.modules.generic_extractor.twitter import Twitter
@pytest.fixture
def twitter_dropin():
return Twitter()
class TestTwitterFxTwitterFallback:
"""Test the fxtwitter API fallback functionality."""
@pytest.fixture
def mock_fxtwitter_video_response(self):
return {
"code": 200,
"message": "OK",
"tweet": {
"url": "https://x.com/user/status/123456789",
"id": "123456789",
"text": "Test tweet with video",
"author": {
"id": "111",
"name": "Test User",
"screen_name": "testuser",
},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"media": {
"all": [
{
"type": "video",
"url": "https://video.twimg.com/test.mp4",
"variants": [
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
{
"url": "https://video.twimg.com/test_480.mp4",
"content_type": "video/mp4",
"bitrate": 632000,
},
{
"url": "https://video.twimg.com/test_720.mp4",
"content_type": "video/mp4",
"bitrate": 2176000,
},
],
}
],
"videos": [
{
"url": "https://video.twimg.com/test.mp4",
"variants": [
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
{
"url": "https://video.twimg.com/test_480.mp4",
"content_type": "video/mp4",
"bitrate": 632000,
},
{
"url": "https://video.twimg.com/test_720.mp4",
"content_type": "video/mp4",
"bitrate": 2176000,
},
],
}
],
},
},
}
@pytest.fixture
def mock_fxtwitter_photo_response(self):
return {
"code": 200,
"message": "OK",
"tweet": {
"url": "https://x.com/user/status/123456790",
"id": "123456790",
"text": "Test tweet with photo",
"author": {
"id": "111",
"name": "Test User",
"screen_name": "testuser",
},
"created_at": "Mon Feb 09 10:30:00 +0000 2026",
"media": {
"all": [
{
"type": "photo",
"url": "https://pbs.twimg.com/media/test.jpg?name=orig",
}
],
"photos": [
{
"type": "photo",
"url": "https://pbs.twimg.com/media/test.jpg?name=orig",
}
],
},
},
}
def test_fetch_fxtwitter_video(self, twitter_dropin, mock_fxtwitter_video_response):
"""Test fetching a tweet with video via fxtwitter API."""
with patch("requests.get") as mock_get:
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = mock_fxtwitter_video_response
mock_get.return_value = mock_response
result = twitter_dropin._fetch_fxtwitter("123456789")
assert result["user"]["name"] == "Test User"
assert result["created_at"] == "Sun Feb 08 18:45:00 +0000 2026"
assert result["full_text"] == "Test tweet with video"
assert len(result["entities"]["media"]) == 1
assert result["entities"]["media"][0]["type"] == "video"
assert "video_info" in result["entities"]["media"][0]
assert len(result["entities"]["media"][0]["video_info"]["variants"]) == 3
def test_fetch_fxtwitter_photo(self, twitter_dropin, mock_fxtwitter_photo_response):
"""Test fetching a tweet with photo via fxtwitter API."""
with patch("requests.get") as mock_get:
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = mock_fxtwitter_photo_response
mock_get.return_value = mock_response
result = twitter_dropin._fetch_fxtwitter("123456790")
assert result["user"]["name"] == "Test User"
assert result["created_at"] == "Mon Feb 09 10:30:00 +0000 2026"
assert result["full_text"] == "Test tweet with photo"
assert len(result["entities"]["media"]) == 1
assert result["entities"]["media"][0]["type"] == "photo"
assert result["entities"]["media"][0]["media_url_https"] == "https://pbs.twimg.com/media/test.jpg?name=orig"
def test_fetch_fxtwitter_no_media(self, twitter_dropin):
"""Test fetching a text-only tweet via fxtwitter API."""
mock_response_data = {
"code": 200,
"message": "OK",
"tweet": {
"id": "123456791",
"text": "Just text, no media",
"author": {"name": "Text Only User"},
"created_at": "Tue Feb 10 12:00:00 +0000 2026",
"media": {},
},
}
with patch("requests.get") as mock_get:
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = mock_response_data
mock_get.return_value = mock_response
result = twitter_dropin._fetch_fxtwitter("123456791")
assert result["user"]["name"] == "Text Only User"
assert result["full_text"] == "Just text, no media"
assert result["entities"]["media"] == []
def test_fetch_fxtwitter_api_error(self, twitter_dropin):
"""Test handling of fxtwitter API errors."""
with patch("requests.get") as mock_get:
mock_response = Mock()
mock_response.status_code = 404
mock_get.return_value = mock_response
with pytest.raises(Exception):
twitter_dropin._fetch_fxtwitter("nonexistent")
class TestTwitterChooseVariant:
"""Test the video variant selection logic."""
def test_choose_highest_quality_video(self, twitter_dropin):
"""Test that the highest quality video variant is selected."""
variants = [
{"url": "https://video.twimg.com/vid/320x240/test.mp4", "content_type": "video/mp4"},
{"url": "https://video.twimg.com/vid/1280x720/test.mp4", "content_type": "video/mp4"},
{"url": "https://video.twimg.com/vid/640x480/test.mp4", "content_type": "video/mp4"},
]
result = twitter_dropin.choose_variant(variants)
assert result["url"] == "https://video.twimg.com/vid/1280x720/test.mp4"
def test_choose_variant_fallback_for_non_mp4(self, twitter_dropin):
"""Test fallback when no mp4 variant is available."""
variants = [
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
]
result = twitter_dropin.choose_variant(variants)
assert result["url"] == "https://video.twimg.com/test.m3u8"
def test_choose_variant_prefers_mp4(self, twitter_dropin):
"""Test that mp4 is preferred over other formats when quality is equal."""
variants = [
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
{"url": "https://video.twimg.com/vid/1280x720/test.mp4", "content_type": "video/mp4"},
]
result = twitter_dropin.choose_variant(variants)
assert result["content_type"] == "video/mp4"
@pytest.mark.download
class TestTwitterFxTwitterLive:
"""Live integration tests for fxtwitter API - requires network access."""
@pytest.mark.parametrize(
"tweet_id,expected_media_type",
[
("2020569571682312581", "video"), # Video tweet
("2020410438198890618", "video"), # Video tweet
("2020341585502957801", "photo"), # Photo tweet
],
)
def test_fetch_real_tweets(self, twitter_dropin, tweet_id, expected_media_type):
"""Test fetching real tweets from fxtwitter API."""
result = twitter_dropin._fetch_fxtwitter(tweet_id)
assert result["user"]["name"] # Author should be non-empty
assert result["created_at"] # Should have timestamp
assert result["full_text"] # Should have text content
media = result["entities"]["media"]
assert len(media) >= 1
assert media[0]["type"] == expected_media_type

View File

@@ -0,0 +1,70 @@
"""
Tests for the CLIFeeder module
"""
import pytest
from auto_archiver.modules.cli_feeder.cli_feeder import CLIFeeder
from auto_archiver.core.consts import SetupError
from auto_archiver.core.metadata import Metadata
@pytest.fixture
def cli_feeder_instance():
"""Create a CLIFeeder instance with mocked config."""
def _create(urls):
feeder = CLIFeeder()
# Mock the config structure that cli_feeder expects
feeder.config = {"urls": urls}
feeder.name = "cli_feeder"
feeder.tmp_dir = "/tmp"
return feeder
return _create
class TestCLIFeeder:
"""Test the CLIFeeder functionality."""
def test_iter_yields_metadata_for_urls(self, cli_feeder_instance):
"""Test that iteration yields Metadata objects for each URL."""
urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"]
feeder = cli_feeder_instance(urls)
feeder.setup()
items = list(feeder)
assert len(items) == 3
assert all(isinstance(item, Metadata) for item in items)
assert items[0].get_url() == "https://example.com/1"
assert items[1].get_url() == "https://example.com/2"
assert items[2].get_url() == "https://example.com/3"
def test_iter_single_url(self, cli_feeder_instance):
"""Test iteration with a single URL."""
feeder = cli_feeder_instance(["https://example.com/single"])
feeder.setup()
items = list(feeder)
assert len(items) == 1
assert items[0].get_url() == "https://example.com/single"
def test_setup_raises_without_urls(self, cli_feeder_instance):
"""Test that setup raises SetupError when no URLs provided."""
feeder = cli_feeder_instance([])
with pytest.raises(SetupError) as exc_info:
feeder.setup()
assert "No URLs provided" in str(exc_info.value)
def test_setup_raises_with_none_urls(self, cli_feeder_instance):
"""Test that setup raises SetupError when urls is None."""
feeder = cli_feeder_instance(None)
with pytest.raises(SetupError) as exc_info:
feeder.setup()
assert "No URLs provided" in str(exc_info.value)

View File

@@ -0,0 +1,43 @@
"""
Tests for the MuteFormatter module
"""
import pytest
from auto_archiver.core.metadata import Metadata
@pytest.fixture
def mute_formatter(setup_module):
return setup_module("mute_formatter")
class TestMuteFormatter:
"""Test the MuteFormatter functionality."""
def test_format_returns_none(self, mute_formatter, make_item):
"""Test that format always returns None (mutes output)."""
item = make_item("https://example.com/test")
item.set("title", "Test Title")
result = mute_formatter.format(item)
assert result is None
def test_format_with_empty_metadata(self, mute_formatter):
"""Test format with empty metadata."""
item = Metadata().set_url("https://example.com/empty")
result = mute_formatter.format(item)
assert result is None
def test_format_with_media(self, mute_formatter, make_item):
"""Test that format still returns None even with media attached."""
from auto_archiver.core.media import Media
item = make_item("https://example.com/with-media")
item.add_media(Media(filename="test.mp4"))
result = mute_formatter.format(item)
assert result is None

View File

@@ -0,0 +1,259 @@
"""
Tests for handling Media objects with None filename.
When download_from_url fails, it returns None. Various enrichers and
the metadata deduplication logic must gracefully handle Media objects
where filename is None, rather than crashing with TypeError.
"""
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.modules.meta_enricher import MetaEnricher
# ── HashEnricher ──────────────────────────────────────────────────────
class TestHashEnricherNoneFilename:
"""hash_enricher should skip media with None filename without crashing."""
@pytest.fixture(autouse=True)
def setup(self, setup_module):
self.enricher = setup_module(HashEnricher, {"algorithm": "SHA-256", "chunksize": 100})
def test_skips_none_filename(self):
m = Metadata().set_url("https://example.com")
media = Media(filename=None)
media.set("src", "https://example.com/video.mp4")
m.add_media(media)
# Should not raise
self.enricher.enrich(m)
# No hash should be set
assert m.media[0].get("hash") is None
def test_hashes_valid_skips_none(self, tmp_path):
"""Mix of valid and None-filename media: only valid ones get hashed."""
valid_file = tmp_path / "test.txt"
valid_file.write_text("hello world")
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=str(valid_file)))
m.add_media(Media(filename=None))
self.enricher.enrich(m)
assert m.media[0].get("hash") is not None
assert m.media[1].get("hash") is None
def test_all_none_filenames(self):
"""All media have None filename enricher should not crash."""
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=None))
m.add_media(Media(filename=None))
self.enricher.enrich(m)
assert len(m.media) == 2
for media in m.media:
assert media.get("hash") is None
# ── MetaEnricher ──────────────────────────────────────────────────────
class TestMetaEnricherNoneFilename:
"""meta_enricher should skip media with None filename without crashing."""
@pytest.fixture(autouse=True)
def setup(self, setup_module):
self.enricher = setup_module(MetaEnricher, {})
def test_skips_none_filename(self):
m = Metadata().set_url("https://example.com")
m.set("_processed_at", datetime.now(timezone.utc))
media = Media(filename=None)
media.set("src", "https://example.com/video.mp4")
m.add_media(media)
# Should not raise
self.enricher.enrich(m)
assert m.get("total_bytes") == 0
def test_sizes_valid_skips_none(self, tmp_path):
"""Mix of valid and None-filename media: only valid ones get sized."""
valid_file = tmp_path / "test.txt"
valid_file.write_text("A" * 500)
m = Metadata().set_url("https://example.com")
m.set("_processed_at", datetime.now(timezone.utc))
m.add_media(Media(filename=str(valid_file)))
m.add_media(Media(filename=None))
self.enricher.enrich(m)
assert m.media[0].get("bytes") == 500
assert m.media[1].get("bytes") is None
assert m.get("total_bytes") == 500
# ── Metadata.remove_duplicate_media_by_hash ───────────────────────────
class TestRemoveDuplicateMediaNoneFilename:
"""remove_duplicate_media_by_hash should keep media with None filename."""
def test_none_filename_kept(self):
m = Metadata().set_url("https://example.com")
none_media = Media(filename=None)
none_media.set("src", "https://example.com/video.mp4")
m.add_media(none_media)
m.remove_duplicate_media_by_hash()
assert len(m.media) == 1
assert m.media[0].filename is None
def test_none_and_valid_mixed(self, tmp_path):
"""None-filename media is kept alongside valid-filename media."""
valid_file = tmp_path / "test.txt"
valid_file.write_text("content")
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=str(valid_file)))
none_media = Media(filename=None)
none_media.set("src", "https://example.com/video.mp4")
m.add_media(none_media)
m.remove_duplicate_media_by_hash()
assert len(m.media) == 2
def test_multiple_none_filename_all_kept(self):
"""Multiple None-filename media are all kept (can't deduplicate without file)."""
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=None))
m.add_media(Media(filename=None))
m.remove_duplicate_media_by_hash()
assert len(m.media) == 2
# ── Twitter dropin create_metadata ────────────────────────────────────
class TestTwitterDropinNoneFilename:
"""Twitter dropin should skip media when download_from_url returns None."""
@pytest.fixture
def twitter_dropin(self):
from auto_archiver.modules.generic_extractor.twitter import Twitter
return Twitter()
def test_create_metadata_skips_failed_photo_download(self, twitter_dropin):
"""When download_from_url returns None for a photo, it's not added to media."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with photo",
"entities": {
"media": [
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
]
},
}
mock_archiver = MagicMock()
mock_archiver.download_from_url.return_value = None # simulate failed download
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
# The result should have no media since the download failed
assert len(result.media) == 0
def test_create_metadata_skips_failed_video_download(self, twitter_dropin):
"""When download_from_url returns None for a video, it's not added to media."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with video",
"entities": {
"media": [
{
"type": "video",
"video_info": {
"variants": [
{
"url": "https://video.twimg.com/vid/1280x720/test.mp4",
"content_type": "video/mp4",
},
]
},
},
]
},
}
mock_archiver = MagicMock()
mock_archiver.download_from_url.return_value = None
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
assert len(result.media) == 0
def test_create_metadata_keeps_successful_download(self, twitter_dropin, tmp_path):
"""When download_from_url succeeds, media is added."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with photo",
"entities": {
"media": [
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
]
},
}
test_file = tmp_path / "test.jpg"
test_file.write_text("fake image data")
mock_archiver = MagicMock()
mock_archiver.download_from_url.return_value = str(test_file)
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
assert len(result.media) == 1
assert result.media[0].filename == str(test_file)
def test_create_metadata_mixed_downloads(self, twitter_dropin, tmp_path):
"""One download succeeds, one fails only successful one is kept."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with two photos",
"entities": {
"media": [
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test1.jpg"},
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test2.jpg"},
]
},
}
test_file = tmp_path / "test1.jpg"
test_file.write_text("fake image data")
mock_archiver = MagicMock()
# First call succeeds, second fails
mock_archiver.download_from_url.side_effect = [str(test_file), None]
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
assert len(result.media) == 1
assert result.media[0].filename == str(test_file)