Set up feeder manifests (not merged by source yet)

This commit is contained in:
erinhmclark
2025-01-23 09:16:42 +00:00
parent c517d35bdf
commit 79684f8348
82 changed files with 721 additions and 730 deletions

View File

@@ -0,0 +1,33 @@
{
"name": "Auto-Archiver API Database",
"type": ["database"],
"entry_point": "api_db:AAApiDb",
"requires_setup": True,
"external_dependencies": {
"python": ["requests",
"loguru"],
},
"configs": {
"api_endpoint": {"default": None, "help": "API endpoint where calls are made to"},
"api_token": {"default": None, "help": "API Bearer token."},
"public": {"default": False, "help": "whether the URL should be publicly available via the API"},
"author_id": {"default": None, "help": "which email to assign as author"},
"group_id": {"default": None, "help": "which group of users have access to the archive in case public=false as author"},
"allow_rearchive": {"default": True, "help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived"},
"store_results": {"default": True, "help": "when set, will send the results to the API database."},
"tags": {"default": [], "help": "what tags to add to the archived URL", "cli_set": lambda cli_val, cur_val: set(cli_val.split(","))},
},
"description": """
Provides integration with the Auto-Archiver API for querying and storing archival data.
### Features
- **API Integration**: Supports querying for existing archives and submitting results.
- **Duplicate Prevention**: Avoids redundant archiving when `allow_rearchive` is disabled.
- **Configurable**: Supports settings like API endpoint, authentication token, tags, and permissions.
- **Tagging and Metadata**: Adds tags and manages metadata for archives.
- **Optional Storage**: Archives results conditionally based on configuration.
### Setup
Requires access to an Auto-Archiver API instance and a valid API token.
""",
}

View File

@@ -0,0 +1,59 @@
from typing import Union
import requests, os
from loguru import logger
from auto_archiver.databases import Database
from auto_archiver.core import Metadata
class AAApiDb(Database):
"""
Connects to auto-archiver-api instance
"""
name = "auto_archiver_api_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.allow_rearchive = bool(self.allow_rearchive)
self.store_results = bool(self.store_results)
self.assert_valid_string("api_endpoint")
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
""" query the database for the existence of this item.
Helps avoid re-archiving the same URL multiple times.
"""
if not self.allow_rearchive: return
params = {"url": item.get_url(), "limit": 15}
headers = {"Authorization": f"Bearer {self.api_token}", "accept": "application/json"}
response = requests.get(os.path.join(self.api_endpoint, "tasks/search-url"), params=params, headers=headers)
if response.status_code == 200:
if len(response.json()):
logger.success(f"API returned {len(response.json())} previously archived instance(s)")
fetched_metadata = [Metadata.from_dict(r["result"]) for r in response.json()]
return Metadata.choose_most_complete(fetched_metadata)
else:
logger.error(f"AA API FAIL ({response.status_code}): {response.json()}")
return False
def done(self, item: Metadata, cached: bool=False) -> None:
"""archival result ready - should be saved to DB"""
if not self.store_results: return
if cached:
logger.debug(f"skipping saving archive of {item.get_url()} to the AA API because it was cached")
return
logger.debug(f"saving archive of {item.get_url()} to the AA API.")
payload = {'result': item.to_json(), 'public': self.public, 'author_id': self.author_id, 'group_id': self.group_id, 'tags': list(self.tags)}
headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.post(os.path.join(self.api_endpoint, "submit-archive"), json=payload, headers=headers)
if response.status_code == 200:
logger.success(f"AA API: {response.json()}")
else:
logger.error(f"AA API FAIL ({response.status_code}): {response.json()}")

View File

@@ -0,0 +1,37 @@
{
"name": "Atlos Database",
"type": ["database"],
"entry_point": "atlos_db:AtlosDb",
"requires_setup": True,
"external_dependencies":
{"python": ["loguru",
""],
"bin": [""]},
"configs": {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"cli_set": lambda cli_val, _: cli_val
},
},
"description": """
Handles integration with the Atlos platform for managing archival results.
### Features
- Outputs archival results to the Atlos API for storage and tracking.
- Updates failure status with error details when archiving fails.
- Processes and formats metadata, including ISO formatting for datetime fields.
- Skips processing for items without an Atlos ID.
### Setup
Required configs:
- atlos_url: Base URL for the Atlos API.
- api_token: Authentication token for API access.
"""
,
}

View File

@@ -0,0 +1,80 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from auto_archiver.databases import Database
from auto_archiver.core import Metadata
from auto_archiver.utils import get_atlos_config_options
class AtlosDb(Database):
"""
Outputs results to Atlos
"""
name = "atlos_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
# TODO
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={"metadata": {"processed": True, "status": "error", "error": reason}},
).raise_for_status()
logger.info(
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
)
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={
"metadata": dict(
processed=True,
status="success",
results=self._process_metadata(item),
)
},
).raise_for_status()
logger.info(
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
)

View File

@@ -0,0 +1,34 @@
{
"name": "Atlos Feeder",
"type": ["feeder"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"cli_set": lambda cli_val, _: cli_val
},
},
"description": """
AtlosFeeder: A feeder module that integrates with the Atlos API to fetch source material URLs for archival.
### Features
- Connects to the Atlos API to retrieve a list of source material URLs.
- Filters source materials based on visibility, processing status, and metadata.
- Converts filtered source materials into `Metadata` objects with the relevant `atlos_id` and URL.
- Iterates through paginated results using a cursor for efficient API interaction.
### Notes
- Requires an Atlos API endpoint and a valid API token for authentication.
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
- Handles pagination transparently when retrieving data from the Atlos API.
"""
}

View File

@@ -0,0 +1,57 @@
from loguru import logger
import requests
from auto_archiver.feeders import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import get_atlos_config_options
class AtlosFeeder(Feeder):
name = "atlos_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
if type(self.api_token) != str:
raise Exception("Atlos Feeder did not receive an Atlos API token")
# TODO
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0
cursor = None
while True:
response = requests.get(
f"{self.atlos_url}/api/v2/source_material",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"cursor": cursor},
)
data = response.json()
response.raise_for_status()
cursor = data["next"]
for item in data["results"]:
if (
item["source_url"] not in [None, ""]
and (
item["metadata"]
.get("auto_archiver", {})
.get("processed", False)
!= True
)
and item["visibility"] == "visible"
and item["status"] not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set(
"atlos_id", item["id"]
)
count += 1
if len(data["results"]) == 0 or cursor is None:
break
logger.success(f"Processed {count} URL(s)")

View File

@@ -0,0 +1,24 @@
{
"name": "CLI Feeder",
"type": ["feeder"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"configs": {
"urls": {
"default": None,
"help": "URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml",
"cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
},
},
"description": """
Processes URLs to archive passed via the command line and feeds them into the archiving pipeline.
### Features
- Takes a single URL or a list of URLs provided via the command line.
- Converts each URL into a `Metadata` object and yields it for processing.
- Ensures URLs are processed only if they are explicitly provided.
"""
}

View File

@@ -0,0 +1,32 @@
from loguru import logger
from auto_archiver.feeders import Feeder
from auto_archiver.core import Metadata, ArchivingContext
class CLIFeeder(Feeder):
name = "cli_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
if type(self.urls) != list or len(self.urls) == 0:
raise Exception("CLI Feeder did not receive any URL to process")
# @staticmethod
# def configs() -> dict:
# return {
# "urls": {
# "default": None,
# "help": "URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml",
# "cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
# },
# }
def __iter__(self) -> Metadata:
for url in self.urls:
logger.debug(f"Processing {url}")
yield Metadata().set_url(url)
ArchivingContext.set("folder", "cli")
logger.success(f"Processed {len(self.urls)} URL(s)")

View File

@@ -0,0 +1,22 @@
{
"name": "Console Database",
"type": ["database"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"description": """
Provides a simple database implementation that outputs archival results and status updates to the console.
### Features
- Logs the status of archival tasks directly to the console, including:
- started
- failed (with error details)
- aborted
- done (with optional caching status)
- Useful for debugging or lightweight setups where no external database is required.
### Setup
No additional configuration is required.
""",
}

View File

@@ -0,0 +1,28 @@
from loguru import logger
from auto_archiver.databases import Database
from auto_archiver.core import Metadata
class ConsoleDb(Database):
"""
Outputs results to the console
"""
name = "console_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}: {reason}")
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
def done(self, item: Metadata, cached: bool=False) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item}")

View File

@@ -0,0 +1,22 @@
{
"name": "csv_db",
"type": ["database"],
"requires_setup": False,
"external_dependencies": {"python": ["loguru"]
},
"configs": {
"csv_file": {"default": "db.csv", "help": "CSV file name"}
},
"description": """
Handles exporting archival results to a CSV file.
### Features
- Saves archival metadata as rows in a CSV file.
- Automatically creates the CSV file with a header if it does not exist.
- Appends new metadata entries to the existing file.
### Setup
Required config:
- csv_file: Path to the CSV file where results will be stored (default: "db.csv").
""",
}

View File

@@ -0,0 +1,29 @@
import os
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
from auto_archiver.databases import Database
from auto_archiver.core import Metadata
class CSVDb(Database):
"""
Outputs results to a CSV file
"""
name = "csv_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.assert_valid_string("csv_file")
def done(self, item: Metadata, cached: bool=False) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item}")
is_empty = not os.path.isfile(self.csv_file) or os.path.getsize(self.csv_file) == 0
with open(self.csv_file, "a", encoding="utf-8") as outf:
writer = DictWriter(outf, fieldnames=asdict(Metadata()))
if is_empty: writer.writeheader()
writer.writerow(asdict(item))

View File

@@ -0,0 +1,33 @@
{
"name": "CSV Feeder",
"type": ["feeder"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
"bin": [""]
},
"configs": {
"files": {
"default": None,
"help": "Path to the input file(s) to read the URLs from, comma separated. \
Input files should be formatted with one URL per line",
"cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
},
"column": {
"default": None,
"help": "Column number or name to read the URLs from, 0-indexed",
}
},
"description": """
Reads URLs from CSV files and feeds them into the archiving process.
### Features
- Supports reading URLs from multiple input files, specified as a comma-separated list.
- Allows specifying the column number or name to extract URLs from.
- Skips header rows if the first value is not a valid URL.
- Integrates with the `ArchivingContext` to manage URL feeding.
### Setu N
- Input files should be formatted with one URL per line.
"""
}

View File

@@ -0,0 +1,44 @@
from loguru import logger
import csv
from auto_archiver.feeders import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import url_or_none
class CSVFeeder(Feeder):
name = "csv_feeder"
@staticmethod
def configs() -> dict:
return {
"files": {
"default": None,
"help": "Path to the input file(s) to read the URLs from, comma separated. \
Input files should be formatted with one URL per line",
"cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
},
"column": {
"default": None,
"help": "Column number or name to read the URLs from, 0-indexed",
}
}
def __iter__(self) -> Metadata:
url_column = self.column or 0
for file in self.files:
with open(file, "r") as f:
reader = csv.reader(f)
first_row = next(reader)
if not(url_or_none(first_row[url_column])):
# it's a header row, skip it
logger.debug(f"Skipping header row: {first_row}")
for row in reader:
url = row[0]
logger.debug(f"Processing {url}")
yield Metadata().set_url(url)
ArchivingContext.set("folder", "cli")
logger.success(f"Processed {len(self.urls)} URL(s)")

View File

@@ -0,0 +1,21 @@
# TODO merge with feeder manifest?
{
"name": "gsheet_db",
"type": ["database"],
"requires_setup": True,
"external_dependencies": {"python": [" loguru"],
},
"description": """
Handles integration with Google Sheets for tracking archival tasks.
### Features
- Updates a Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Notes
- Currently works only with metadata provided by GsheetFeeder.
- Requires configuration of a linked Google Sheet and appropriate API credentials.
""",
}

View File

@@ -0,0 +1,108 @@
from typing import Union, Tuple
import datetime
from urllib.parse import quote
from loguru import logger
from auto_archiver.databases import Database
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.utils import GWorksheet
class GsheetsDb(Database):
"""
NB: only works if GsheetFeeder is used.
could be updated in the future to support non-GsheetFeeder metadata
"""
name = "gsheet_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', 'Archive in progress')
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, f'Archive failed {reason}')
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
self._safe_status_update(item, '')
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check if the given item has been archived already"""
return False
def done(self, item: Metadata, cached: bool=False) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item.get_url()}")
gw, row = self._retrieve_gsheet(item)
# self._safe_status_update(item, 'done')
cell_updates = []
row_values = gw.get_row(row)
def batch_if_valid(col, val, final_value=None):
final_value = final_value or val
try:
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == '':
cell_updates.append((row, col, final_value))
except Exception as e:
logger.error(f"Unable to batch {col}={final_value} due to {e}")
status_message = item.status
if cached:
status_message = f"[cached] {status_message}"
cell_updates.append((row, 'status', status_message))
media: Media = item.get_final_media()
if hasattr(media, "urls"):
batch_if_valid('archive', "\n".join(media.urls))
batch_if_valid('date', True, datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=datetime.timezone.utc).isoformat())
batch_if_valid('title', item.get_title())
batch_if_valid('text', item.get("content", ""))
batch_if_valid('timestamp', item.get_timestamp())
if media: batch_if_valid('hash', media.get("hash", "not-calculated"))
# merge all pdq hashes into a single string, if present
pdq_hashes = []
all_media = item.get_all_media()
for m in all_media:
if pdq := m.get("pdq_hash"):
pdq_hashes.append(pdq)
if len(pdq_hashes):
batch_if_valid('pdq_hash', ",".join(pdq_hashes))
if (screenshot := item.get_media_by_id("screenshot")) and hasattr(screenshot, "urls"):
batch_if_valid('screenshot', "\n".join(screenshot.urls))
if (thumbnail := item.get_first_image("thumbnail")):
if hasattr(thumbnail, "urls"):
batch_if_valid('thumbnail', f'=IMAGE("{thumbnail.urls[0]}")')
if (browsertrix := item.get_media_by_id("browsertrix")):
batch_if_valid('wacz', "\n".join(browsertrix.urls))
batch_if_valid('replaywebpage', "\n".join([f'https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}' for wacz in browsertrix.urls]))
gw.batch_set_cell(cell_updates)
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', new_status)
except Exception as e:
logger.debug(f"Unable to update sheet: {e}")
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
# TODO: to make gsheet_db less coupled with gsheet_feeder's "gsheet" parameter, this method could 1st try to fetch "gsheet" from ArchivingContext and, if missing, manage its own singleton - not needed for now
if gsheet := ArchivingContext.get("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
print(self.sheet_id)
return gw, row

View File

@@ -0,0 +1,40 @@
{
"name": "Google Sheets Feeder",
"type": ["feeder"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "gspread", "python-slugify"],
},
"configs": {
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
},
"use_sheet_names_in_stored_paths": {
"default": True,
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},
"description": """
GsheetsFeeder: A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Features
- Validates the sheet structure and filters rows based on input configurations.
- Processes only worksheets allowed by the `allow_worksheets` and `block_worksheets` configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included for archival.
- Supports organizing stored files into folder paths based on sheet and worksheet names.
### Notes
- Requires a Google Service Account JSON file for authentication. Suggested location is `secrets/gsheets_service_account.json`.
- Create the sheet using the template provided in the docs.
"""
}

View File

@@ -0,0 +1,105 @@
"""
GsheetsFeeder: A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Key properties
- validates the sheet's structure and filters rows based on input configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included.
"""
import gspread, os
from loguru import logger
from slugify import slugify
# from . import Enricher
from auto_archiver.feeders import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import Gsheets, GWorksheet
class GsheetsFeeder(Gsheets, Feeder):
name = "gsheet_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.gsheets_client = gspread.service_account(filename=self.service_account)
# @staticmethod
# def configs() -> dict:
# return dict(
# Gsheets.configs(),
# ** {
# "allow_worksheets": {
# "default": set(),
# "help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
# "cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
# },
# "block_worksheets": {
# "default": set(),
# "help": "(CSV) explicitly block some worksheets from being processed",
# "cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
# },
# "use_sheet_names_in_stored_paths": {
# "default": True,
# "help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
# }
# })
def __iter__(self) -> Metadata:
sh = self.open_sheet()
for ii, wks in enumerate(sh.worksheets()):
if not self.should_process_sheet(wks.title):
logger.debug(f"SKIPPED worksheet '{wks.title}' due to allow/block rules")
continue
logger.info(f'Opening worksheet {ii=}: {wks.title=} header={self.header}')
gw = GWorksheet(wks, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}")
continue
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
# All checks done - archival process starts here
m = Metadata().set_url(url)
ArchivingContext.set("gsheet", {"row": row, "worksheet": gw}, keep_on_reset=True)
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder):
if self.use_sheet_names_in_stored_paths:
ArchivingContext.set("folder", os.path.join(folder, slugify(self.sheet), slugify(wks.title)), True)
else:
ArchivingContext.set("folder", folder, True)
yield m
logger.success(f'Finished worksheet {wks.title}')
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:
# ALLOW rules exist AND sheet name not explicitly allowed
return False
if len(self.block_worksheets) and sheet_name in self.block_worksheets:
# BLOCK rules exist AND sheet name is blocked
return False
return True
def missing_required_columns(self, gw: GWorksheet) -> list:
missing = []
for required_col in ['url', 'status']:
if not gw.col_exists(required_col):
missing.append(required_col)
return missing

View File

@@ -0,0 +1,27 @@
{
"name": "Hash Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"configs": {
"algorithm": {"default": "SHA-256", "help": "hash algorithm to use", "choices": ["SHA-256", "SHA3-512"]},
"chunksize": {"default": int(1.6e7), "help": "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"},
},
"description": """
Generates cryptographic hashes for media files to ensure data integrity and authenticity.
### Features
- Calculates cryptographic hashes (SHA-256 or SHA3-512) for media files stored in `Metadata` objects.
- Ensures content authenticity, integrity validation, and duplicate identification.
- Efficiently processes large files by reading file bytes in configurable chunk sizes.
- Supports dynamic configuration of hash algorithms and chunk sizes.
- Updates media metadata with the computed hash value in the format `<algorithm>:<hash>`.
### Notes
- Default hash algorithm is SHA-256, but SHA3-512 is also supported.
- Chunk size defaults to 16 MB but can be adjusted based on memory requirements.
- Useful for workflows requiring hash-based content validation or deduplication.
""",
}

View File

@@ -0,0 +1,68 @@
""" Hash Enricher for generating cryptographic hashes of media files.
The `HashEnricher` calculates cryptographic hashes (e.g., SHA-256, SHA3-512)
for media files stored in `Metadata` objects. These hashes are used for
validating content integrity, ensuring data authenticity, and identifying
exact duplicates. The hash is computed by reading the file's bytes in chunks,
making it suitable for handling large files efficiently.
"""
import hashlib
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Metadata, ArchivingContext
class HashEnricher(Enricher):
"""
Calculates hashes for Media instances
"""
name = "hash_enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
algos = self.configs()["algorithm"]
algo_choices = algos["choices"]
if not getattr(self, 'algorithm', None):
if not config.get('algorithm'):
logger.warning(f"No hash algorithm selected, defaulting to {algos['default']}")
self.algorithm = algos["default"]
else:
self.algorithm = config["algorithm"]
assert self.algorithm in algo_choices, f"Invalid hash algorithm selected, must be one of {algo_choices} (you selected {self.algorithm})."
if not getattr(self, 'chunksize', None):
if config.get('chunksize'):
self.chunksize = config["chunksize"]
else:
self.chunksize = self.configs()["chunksize"]["default"]
self.chunksize = int(self.chunksize)
assert self.chunksize >= -1, "read length must be non-negative or -1"
ArchivingContext.set("hash_enricher.algorithm", self.algorithm, keep_on_reset=True)
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"calculating media hashes for {url=} (using {self.algorithm})")
for i, m in enumerate(to_enrich.media):
if len(hd := self.calculate_hash(m.filename)):
to_enrich.media[i].set("hash", f"{self.algorithm}:{hd}")
def calculate_hash(self, filename) -> str:
hash = None
if self.algorithm == "SHA-256":
hash = hashlib.sha256()
elif self.algorithm == "SHA3-512":
hash = hashlib.sha3_512()
else: return ""
with open(filename, "rb") as f:
while True:
buf = f.read(self.chunksize)
if not buf: break
hash.update(buf)
return hash.hexdigest()

View File

@@ -8,7 +8,7 @@
"retrying",
"tqdm",],
},
"no_setup_required": False,
"requires_setup": True,
"configs": {
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
@@ -25,5 +25,22 @@
"help": "if true, will remove empty values from the json output",
},
},
"description": "",
"description": """
Archives various types of Instagram content using the Instagrapi API.
### Features
- Connects to an Instagrapi API deployment to fetch Instagram profiles, posts, stories, highlights, reels, and tagged content.
- Supports advanced configuration options, including:
- Full profile download (all posts, stories, highlights, and tagged content).
- Limiting the number of posts to fetch for large profiles.
- Minimising JSON output to remove empty fields and redundant data.
- Provides robust error handling and retries for API calls.
- Ensures efficient media scraping, including handling nested or carousel media items.
- Adds downloaded media and metadata to the result for further processing.
### Notes
- Requires a valid Instagrapi API token (`access_token`) and API endpoint (`api_endpoint`).
- Full-profile downloads can be limited by setting `full_profile_max_posts`.
- Designed to fetch content in batches for large profiles, minimising API load.
""",
}

View File

@@ -45,25 +45,6 @@ class InstagramAPIArchiver(Archiver):
self.full_profile = bool(self.full_profile)
self.minimize_json_output = bool(self.minimize_json_output)
@staticmethod
def configs() -> dict:
return {
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
"full_profile": {
"default": False,
"help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information.",
},
"full_profile_max_posts": {
"default": 0,
"help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights",
},
"minimize_json_output": {
"default": True,
"help": "if true, will remove empty values from the json output",
},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

View File

@@ -3,10 +3,12 @@
"type": ["extractor"],
"entry_point": "instagram_archiver:InstagramArchiver",
"external_dependencies": {
"python": ["instaloader",
"loguru",],
"python": [
"instaloader",
"loguru",
],
},
"no_setup_required": False,
"requires_setup": True,
"configs": {
"username": {"default": None, "help": "a valid Instagram username"},
"password": {

View File

@@ -45,16 +45,7 @@ class InstagramArchiver(Archiver):
except Exception as e2:
logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
@staticmethod
def configs() -> dict:
return {
"username": {"default": None, "help": "a valid Instagram username"},
"password": {"default": None, "help": "the corresponding Instagram account password"},
"download_folder": {"default": "instaloader", "help": "name of a folder to temporarily download content to"},
"session_file": {"default": "secrets/instaloader.session", "help": "path to the instagram session which saves session credentials"},
#TODO: fine-grain
# "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

View File

@@ -34,15 +34,6 @@ class InstagramTbotArchiver(Archiver):
self.assert_valid_string("api_hash")
self.timeout = int(self.timeout)
@staticmethod
def configs() -> dict:
return {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"session_file": {"default": "secrets/anon-insta", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"timeout": {"default": 45, "help": "timeout to fetch the instagram content in seconds."},
}
def setup(self) -> None:
"""
1. makes a copy of session_file that is removed in cleanup

View File

@@ -0,0 +1,22 @@
{
"name": "Archive Metadata Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"description": """
Adds metadata information about the archive operations, Adds metadata about archive operations, including file sizes and archive duration./
To be included at the end of all enrichments.
### Features
- Calculates the total size of all archived media files, storing the result in human-readable and byte formats.
- Computes the duration of the archival process, storing the elapsed time in seconds.
- Ensures all enrichments are performed only if the `Metadata` object contains valid data.
- Adds detailed metadata to provide insights into file sizes and archival performance.
### Notes
- Skips enrichment if no media or metadata is available in the `Metadata` object.
- File sizes are calculated using the `os.stat` module, ensuring accurate byte-level reporting.
""",
}

View File

@@ -0,0 +1,55 @@
import datetime
import os
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Metadata
class MetaEnricher(Enricher):
"""
Adds metadata information about the archive operations, to be included at the end of all enrichments
"""
name = "meta_enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
if to_enrich.is_empty():
logger.debug(f"[SKIP] META_ENRICHER there is no media or metadata to enrich: {url=}")
return
logger.debug(f"calculating archive metadata information for {url=}")
self.enrich_file_sizes(to_enrich)
self.enrich_archive_duration(to_enrich)
def enrich_file_sizes(self, to_enrich: Metadata):
logger.debug(f"calculating archive file sizes for url={to_enrich.get_url()} ({len(to_enrich.media)} media files)")
total_size = 0
for media in to_enrich.get_all_media():
file_stats = os.stat(media.filename)
media.set("bytes", file_stats.st_size)
media.set("size", self.human_readable_bytes(file_stats.st_size))
total_size += file_stats.st_size
to_enrich.set("total_bytes", total_size)
to_enrich.set("total_size", self.human_readable_bytes(total_size))
def human_readable_bytes(self, size: int) -> str:
# receives number of bytes and returns human readble size
for unit in ["bytes", "KB", "MB", "GB", "TB"]:
if size < 1024:
return f"{size:.1f} {unit}"
size /= 1024
def enrich_archive_duration(self, to_enrich):
logger.debug(f"calculating archive duration for url={to_enrich.get_url()} ")
archive_duration = datetime.datetime.now(datetime.timezone.utc) - to_enrich.get("_processed_at")
to_enrich.set("archive_duration_seconds", archive_duration.seconds)

View File

@@ -0,0 +1,22 @@
{
"name": "Media Metadata Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
"bin": ["exiftool"]
},
"description": """
Extracts metadata information from files using ExifTool.
### Features
- Uses ExifTool to extract detailed metadata from media files.
- Processes file-specific data like camera settings, geolocation, timestamps, and other embedded metadata.
- Adds extracted metadata to the corresponding `Media` object within the `Metadata`.
### Notes
- Requires ExifTool to be installed and accessible via the system's PATH.
- Skips enrichment for files where metadata extraction fails.
"""
}

View File

@@ -0,0 +1,44 @@
import subprocess
import traceback
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Metadata
class MetadataEnricher(Enricher):
"""
Extracts metadata information from files using exiftool.
"""
name = "metadata_enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"extracting EXIF metadata for {url=}")
for i, m in enumerate(to_enrich.media):
if len(md := self.get_metadata(m.filename)):
to_enrich.media[i].set("metadata", md)
def get_metadata(self, filename: str) -> dict:
try:
# Run ExifTool command to extract metadata from the file
cmd = ['exiftool', filename]
result = subprocess.run(cmd, capture_output=True, text=True)
# Process the output to extract individual metadata fields
metadata = {}
for line in result.stdout.splitlines():
field, value = line.strip().split(':', 1)
metadata[field.strip()] = value.strip()
return metadata
except FileNotFoundError:
logger.error("[exif_enricher] ExifTool not found. Make sure ExifTool is installed and added to PATH.")
except Exception as e:
logger.error(f"Error occurred: {e}: {traceback.format_exc()}")
return {}

View File

@@ -0,0 +1,21 @@
{
"name": "PDQ Hash Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru", "pdqhash", "numpy", "Pillow"],
},
"description": """
PDQ Hash Enricher for generating perceptual hashes of media files.
### Features
- Calculates perceptual hashes for image files using the PDQ hashing algorithm.
- Enables detection of duplicate or near-duplicate visual content.
- Processes images stored in `Metadata` objects, adding computed hashes to the corresponding `Media` entries.
- Skips non-image media or files unsuitable for hashing (e.g., corrupted or unsupported formats).
### Notes
- Best used after enrichers like `thumbnail_enricher` or `screenshot_enricher` to ensure images are available.
- Uses the `pdqhash` library to compute 256-bit perceptual hashes, which are stored as hexadecimal strings.
"""
}

View File

@@ -0,0 +1,60 @@
"""
PDQ Hash Enricher for generating perceptual hashes of media files.
The `PdqHashEnricher` processes media files (e.g., images) in `Metadata`
objects and calculates perceptual hashes using the PDQ hashing algorithm.
These hashes are designed specifically for images and can be used
for detecting duplicate or near-duplicate visual content.
This enricher is typically used after thumbnail or screenshot enrichers
to ensure images are available for hashing.
"""
import traceback
import pdqhash
import numpy as np
from PIL import Image, UnidentifiedImageError
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Metadata
class PdqHashEnricher(Enricher):
"""
Calculates perceptual hashes for Media instances using PDQ, allowing for (near-)duplicate detection.
Ideally this enrichment is orchestrated to run after the thumbnail_enricher.
"""
name = "pdq_hash_enricher"
def __init__(self, config: dict) -> None:
# Without this STEP.__init__ is not called
super().__init__(config)
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"calculating perceptual hashes for {url=}")
media_with_hashes = []
for m in to_enrich.media:
for media in m.all_inner_media(True):
media_id = media.get("id", "")
if media.is_image() and "screenshot" not in media_id and "warc-file-" not in media_id and len(hd := self.calculate_pdq_hash(media.filename)):
media.set("pdq_hash", hd)
media_with_hashes.append(media.filename)
logger.debug(f"calculated '{len(media_with_hashes)}' perceptual hashes for {url=}: {media_with_hashes}")
def calculate_pdq_hash(self, filename):
# returns a hexadecimal string with the perceptual hash for the given filename
try:
with Image.open(filename) as img:
# convert the image to RGB
image_rgb = np.array(img.convert("RGB"))
# compute the 256-bit PDQ hash (we do not store the quality score)
hash_array, _ = pdqhash.compute(image_rgb)
hash = "".join(str(b) for b in hash_array)
return hex(int(hash, 2))[2:]
except UnidentifiedImageError as e:
logger.error(f"Image {filename=} is likely corrupted or in unsupported format {e}: {traceback.format_exc()}")
return ""

View File

@@ -0,0 +1,30 @@
{
"name": "Screenshot Enricher",
"type": ["enricher"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "selenium"],
"bin": ["chromedriver"]
},
"configs": {
"width": {"default": 1280, "help": "width of the screenshots"},
"height": {"default": 720, "help": "height of the screenshots"},
"timeout": {"default": 60, "help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
"http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
"save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {}, "help": "options to pass to the pdf printer"}
},
"description": """
Captures screenshots and optionally saves web pages as PDFs using a WebDriver.
### Features
- Takes screenshots of web pages, with configurable width, height, and timeout settings.
- Optionally saves pages as PDFs, with additional configuration for PDF printing options.
- Bypasses URLs detected as authentication walls.
- Integrates seamlessly with the metadata enrichment pipeline, adding screenshots and PDFs as media.
### Notes
- Requires a WebDriver (e.g., ChromeDriver) installed and accessible via the system's PATH.
"""
}

View File

@@ -0,0 +1,57 @@
from loguru import logger
import time, os
import base64
from selenium.common.exceptions import TimeoutException
from auto_archiver.enrichers import Enricher
from auto_archiver.utils import Webdriver, UrlUtil, random_str
from auto_archiver.core import Media, Metadata, ArchivingContext
class ScreenshotEnricher(Enricher):
name = "screenshot_enricher"
def __init__(self, config: dict) -> None:
super().__init__(config)
# TODO?
# @staticmethod
# def configs() -> dict:
# return {
# "width": {"default": 1280, "help": "width of the screenshots"},
# "height": {"default": 720, "help": "height of the screenshots"},
# "timeout": {"default": 60, "help": "timeout for taking the screenshot"},
# "sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
# "http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
# "save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
# "print_options": {"default": {}, "help": "options to pass to the pdf printer"}
# }
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
if UrlUtil.is_auth_wall(url):
logger.debug(f"[SKIP] SCREENSHOT since url is behind AUTH WALL: {url=}")
return
logger.debug(f"Enriching screenshot for {url=}")
with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url, http_proxy=self.http_proxy, print_options=self.print_options) as driver:
try:
driver.get(url)
time.sleep(int(self.sleep_before_screenshot))
screenshot_file = os.path.join(ArchivingContext.get_tmp_dir(), f"screenshot_{random_str(8)}.png")
driver.save_screenshot(screenshot_file)
to_enrich.add_media(Media(filename=screenshot_file), id="screenshot")
if self.save_to_pdf:
pdf_file = os.path.join(ArchivingContext.get_tmp_dir(), f"pdf_{random_str(8)}.pdf")
pdf = driver.print_page(driver.print_options)
with open(pdf_file, "wb") as f:
f.write(base64.b64decode(pdf))
to_enrich.add_media(Media(filename=pdf_file), id="pdf")
except TimeoutException:
logger.info("TimeoutException loading page for screenshot")
except Exception as e:
logger.error(f"Got error while loading webdriver for screenshot enricher: {e}")

View File

@@ -0,0 +1,22 @@
{
"name": "SSL Certificate Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru", "python-slugify"],
},
"configs": {
"skip_when_nothing_archived": {"default": True, "help": "if true, will skip enriching when no media is archived"},
},
"description": """
Retrieves SSL certificate information for a domain and stores it as a file.
### Features
- Fetches SSL certificates for domains using the HTTPS protocol.
- Stores certificates in PEM format and adds them as media to the metadata.
- Skips enrichment if no media has been archived, based on the `skip_when_nothing_archived` configuration.
### Notes
- Requires the target URL to use the HTTPS scheme; other schemes are not supported.
"""
}

View File

@@ -0,0 +1,33 @@
import ssl, os
from slugify import slugify
from urllib.parse import urlparse
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Metadata, ArchivingContext, Media
class SSLEnricher(Enricher):
"""
Retrieves SSL certificate information for a domain, as a file
"""
name = "ssl_enricher"
def __init__(self, config: dict) -> None:
super().__init__(config)
self.skip_when_nothing_archived = bool(self.skip_when_nothing_archived)
def enrich(self, to_enrich: Metadata) -> None:
if not to_enrich.media and self.skip_when_nothing_archived: return
url = to_enrich.get_url()
parsed = urlparse(url)
assert parsed.scheme in ["https"], f"Invalid URL scheme {url=}"
domain = parsed.netloc
logger.debug(f"fetching SSL certificate for {domain=} in {url=}")
cert = ssl.get_server_certificate((domain, 443))
cert_fn = os.path.join(ArchivingContext.get_tmp_dir(), f"{slugify(domain)}.pem")
with open(cert_fn, "w") as f: f.write(cert)
to_enrich.add_media(Media(filename=cert_fn), id="ssl_certificate")

View File

@@ -16,9 +16,6 @@ class TelegramArchiver(Archiver):
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

View File

@@ -21,7 +21,7 @@
"default": {},
"help": "(JSON string) private channel invite links (format: t.me/joinchat/HASH OR t.me/+HASH) and (optional but important to avoid hanging for minutes on startup) channel id (format: CHANNEL_ID taken from a post url like https://t.me/c/CHANNEL_ID/1), the telegram account will join any new channels on setup",
# TODO
#"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
}
},
"description": """

View File

@@ -23,20 +23,6 @@ class TelethonArchiver(Archiver):
self.assert_valid_string("api_id")
self.assert_valid_string("api_hash")
@staticmethod
def configs() -> dict:
return {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"bot_token": {"default": None, "help": "optional, but allows access to more content such as large videos, talk to @botfather"},
"session_file": {"default": "secrets/anon", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"join_channels": {"default": True, "help": "disables the initial setup with channel_invites config, useful if you have a lot and get stuck"},
"channel_invites": {
"default": {},
"help": "(JSON string) private channel invite links (format: t.me/joinchat/HASH OR t.me/+HASH) and (optional but important to avoid hanging for minutes on startup) channel id (format: CHANNEL_ID taken from a post url like https://t.me/c/CHANNEL_ID/1), the telegram account will join any new channels on setup",
"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
}
}
def setup(self) -> None:
"""

View File

@@ -0,0 +1,27 @@
{
"name": "Thumbnail Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru", "ffmpeg-python"],
"bin": ["ffmpeg"]
},
"configs": {
"thumbnails_per_minute": {"default": 60, "help": "how many thumbnails to generate per minute of video, can be limited by max_thumbnails"},
"max_thumbnails": {"default": 16, "help": "limit the number of thumbnails to generate per video, 0 means no limit"},
},
"description": """
Generates thumbnails for video files to provide visual previews.
### Features
- Processes video files and generates evenly distributed thumbnails.
- Calculates the number of thumbnails based on video duration, `thumbnails_per_minute`, and `max_thumbnails`.
- Distributes thumbnails equally across the video's duration and stores them as media objects.
- Adds metadata for each thumbnail, including timestamps and IDs.
### Notes
- Requires `ffmpeg` to be installed and accessible via the system's PATH.
- Handles videos without pre-existing duration metadata by probing with `ffmpeg`.
- Skips enrichment for non-video media files.
"""
}

View File

@@ -0,0 +1,69 @@
"""Thumbnail Enricher for generating visual previews of video files.
The `ThumbnailEnricher` processes video files in `Metadata` objects and
creates evenly distributed thumbnail images. These thumbnails provide
visual snapshots of the video's keyframes, helping users preview content
and identify important moments without watching the entire video.
"""
import ffmpeg, os
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Media, Metadata, ArchivingContext
from auto_archiver.utils.misc import random_str
class ThumbnailEnricher(Enricher):
"""
Generates thumbnails for all the media
"""
name = "thumbnail_enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.thumbnails_per_second = int(self.thumbnails_per_minute) / 60
self.max_thumbnails = int(self.max_thumbnails)
def enrich(self, to_enrich: Metadata) -> None:
"""
Uses or reads the video duration to generate thumbnails
Calculates how many thumbnails to generate and at which timestamps based on the video duration, the number of thumbnails per minute and the max number of thumbnails.
Thumbnails are equally distributed across the video duration.
"""
logger.debug(f"generating thumbnails for {to_enrich.get_url()}")
for m_id, m in enumerate(to_enrich.media[::]):
if m.is_video():
folder = os.path.join(ArchivingContext.get_tmp_dir(), random_str(24))
os.makedirs(folder, exist_ok=True)
logger.debug(f"generating thumbnails for {m.filename}")
duration = m.get("duration")
if duration is None:
try:
probe = ffmpeg.probe(m.filename)
duration = float(next(stream for stream in probe['streams'] if stream['codec_type'] == 'video')['duration'])
to_enrich.media[m_id].set("duration", duration)
except Exception as e:
logger.error(f"error getting duration of video {m.filename}: {e}")
return
num_thumbs = int(min(max(1, duration * self.thumbnails_per_second), self.max_thumbnails))
timestamps = [duration / (num_thumbs + 1) * i for i in range(1, num_thumbs + 1)]
thumbnails_media = []
for index, timestamp in enumerate(timestamps):
output_path = os.path.join(folder, f"out{index}.jpg")
ffmpeg.input(m.filename, ss=timestamp).filter('scale', 512, -1).output(output_path, vframes=1, loglevel="quiet").run()
try:
thumbnails_media.append(Media(
filename=output_path)
.set("id", f"thumbnail_{index}")
.set("timestamp", "%.3fs" % timestamp)
)
except Exception as e:
logger.error(f"error creating thumbnail {index} for media: {e}")
to_enrich.media[m_id].set("thumbnails", thumbnails_media)

View File

@@ -0,0 +1,40 @@
{
"name": "Timestamping Enricher",
"type": ["enricher"],
"requires_setup": True,
"external_dependencies": {
"python": [
"loguru",
"slugify",
"tsp_client",
"asn1crypto",
"certvalidator",
"certifi"
],
},
"configs": {
"tsa_urls": {
"default": [
"http://timestamp.digicert.com",
"http://timestamp.identrust.com",
"http://timestamp.globalsign.com/tsa/r6advanced1",
"http://tss.accv.es:8318/tsa"
],
"help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.",
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
}
},
"description": """
Generates RFC3161-compliant timestamp tokens using Time Stamp Authorities (TSA) for archived files.
### Features
- Creates timestamp tokens to prove the existence of files at a specific time, useful for legal and authenticity purposes.
- Aggregates file hashes into a text file and timestamps the concatenated data.
- Uses multiple Time Stamp Authorities (TSAs) to ensure reliability and redundancy.
- Validates timestamping certificates against trusted Certificate Authorities (CAs) using the `certifi` trust store.
### Notes
- Should be run after the `hash_enricher` to ensure file hashes are available.
- Requires internet access to interact with the configured TSAs.
"""
}

View File

@@ -0,0 +1,136 @@
import os
from loguru import logger
from tsp_client import TSPSigner, SigningSettings, TSPVerifier
from tsp_client.algorithms import DigestAlgorithm
from importlib.metadata import version
from asn1crypto.cms import ContentInfo
from certvalidator import CertificateValidator, ValidationContext
from asn1crypto import pem
import certifi
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Metadata, ArchivingContext, Media
from auto_archiver.archivers import Archiver
class TimestampingEnricher(Enricher):
"""
Uses several RFC3161 Time Stamp Authorities to generate a timestamp token that will be preserved. This can be used to prove that a certain file existed at a certain time, useful for legal purposes, for example, to prove that a certain file was not tampered with after a certain date.
The information that gets timestamped is concatenation (via paragraphs) of the file hashes existing in the current archive. It will depend on which archivers and enrichers ran before this one. Inner media files (like thumbnails) are not included in the .txt file. It should run AFTER the hash_enricher.
See https://gist.github.com/Manouchehri/fd754e402d98430243455713efada710 for list of timestamp authorities.
"""
name = "timestamping_enricher"
def __init__(self, config: dict) -> None:
super().__init__(config)
# @staticmethod
# def configs() -> dict:
# return {
# "tsa_urls": {
# "default": [
# # [Adobe Approved Trust List] and [Windows Cert Store]
# "http://timestamp.digicert.com",
# "http://timestamp.identrust.com",
# # "https://timestamp.entrust.net/TSS/RFC3161sha2TS", # not valid for timestamping
# # "https://timestamp.sectigo.com", # wait 15 seconds between each request.
#
# # [Adobe: European Union Trusted Lists].
# # "https://timestamp.sectigo.com/qualified", # wait 15 seconds between each request.
#
# # [Windows Cert Store]
# "http://timestamp.globalsign.com/tsa/r6advanced1",
#
# # [Adobe: European Union Trusted Lists] and [Windows Cert Store]
# # "http://ts.quovadisglobal.com/eu", # not valid for timestamping
# # "http://tsa.belgium.be/connect", # self-signed certificate in certificate chain
# # "https://timestamp.aped.gov.gr/qtss", # self-signed certificate in certificate chain
# # "http://tsa.sep.bg", # self-signed certificate in certificate chain
# # "http://tsa.izenpe.com", #unable to get local issuer certificate
# # "http://kstamp.keynectis.com/KSign", # unable to get local issuer certificate
# "http://tss.accv.es:8318/tsa",
# ],
# "help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.",
# "cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
# }
# }
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"RFC3161 timestamping existing files for {url=}")
# create a new text file with the existing media hashes
hashes = [m.get("hash").replace("SHA-256:", "").replace("SHA3-512:", "") for m in to_enrich.media if m.get("hash")]
if not len(hashes):
logger.warning(f"No hashes found in {url=}")
return
tmp_dir = ArchivingContext.get_tmp_dir()
hashes_fn = os.path.join(tmp_dir, "hashes.txt")
data_to_sign = "\n".join(hashes)
with open(hashes_fn, "w") as f:
f.write(data_to_sign)
hashes_media = Media(filename=hashes_fn)
timestamp_tokens = []
from slugify import slugify
for tsa_url in self.tsa_urls:
try:
signing_settings = SigningSettings(tsp_server=tsa_url, digest_algorithm=DigestAlgorithm.SHA256)
signer = TSPSigner()
message = bytes(data_to_sign, encoding='utf8')
# send TSQ and get TSR from the TSA server
signed = signer.sign(message=message, signing_settings=signing_settings)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs
TSPVerifier(certifi.where()).verify(signed, message=message)
# download and verify timestamping certificate
cert_chain = self.download_and_verify_certificate(signed)
# continue with saving the timestamp token
tst_fn = os.path.join(tmp_dir, f"timestamp_token_{slugify(tsa_url)}")
with open(tst_fn, "wb") as f: f.write(signed)
timestamp_tokens.append(Media(filename=tst_fn).set("tsa", tsa_url).set("cert_chain", cert_chain))
except Exception as e:
logger.warning(f"Error while timestamping {url=} with {tsa_url=}: {e}")
if len(timestamp_tokens):
hashes_media.set("timestamp_authority_files", timestamp_tokens)
hashes_media.set("certifi v", version("certifi"))
hashes_media.set("tsp_client v", version("tsp_client"))
hashes_media.set("certvalidator v", version("certvalidator"))
to_enrich.add_media(hashes_media, id="timestamped_hashes")
to_enrich.set("timestamped", True)
logger.success(f"{len(timestamp_tokens)} timestamp tokens created for {url=}")
else:
logger.warning(f"No successful timestamps for {url=}")
def download_and_verify_certificate(self, signed: bytes) -> list[Media]:
# returns the leaf certificate URL, fails if not set
tst = ContentInfo.load(signed)
trust_roots = []
with open(certifi.where(), 'rb') as f:
for _, _, der_bytes in pem.unarmor(f.read(), multiple=True):
trust_roots.append(der_bytes)
context = ValidationContext(trust_roots=trust_roots)
certificates = tst["content"]["certificates"]
first_cert = certificates[0].dump()
intermediate_certs = []
for i in range(1, len(certificates)): # cannot use list comprehension [1:]
intermediate_certs.append(certificates[i].dump())
validator = CertificateValidator(first_cert, intermediate_certs=intermediate_certs, validation_context=context)
path = validator.validate_usage({'digital_signature'}, extended_key_usage={'time_stamping'})
cert_chain = []
for cert in path:
cert_fn = os.path.join(ArchivingContext.get_tmp_dir(), f"{str(cert.serial_number)[:20]}.crt")
with open(cert_fn, "wb") as f:
f.write(cert.dump())
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.native["common_name"]))
return cert_chain

View File

@@ -12,7 +12,8 @@
},
"configs": {
"bearer_token": {"default": None, "help": "[deprecated: see bearer_tokens] twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret"},
"bearer_tokens": {"default": [], "help": " a list of twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret, if provided you can still add those for better rate limits. CSV of bearer tokens if provided via the command line"},
"bearer_tokens": {"default": [], "help": " a list of twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret, if provided you can still add those for better rate limits. CSV of bearer tokens if provided via the command line",
"cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))},
"consumer_key": {"default": None, "help": "twitter API consumer_key"},
"consumer_secret": {"default": None, "help": "twitter API consumer_secret"},
"access_token": {"default": None, "help": "twitter API access_token"},

View File

@@ -34,17 +34,6 @@ class TwitterApiArchiver(Archiver):
access_token=self.access_token, access_secret=self.access_secret))
assert self.api_client is not None, "Missing Twitter API configurations, please provide either AND/OR (consumer_key, consumer_secret, access_token, access_secret) to use this archiver, you can provide both for better rate-limit results."
@staticmethod
def configs() -> dict:
return {
"bearer_token": {"default": None, "help": "[deprecated: see bearer_tokens] twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret"},
"bearer_tokens": {"default": [], "help": " a list of twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret, if provided you can still add those for better rate limits. CSV of bearer tokens if provided via the command line", "cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))},
"consumer_key": {"default": None, "help": "twitter API consumer_key"},
"consumer_secret": {"default": None, "help": "twitter API consumer_secret"},
"access_token": {"default": None, "help": "twitter API access_token"},
"access_secret": {"default": None, "help": "twitter API access_secret"},
}
@property # getter .mimetype
def api_client(self) -> str:
return self.apis[self.api_index]

View File

@@ -19,14 +19,6 @@ class VkArchiver(Archiver):
self.assert_valid_string("password")
self.vks = VkScraper(self.username, self.password, session_file=self.session_file)
@staticmethod
def configs() -> dict:
return {
"username": {"default": None, "help": "valid VKontakte username"},
"password": {"default": None, "help": "valid VKontakte password"},
"session_file": {"default": "secrets/vk_config.v2.json", "help": "valid VKontakte password"},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

View File

@@ -0,0 +1,39 @@
{
"name": "WACZ Enricher",
"type": ["enricher", "archiver"],
"requires_setup": True,
"external_dependencies": {
"python": [
"loguru",
"jsonlines",
"warcio"
],
# TODO?
"bin": [
"docker"
]
},
"configs": {
"profile": {"default": None, "help": "browsertrix-profile (for profile generation see https://github.com/webrecorder/browsertrix-crawler#creating-and-using-browser-profiles)."},
"docker_commands": {"default": None, "help":"if a custom docker invocation is needed"},
"timeout": {"default": 120, "help": "timeout for WACZ generation in seconds"},
"extract_media": {"default": False, "help": "If enabled all the images/videos/audio present in the WACZ archive will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."},
"extract_screenshot": {"default": True, "help": "If enabled the screenshot captured by browsertrix will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."},
"socks_proxy_host": {"default": None, "help": "SOCKS proxy host for browsertrix-crawler, use in combination with socks_proxy_port. eg: user:password@host"},
"socks_proxy_port": {"default": None, "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"},
"proxy_server": {"default": None, "help": "SOCKS server proxy URL, in development"},
},
"description": """
Creates .WACZ archives of web pages using the `browsertrix-crawler` tool, with options for media extraction and screenshot saving.
### Features
- Archives web pages into .WACZ format using Docker or direct invocation of `browsertrix-crawler`.
- Supports custom profiles for archiving private or dynamic content.
- Extracts media (images, videos, audio) and screenshots from the archive, optionally adding them to the enrichment pipeline.
- Generates metadata from the archived page's content and structure (e.g., titles, text).
### Notes
- Requires Docker for running `browsertrix-crawler` unless explicitly disabled.
- Configurable via parameters for timeout, media extraction, screenshots, and proxy settings.
"""
}

View File

@@ -0,0 +1,228 @@
import jsonlines
import mimetypes
import os, shutil, subprocess
from zipfile import ZipFile
from loguru import logger
from warcio.archiveiterator import ArchiveIterator
from auto_archiver.core import Media, Metadata, ArchivingContext
from auto_archiver.enrichers import Enricher
from auto_archiver.archivers import Archiver
from auto_archiver.utils import UrlUtil, random_str
class WaczArchiverEnricher(Enricher, Archiver):
"""
Uses https://github.com/webrecorder/browsertrix-crawler to generate a .WACZ archive of the URL
If used with [profiles](https://github.com/webrecorder/browsertrix-crawler#creating-and-using-browser-profiles)
it can become quite powerful for archiving private content.
When used as an archiver it will extract the media from the .WACZ archive so it can be enriched.
"""
name = "wacz_archiver_enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def setup(self) -> None:
self.use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER')
self.docker_in_docker = os.environ.get('WACZ_ENABLE_DOCKER') and os.environ.get('RUNNING_IN_DOCKER')
self.cwd_dind = f"/crawls/crawls{random_str(8)}"
self.browsertrix_home_host = os.environ.get('BROWSERTRIX_HOME_HOST')
self.browsertrix_home_container = os.environ.get('BROWSERTRIX_HOME_CONTAINER') or self.browsertrix_home_host
# create crawls folder if not exists, so it can be safely removed in cleanup
if self.docker_in_docker:
os.makedirs(self.cwd_dind, exist_ok=True)
def cleanup(self) -> None:
if self.docker_in_docker:
logger.debug(f"Removing {self.cwd_dind=}")
shutil.rmtree(self.cwd_dind, ignore_errors=True)
def download(self, item: Metadata) -> Metadata:
# this new Metadata object is required to avoid duplication
result = Metadata()
result.merge(item)
if self.enrich(result):
return result.success("wacz")
def enrich(self, to_enrich: Metadata) -> bool:
if to_enrich.get_media_by_id("browsertrix"):
logger.info(f"WACZ enricher had already been executed: {to_enrich.get_media_by_id('browsertrix')}")
return True
url = to_enrich.get_url()
collection = random_str(8)
browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(ArchivingContext.get_tmp_dir())
browsertrix_home_container = self.browsertrix_home_container or browsertrix_home_host
cmd = [
"crawl",
"--url", url,
"--scopeType", "page",
"--generateWACZ",
"--text", "to-pages",
"--screenshot", "fullPage",
"--collection", collection,
"--id", collection,
"--saveState", "never",
"--behaviors", "autoscroll,autoplay,autofetch,siteSpecific",
"--behaviorTimeout", str(self.timeout),
"--timeout", str(self.timeout),
"--blockAds" # TODO: test
]
if self.docker_in_docker:
cmd.extend(["--cwd", self.cwd_dind])
# call docker if explicitly enabled or we are running on the host (not in docker)
if self.use_docker:
logger.debug(f"generating WACZ in Docker for {url=}")
logger.debug(f"{browsertrix_home_host=} {browsertrix_home_container=}")
if self.docker_commands:
cmd = self.docker_commands + cmd
else:
cmd = ["docker", "run", "--rm", "-v", f"{browsertrix_home_host}:/crawls/", "webrecorder/browsertrix-crawler"] + cmd
if self.profile:
profile_fn = os.path.join(browsertrix_home_container, "profile.tar.gz")
logger.debug(f"copying {self.profile} to {profile_fn}")
shutil.copyfile(self.profile, profile_fn)
cmd.extend(["--profile", os.path.join("/crawls", "profile.tar.gz")])
else:
logger.debug(f"generating WACZ without Docker for {url=}")
if self.profile:
cmd.extend(["--profile", os.path.join("/app", str(self.profile))])
try:
logger.info(f"Running browsertrix-crawler: {' '.join(cmd)}")
my_env = os.environ.copy()
if self.proxy_server:
logger.debug("Using PROXY_SERVER proxy for browsertrix-crawler")
my_env["PROXY_SERVER"] = self.proxy_server
elif self.socks_proxy_host and self.socks_proxy_port:
logger.debug("Using SOCKS proxy for browsertrix-crawler")
my_env["SOCKS_HOST"] = self.socks_proxy_host
my_env["SOCKS_PORT"] = str(self.socks_proxy_port)
subprocess.run(cmd, check=True, env=my_env)
except Exception as e:
logger.error(f"WACZ generation failed: {e}")
return False
if self.docker_in_docker:
wacz_fn = os.path.join(self.cwd_dind, "collections", collection, f"{collection}.wacz")
elif self.use_docker:
wacz_fn = os.path.join(browsertrix_home_container, "collections", collection, f"{collection}.wacz")
else:
wacz_fn = os.path.join("collections", collection, f"{collection}.wacz")
if not os.path.exists(wacz_fn):
logger.warning(f"Unable to locate and upload WACZ {wacz_fn=}")
return False
to_enrich.add_media(Media(wacz_fn), "browsertrix")
if self.extract_media or self.extract_screenshot:
self.extract_media_from_wacz(to_enrich, wacz_fn)
if self.docker_in_docker:
jsonl_fn = os.path.join(self.cwd_dind, "collections", collection, "pages", "pages.jsonl")
elif self.use_docker:
jsonl_fn = os.path.join(browsertrix_home_container, "collections", collection, "pages", "pages.jsonl")
else:
jsonl_fn = os.path.join("collections", collection, "pages", "pages.jsonl")
if not os.path.exists(jsonl_fn):
logger.warning(f"Unable to locate and pages.jsonl {jsonl_fn=}")
else:
logger.info(f"Parsing pages.jsonl {jsonl_fn=}")
with jsonlines.open(jsonl_fn) as reader:
for obj in reader:
if 'title' in obj:
to_enrich.set_title(obj['title'])
if 'text' in obj:
to_enrich.set_content(obj['text'])
return True
def extract_media_from_wacz(self, to_enrich: Metadata, wacz_filename: str) -> None:
"""
Receives a .wacz archive, and extracts all relevant media from it, adding them to to_enrich.
"""
logger.info(f"WACZ extract_media or extract_screenshot flag is set, extracting media from {wacz_filename=}")
# unzipping the .wacz
tmp_dir = ArchivingContext.get_tmp_dir()
unzipped_dir = os.path.join(tmp_dir, "unzipped")
with ZipFile(wacz_filename, 'r') as z_obj:
z_obj.extractall(path=unzipped_dir)
# if warc is split into multiple gzip chunks, merge those
warc_dir = os.path.join(unzipped_dir, "archive")
warc_filename = os.path.join(tmp_dir, "merged.warc")
with open(warc_filename, 'wb') as outfile:
for filename in sorted(os.listdir(warc_dir)):
if filename.endswith('.gz'):
chunk_file = os.path.join(warc_dir, filename)
with open(chunk_file, 'rb') as infile:
shutil.copyfileobj(infile, outfile)
# get media out of .warc
counter = 0
seen_urls = set()
import json
with open(warc_filename, 'rb') as warc_stream:
for record in ArchiveIterator(warc_stream):
# only include fetched resources
if record.rec_type == "resource" and record.content_type == "image/png" and self.extract_screenshot: # screenshots
fn = os.path.join(tmp_dir, f"warc-file-{counter}.png")
with open(fn, "wb") as outf: outf.write(record.raw_stream.read())
m = Media(filename=fn)
to_enrich.add_media(m, "browsertrix-screenshot")
counter += 1
if not self.extract_media: continue
if record.rec_type != 'response': continue
record_url = record.rec_headers.get_header('WARC-Target-URI')
if not UrlUtil.is_relevant_url(record_url):
logger.debug(f"Skipping irrelevant URL {record_url} but it's still present in the WACZ.")
continue
if record_url in seen_urls:
logger.debug(f"Skipping already seen URL {record_url}.")
continue
# filter by media mimetypes
content_type = record.http_headers.get("Content-Type")
if not content_type: continue
if not any(x in content_type for x in ["video", "image", "audio"]): continue
# create local file and add media
ext = mimetypes.guess_extension(content_type)
warc_fn = f"warc-file-{counter}{ext}"
fn = os.path.join(tmp_dir, warc_fn)
record_url_best_qual = UrlUtil.twitter_best_quality_url(record_url)
with open(fn, "wb") as outf: outf.write(record.raw_stream.read())
m = Media(filename=fn)
m.set("src", record_url)
# if a link with better quality exists, try to download that
if record_url_best_qual != record_url:
try:
m.filename = self.download_from_url(record_url_best_qual, warc_fn)
m.set("src", record_url_best_qual)
m.set("src_alternative", record_url)
except Exception as e: logger.warning(f"Unable to download best quality URL for {record_url=} got error {e}, using original in WARC.")
# remove bad videos
if m.is_video() and not m.is_valid_video(): continue
to_enrich.add_media(m, warc_fn)
counter += 1
seen_urls.add(record_url)
logger.info(f"WACZ extract_media/extract_screenshot finished, found {counter} relevant media file(s)")

View File

@@ -0,0 +1,29 @@
{
"name": "Wayback Machine Enricher",
"type": ["enricher", "archiver"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"timeout": {"default": 15, "help": "seconds to wait for successful archive confirmation from wayback, if more than this passes the result contains the job_id so the status can later be checked manually."},
"if_not_archived_within": {"default": None, "help": "only tell wayback to archive if no archive is available before the number of seconds specified, use None to ignore this option. For more information: https://docs.google.com/document/d/1Nsv52MvSjbLb2PCpHlat0gkzw0EvtSgpKHu4mk0MnrA"},
"key": {"default": None, "help": "wayback API key. to get credentials visit https://archive.org/account/s3.php"},
"secret": {"default": None, "help": "wayback API secret. to get credentials visit https://archive.org/account/s3.php"},
"proxy_http": {"default": None, "help": "http proxy to use for wayback requests, eg http://proxy-user:password@proxy-ip:port"},
"proxy_https": {"default": None, "help": "https proxy to use for wayback requests, eg https://proxy-user:password@proxy-ip:port"},
},
"description": """
Submits the current URL to the Wayback Machine for archiving and returns either a job ID or the completed archive URL.
### Features
- Archives URLs using the Internet Archive's Wayback Machine API.
- Supports conditional archiving based on the existence of prior archives within a specified time range.
- Provides proxies for HTTP and HTTPS requests.
- Fetches and confirms the archive URL or provides a job ID for later status checks.
### Notes
- Requires a valid Wayback Machine API key and secret.
- Handles rate-limiting by Wayback Machine and retries status checks with exponential backoff.
"""
}

View File

@@ -0,0 +1,104 @@
import json
from loguru import logger
import time, requests
from auto_archiver.enrichers import Enricher
from auto_archiver.archivers import Archiver
from auto_archiver.utils import UrlUtil
from auto_archiver.core import Metadata
class WaybackArchiverEnricher(Enricher, Archiver):
"""
Submits the current URL to the webarchive and returns a job_id or completed archive.
The Wayback machine will rate-limit IP heavy usage.
"""
name = "wayback_archiver_enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
assert type(self.secret) == str and len(self.secret) > 0, "please provide a value for the wayback_enricher API key"
assert type(self.secret) == str and len(self.secret) > 0, "please provide a value for the wayback_enricher API secret"
def download(self, item: Metadata) -> Metadata:
# this new Metadata object is required to avoid duplication
result = Metadata()
result.merge(item)
if self.enrich(result):
return result.success("wayback")
def enrich(self, to_enrich: Metadata) -> bool:
proxies = {}
if self.proxy_http: proxies["http"] = self.proxy_http
if self.proxy_https: proxies["https"] = self.proxy_https
url = to_enrich.get_url()
if UrlUtil.is_auth_wall(url):
logger.debug(f"[SKIP] WAYBACK since url is behind AUTH WALL: {url=}")
return
logger.debug(f"calling wayback for {url=}")
if to_enrich.get("wayback"):
logger.info(f"Wayback enricher had already been executed: {to_enrich.get('wayback')}")
return True
ia_headers = {
"Accept": "application/json",
"Authorization": f"LOW {self.key}:{self.secret}"
}
post_data = {'url': url}
if self.if_not_archived_within:
post_data["if_not_archived_within"] = self.if_not_archived_within
# see https://docs.google.com/document/d/1Nsv52MvSjbLb2PCpHlat0gkzw0EvtSgpKHu4mk0MnrA for more options
r = requests.post('https://web.archive.org/save/', headers=ia_headers, data=post_data, proxies=proxies)
if r.status_code != 200:
logger.error(em := f"Internet archive failed with status of {r.status_code}: {r.json()}")
to_enrich.set("wayback", em)
return False
# check job status
try:
job_id = r.json().get('job_id')
if not job_id:
logger.error(f"Wayback failed with {r.json()}")
return False
except json.decoder.JSONDecodeError as e:
logger.error(f"Expected a JSON with job_id from Wayback and got {r.text}")
return False
# waits at most timeout seconds until job is completed, otherwise only enriches the job_id information
start_time = time.time()
wayback_url = False
attempt = 1
while not wayback_url and time.time() - start_time <= self.timeout:
try:
logger.debug(f"GETting status for {job_id=} on {url=} ({attempt=})")
r_status = requests.get(f'https://web.archive.org/save/status/{job_id}', headers=ia_headers, proxies=proxies)
r_json = r_status.json()
if r_status.status_code == 200 and r_json['status'] == 'success':
wayback_url = f"https://web.archive.org/web/{r_json['timestamp']}/{r_json['original_url']}"
elif r_status.status_code != 200 or r_json['status'] != 'pending':
logger.error(f"Wayback failed with {r_json}")
return False
except requests.exceptions.RequestException as e:
logger.warning(f"RequestException: fetching status for {url=} due to: {e}")
break
except json.decoder.JSONDecodeError as e:
logger.error(f"Expected a JSON from Wayback and got {r.text} for {url=}")
break
except Exception as e:
logger.warning(f"error fetching status for {url=} due to: {e}")
if not wayback_url:
attempt += 1
time.sleep(1) # TODO: can be improved with exponential backoff
if wayback_url:
to_enrich.set("wayback", wayback_url)
else:
to_enrich.set("wayback", {"job_id": job_id, "check_status": f'https://web.archive.org/save/status/{job_id}'})
to_enrich.set("check wayback", f"https://web.archive.org/web/*/{url}")
return True

View File

@@ -0,0 +1,30 @@
{
"name": "Whisper Enricher",
"type": ["enricher"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_endpoint": {"default": None, "help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},
"api_key": {"default": None, "help": "WhisperApi api key for authentication"},
"include_srt": {"default": False, "help": "Whether to include a subtitle SRT (SubRip Subtitle file) for the video (can be used in video players)."},
"timeout": {"default": 90, "help": "How many seconds to wait at most for a successful job completion."},
"action": {"default": "translate", "help": "which Whisper operation to execute", "choices": ["transcribe", "translate", "language_detection"]},
},
"description": """
Integrates with a Whisper API service to transcribe, translate, or detect the language of audio and video files.
### Features
- Submits audio or video files to a Whisper API deployment for processing.
- Supports operations such as transcription, translation, and language detection.
- Optionally generates SRT subtitle files for video content.
- Integrates with S3-compatible storage systems to make files publicly accessible for processing.
- Handles job submission, status checking, artifact retrieval, and cleanup.
### Notes
- Requires a Whisper API endpoint and API key for authentication.
- Only compatible with S3-compatible storage systems for media file accessibility.
- Handles multiple jobs and retries for failed or incomplete processing.
"""
}

View File

@@ -0,0 +1,124 @@
import traceback
import requests, time
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.storages import S3Storage
class WhisperEnricher(Enricher):
"""
Connects with a Whisper API service to get texts out of audio
whisper API repository: https://github.com/bellingcat/whisperbox-transcribe/
Only works if an S3 compatible storage is used
"""
name = "whisper_enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
assert type(self.api_endpoint) == str and len(self.api_endpoint) > 0, "please provide a value for the whisper_enricher api_endpoint"
assert type(self.api_key) == str and len(self.api_key) > 0, "please provide a value for the whisper_enricher api_key"
self.timeout = int(self.timeout)
def enrich(self, to_enrich: Metadata) -> None:
if not self._get_s3_storage():
logger.error("WhisperEnricher: To use the WhisperEnricher you need to use S3Storage so files are accessible publicly to the whisper service being called.")
return
url = to_enrich.get_url()
logger.debug(f"WHISPER[{self.action}]: iterating media items for {url=}.")
job_results = {}
for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio():
m.store(url=url, metadata=to_enrich)
try:
job_id = self.submit_job(m)
job_results[job_id] = False
logger.debug(f"JOB SUBMITTED: {job_id=} for {m.key=}")
to_enrich.media[i].set("whisper_model", {"job_id": job_id})
except Exception as e:
logger.error(f"Failed to submit whisper job for {m.filename=} with error {e}\n{traceback.format_exc()}")
job_results = self.check_jobs(job_results)
for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio():
job_id = to_enrich.media[i].get("whisper_model", {}).get("job_id")
if not job_id: continue
to_enrich.media[i].set("whisper_model", {
"job_id": job_id,
"job_status_check": f"{self.api_endpoint}/jobs/{job_id}",
"job_artifacts_check": f"{self.api_endpoint}/jobs/{job_id}/artifacts",
**(job_results[job_id] if job_results[job_id] else {"result": "incomplete or failed job"})
})
# append the extracted text to the content of the post so it gets written to the DBs like gsheets text column
if job_results[job_id]:
for k,v in job_results[job_id].items():
if "_text" in k and len(v):
to_enrich.set_content(f"\n[automatic video transcript]: {v}")
def submit_job(self, media: Media):
s3 = self._get_s3_storage()
s3_url = s3.get_cdn_url(media)
assert s3_url in media.urls, f"Could not find S3 url ({s3_url}) in list of stored media urls "
payload = {
"url": s3_url,
"type": self.action,
# "language": "string" # may be a config
}
logger.debug(f"calling API with {payload=}")
response = requests.post(f'{self.api_endpoint}/jobs', json=payload, headers={'Authorization': f'Bearer {self.api_key}'})
assert response.status_code == 201, f"calling the whisper api {self.api_endpoint} returned a non-success code: {response.status_code}"
logger.debug(response.json())
return response.json()['id']
def check_jobs(self, job_results: dict):
start_time = time.time()
all_completed = False
while not all_completed and (time.time() - start_time) <= self.timeout:
all_completed = True
for job_id in job_results:
if job_results[job_id] != False: continue
all_completed = False # at least one not ready
try: job_results[job_id] = self.check_job(job_id)
except Exception as e:
logger.error(f"Failed to check {job_id=} with error {e}\n{traceback.format_exc()}")
if not all_completed: time.sleep(3)
return job_results
def check_job(self, job_id):
r = requests.get(f'{self.api_endpoint}/jobs/{job_id}', headers={'Authorization': f'Bearer {self.api_key}'})
assert r.status_code == 200, f"Job status did not respond with 200, instead with: {r.status_code}"
j = r.json()
logger.debug(f"Checked job {job_id=} with status='{j['status']}'")
if j['status'] == "processing": return False
elif j['status'] == "error": return f"Error: {j['meta']['error']}"
elif j['status'] == "success":
r_res = requests.get(f'{self.api_endpoint}/jobs/{job_id}/artifacts', headers={'Authorization': f'Bearer {self.api_key}'})
assert r_res.status_code == 200, f"Job artifacts did not respond with 200, instead with: {r_res.status_code}"
logger.success(r_res.json())
result = {}
for art_id, artifact in enumerate(r_res.json()):
subtitle = []
full_text = []
for i, d in enumerate(artifact.get("data")):
subtitle.append(f"{i+1}\n{d.get('start')} --> {d.get('end')}\n{d.get('text').strip()}")
full_text.append(d.get('text').strip())
if not len(subtitle): continue
if self.include_srt: result[f"artifact_{art_id}_subtitle"] = "\n".join(subtitle)
result[f"artifact_{art_id}_text"] = "\n".join(full_text)
# call /delete endpoint on timely success
r_del = requests.delete(f'{self.api_endpoint}/jobs/{job_id}', headers={'Authorization': f'Bearer {self.api_key}'})
logger.debug(f"DELETE whisper {job_id=} result: {r_del.status_code}")
return result
return False
def _get_s3_storage(self) -> S3Storage:
try:
return next(s for s in ArchivingContext.get("storages") if s.__class__ == S3Storage)
except:
logger.warning("No S3Storage instance found in storages")
return