Merge branch 'main' into feat/yt-dlp-pots

# Conflicts:
#	src/auto_archiver/modules/generic_extractor/__manifest__.py
This commit is contained in:
erinhmclark
2025-03-07 16:54:01 +00:00
76 changed files with 56452 additions and 811 deletions

View File

@@ -1 +0,0 @@
from .atlos_db import AtlosDb

View File

@@ -1,38 +0,0 @@
{
"name": "Atlos Database",
"type": ["database"],
"entry_point": "atlos_db::AtlosDb",
"requires_setup": True,
"dependencies":
{"python": ["loguru",
""],
"bin": [""]},
"configs": {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"required": True,
"type": "str",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
Handles integration with the Atlos platform for managing archival results.
### Features
- Outputs archival results to the Atlos API for storage and tracking.
- Updates failure status with error details when archiving fails.
- Processes and formats metadata, including ISO formatting for datetime fields.
- Skips processing for items without an Atlos ID.
### Setup
Required configs:
- atlos_url: Base URL for the Atlos API.
- api_token: Authentication token for API access.
"""
,
}

View File

@@ -1,66 +0,0 @@
from typing import Union
import requests
from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Metadata
class AtlosDb(Database):
    """
    Database module that reports archiving outcomes to Atlos.
    """

    def failed(self, item: Metadata, reason: str) -> None:
        """Record a failed archiving attempt, including its reason, on Atlos."""
        atlos_id = item.metadata.get("atlos_id")
        if not atlos_id:
            # Without an Atlos ID there is no source material to update
            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
            return

        endpoint = f"{self.atlos_url}/api/v2/source_material/metadata/{atlos_id}/auto_archiver"
        payload = {"metadata": {"processed": True, "status": "error", "error": reason}}
        response = requests.post(
            endpoint,
            headers={"Authorization": f"Bearer {self.api_token}"},
            json=payload,
        )
        response.raise_for_status()
        logger.info(
            f"Stored failure for {item.get_url()} (ID {atlos_id}) on Atlos: {reason}"
        )

    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
        """Look up a previously archived item; this module never has one, so it always returns False."""
        return False

    def _process_metadata(self, item: Metadata) -> dict:
        """Return a copy of the item's metadata in which every value exposing
        `isoformat` (e.g. datetimes) is replaced by its ISO string."""
        processed = {}
        for key, value in item.metadata.items():
            if hasattr(value, "isoformat"):
                value = value.isoformat()
            processed[key] = value
        return processed

    def done(self, item: Metadata, cached: bool = False) -> None:
        """Record a successful archiving result, with the item's metadata, on Atlos."""
        atlos_id = item.metadata.get("atlos_id")
        if not atlos_id:
            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
            return

        endpoint = f"{self.atlos_url}/api/v2/source_material/metadata/{atlos_id}/auto_archiver"
        payload = {
            "metadata": {
                "processed": True,
                "status": "success",
                "results": self._process_metadata(item),
            }
        }
        response = requests.post(
            endpoint,
            headers={"Authorization": f"Bearer {self.api_token}"},
            json=payload,
        )
        response.raise_for_status()
        logger.info(
            f"Stored success for {item.get_url()} (ID {atlos_id}) on Atlos"
        )

View File

@@ -1 +0,0 @@
from .atlos_feeder import AtlosFeeder

View File

@@ -1,34 +0,0 @@
{
"name": "Atlos Feeder",
"type": ["feeder"],
"requires_setup": True,
"dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_token": {
"type": "str",
"required": True,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
AtlosFeeder: A feeder module that integrates with the Atlos API to fetch source material URLs for archival.
### Features
- Connects to the Atlos API to retrieve a list of source material URLs.
- Filters source materials based on visibility, processing status, and metadata.
- Converts filtered source materials into `Metadata` objects with the relevant `atlos_id` and URL.
- Iterates through paginated results using a cursor for efficient API interaction.
### Notes
- Requires an Atlos API endpoint and a valid API token for authentication.
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
- Handles pagination transparently when retrieving data from the Atlos API.
"""
}

View File

@@ -1,42 +0,0 @@
import requests
from loguru import logger
from auto_archiver.core import Feeder
from auto_archiver.core import Metadata
class AtlosFeeder(Feeder):
    """Feeder that yields the source material URLs of an Atlos instance that still need archiving."""

    def __iter__(self) -> Metadata:
        """Page through `/api/v2/source_material` and yield one `Metadata` per eligible item.

        An item is eligible when it has a non-empty `source_url`, is not yet
        marked as processed under `metadata.auto_archiver`, has visibility
        "visible" and a status other than "processing" or "pending". Each
        yielded `Metadata` carries the item's URL and its `atlos_id`.

        Raises:
            requests.HTTPError: if Atlos answers with an error status.
        """
        cursor = None
        while True:
            response = requests.get(
                f"{self.atlos_url}/api/v2/source_material",
                headers={"Authorization": f"Bearer {self.api_token}"},
                params={"cursor": cursor},
            )
            # Check the status before decoding: an error page is not guaranteed
            # to be JSON and would otherwise hide the real HTTP failure.
            response.raise_for_status()
            data = response.json()

            cursor = data["next"]
            results = data["results"]
            for item in results:
                if (
                    item["source_url"] not in [None, ""]
                    and not item["metadata"].get("auto_archiver", {}).get("processed", False)
                    and item["visibility"] == "visible"
                    and item["status"] not in ["processing", "pending"]
                ):
                    yield Metadata().set_url(item["source_url"]).set("atlos_id", item["id"])

            # Stop on an empty page or when Atlos reports no further cursor
            if not results or cursor is None:
                break

View File

@@ -0,0 +1 @@
from .atlos_feeder_db_storage import AtlosFeederDbStorage

View File

@@ -0,0 +1,46 @@
{
"name": "Atlos Feeder Database Storage",
"type": ["feeder", "database", "storage"],
"entry_point": "atlos_feeder_db_storage::AtlosFeederDbStorage",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_token": {
"type": "str",
"required": True,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
A module that integrates with the Atlos API to fetch source material URLs for archival, upload extracted media, and store archival results.
[Atlos](https://www.atlos.org/) is a visual investigation and archiving platform designed for investigative research, journalism, and open-source intelligence (OSINT).
It helps users organize, analyze, and store media from various sources, making it easier to track and investigate digital evidence.
To get started create a new project and obtain an API token from the settings page. You can group events into Atlos's 'incidents'.
Here you can add 'source material' by URL and the Atlos feeder will fetch these URLs for archival.
You can use Atlos only as a 'feeder', however you can also implement the 'database' and 'storage' features to store the media files in Atlos which is recommended.
The Auto Archiver will retain the Atlos ID for each item, ensuring that the media and database outputs are uploaded back into the relevant media item.
### Features
- Connects to the Atlos API to retrieve a list of source material URLs.
- Iterates through the URLs from all source material items which are unprocessed, visible, and ready to archive.
- If the storage option is selected, it will store the media files alongside the original source material item in Atlos.
- If the database option is selected, it will output the results to the media item, as well as updating failure status with error details when archiving fails.
- Skips storage/database upload for items without an Atlos ID, which means you must use the Atlos feeder so that items have an Atlos ID to store the results with.
### Notes
- Requires an Atlos account with a project and a valid API token for authentication.
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
- Fetches any media items within an Atlos project, regardless of separation into incidents.
"""
}

View File

@@ -0,0 +1,153 @@
import hashlib
import os
from typing import IO, Iterator, Optional, Union
import requests
from loguru import logger
from auto_archiver.core import Database, Feeder, Media, Metadata, Storage
from auto_archiver.utils import calculate_file_hash
class AtlosFeederDbStorage(Feeder, Database, Storage):
    """Atlos integration that acts as feeder, database and storage in one module.

    All HTTP traffic goes through the `requests.Session` created in `setup`
    and is authenticated with `self.api_token` against `self.atlos_url`.
    """

    def setup(self) -> requests.Session:
        """Create and return a persistent session."""
        self.session = requests.Session()
        # Return the session so the behaviour matches the annotation and docstring
        return self.session

    def _get(self, endpoint: str, params: Optional[dict] = None) -> dict:
        """Wrapper for GET requests to the Atlos API.

        Returns the decoded JSON body; raises `requests.HTTPError` on an error status.
        """
        url = f"{self.atlos_url}{endpoint}"
        response = self.session.get(
            url, headers={"Authorization": f"Bearer {self.api_token}"}, params=params
        )
        response.raise_for_status()
        return response.json()

    def _post(
        self,
        endpoint: str,
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        files: Optional[dict] = None,
    ) -> dict:
        """Wrapper for POST requests to the Atlos API.

        Returns the decoded JSON body; raises `requests.HTTPError` on an error status.
        """
        url = f"{self.atlos_url}{endpoint}"
        response = self.session.post(
            url,
            headers={"Authorization": f"Bearer {self.api_token}"},
            json=json,
            params=params,
            files=files,
        )
        response.raise_for_status()
        return response.json()

    # ! Atlos Module - Feeder Methods

    def __iter__(self) -> Iterator[Metadata]:
        """Iterate over unprocessed, visible source materials from Atlos.

        Follows the `next` cursor until a page is empty or no cursor is returned.
        """
        cursor = None
        while True:
            data = self._get("/api/v2/source_material", params={"cursor": cursor})
            cursor = data.get("next")
            results = data.get("results", [])
            for item in results:
                if (
                    item.get("source_url") not in [None, ""]
                    and not item.get("metadata", {}).get("auto_archiver", {}).get("processed", False)
                    and item.get("visibility") == "visible"
                    and item.get("status") not in ["processing", "pending"]
                ):
                    yield Metadata().set_url(item["source_url"]).set("atlos_id", item["id"])
            if not results or cursor is None:
                break

    # ! Atlos Module - Database Methods

    def failed(self, item: Metadata, reason: str) -> None:
        """Mark an item as failed in Atlos, if the ID exists."""
        atlos_id = item.metadata.get("atlos_id")
        if not atlos_id:
            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
            return
        self._post(
            f"/api/v2/source_material/metadata/{atlos_id}/auto_archiver",
            json={"metadata": {"processed": True, "status": "error", "error": reason}},
        )
        logger.info(f"Stored failure for {item.get_url()} (ID {atlos_id}) on Atlos: {reason}")

    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
        """check and fetch if the given item has been archived already, each
        database should handle its own caching, and configuration mechanisms"""
        return False

    def _process_metadata(self, item: Metadata) -> dict:
        """Process metadata for storage on Atlos. Will convert any datetime
        objects to ISO format."""
        return {
            k: v.isoformat() if hasattr(v, "isoformat") else v
            for k, v in item.metadata.items()
        }

    def done(self, item: Metadata, cached: bool = False) -> None:
        """Mark an item as successfully archived in Atlos."""
        atlos_id = item.metadata.get("atlos_id")
        if not atlos_id:
            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
            return
        self._post(
            f"/api/v2/source_material/metadata/{atlos_id}/auto_archiver",
            json={
                "metadata": {
                    "processed": True,
                    "status": "success",
                    "results": self._process_metadata(item),
                }
            },
        )
        logger.info(f"Stored success for {item.get_url()} (ID {atlos_id}) on Atlos")

    # ! Atlos Module - Storage Methods

    def get_cdn_url(self, _media: Media) -> str:
        """Return the base Atlos URL as the CDN URL."""
        return self.atlos_url

    def upload(self, media: Media, metadata: Optional[Metadata] = None, **_kwargs) -> bool:
        """Upload a media file to Atlos if it has not been uploaded already.

        Returns False when no metadata or no Atlos ID is available, and True
        when the file is already present on Atlos (matched by SHA-256) or was
        uploaded by this call.
        """
        if metadata is None:
            logger.error(f"No metadata provided for {media.filename}")
            return False

        atlos_id = metadata.get("atlos_id")
        if not atlos_id:
            logger.error(f"No Atlos ID found in metadata; can't store {media.filename} in Atlos.")
            return False

        media_hash = calculate_file_hash(media.filename, hash_algo=hashlib.sha256, chunksize=4096)

        # Check whether the media has already been uploaded
        source_material = self._get(f"/api/v2/source_material/{atlos_id}")["result"]
        existing_media = [
            artifact.get("file_hash_sha256")
            for artifact in source_material.get("artifacts", [])
        ]
        if media_hash in existing_media:
            logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
            return True

        # Upload the media to the Atlos API
        # NOTE(review): the request sends `media.properties` as the title while the
        # log line below reports `media.key` - confirm which one Atlos should receive.
        with open(media.filename, "rb") as file_obj:
            self._post(
                f"/api/v2/source_material/upload/{atlos_id}",
                params={"title": media.properties},
                files={"file": (os.path.basename(media.filename), file_obj)},
            )
        logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
        return True

    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
        """Upload a file-like object; not implemented."""
        pass

View File

@@ -1 +0,0 @@
from .atlos_storage import AtlosStorage

View File

@@ -1,32 +0,0 @@
{
"name": "Atlos Storage",
"type": ["storage"],
"requires_setup": True,
"dependencies": {
"python": ["loguru", "boto3"],
"bin": []
},
"description": """
Stores media files in a [Atlos](https://www.atlos.org/).
### Features
- Saves media files to Atlos, organizing them into folders based on the provided path structure.
### Notes
- Requires setup with Atlos credentials.
- Files are uploaded to the specified `root_folder_id` and organized by the `media.key` structure.
""",
"configs": {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"required": True,
"type": "str"
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
}
}

View File

@@ -1,66 +0,0 @@
import hashlib
import os
from typing import IO, Optional
import requests
from loguru import logger
from auto_archiver.core import Media, Metadata
from auto_archiver.core import Storage
class AtlosStorage(Storage):
    """Storage module that uploads media files to the matching Atlos source material."""

    def get_cdn_url(self, _media: Media) -> str:
        """Return the base Atlos URL."""
        # It's not always possible to provide an exact URL, because it's
        # possible that the media once uploaded could have been copied to
        # another project.
        return self.atlos_url

    def _hash(self, media: Media) -> str:
        """Return the SHA-256 hex digest of the media file, read in 4096-byte chunks."""
        # Hash the media file using sha-256. We don't use the existing auto archiver
        # hash because there's no guarantee that the configurer is using sha-256, which
        # is how Atlos hashes files.
        sha256 = hashlib.sha256()
        with open(media.filename, "rb") as f:
            for buf in iter(lambda: f.read(4096), b""):
                sha256.update(buf)
        return sha256.hexdigest()

    def upload(self, media: Media, metadata: Optional[Metadata] = None, **_kwargs) -> bool:
        """Upload `media` to Atlos unless a file with the same SHA-256 is already attached.

        Returns False when no metadata or no Atlos ID is available; True when
        the file already exists on Atlos or was uploaded by this call.

        Raises:
            requests.HTTPError: if Atlos answers either request with an error status.
        """
        # `metadata` defaults to None, which has no `.get`
        atlos_id = metadata.get("atlos_id") if metadata is not None else None
        if atlos_id is None:
            logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
            return False

        media_hash = self._hash(media)

        # Check whether the media has already been uploaded
        lookup = requests.get(
            f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
            headers={"Authorization": f"Bearer {self.api_token}"},
        )
        # Surface HTTP errors directly instead of failing later on a missing "result" key
        lookup.raise_for_status()
        source_material = lookup.json()["result"]
        existing_media = [x.get("file_hash_sha256") for x in source_material.get("artifacts", [])]
        if media_hash in existing_media:
            logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
            return True

        # Upload the media to the Atlos API; the context manager guarantees the
        # file handle is closed even if the request fails
        with open(media.filename, "rb") as file_obj:
            requests.post(
                f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
                headers={"Authorization": f"Bearer {self.api_token}"},
                params={
                    "title": media.properties
                },
                files={"file": (os.path.basename(media.filename), file_obj)},
            ).raise_for_status()

        logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
        return True

    # must be implemented even if unused
    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass

View File

@@ -0,0 +1,23 @@
{
    'name': 'Command Line Feeder',
    'type': ['feeder'],
    'entry_point': 'cli_feeder::CLIFeeder',
    'requires_setup': False,
    'configs': {
        'urls': {
            'default': None,
            'help': 'URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml',
        },
    },
    # Only one 'description' key is kept: a dict literal silently discards an
    # earlier duplicate, so the former one-line summary never took effect.
    'description': """
The Command Line Feeder is the default enabled feeder for the Auto Archiver. It allows you to pass URLs directly to the orchestrator from the command line
without the need to specify any additional configuration or command line arguments:
`auto-archiver --feeder cli_feeder -- "https://example.com/1/,https://example.com/2/"`
You can pass multiple URLs by separating them with a space. The URLs will be processed in the order they are provided.
`auto-archiver --feeder cli_feeder -- https://example.com/1/ https://example.com/2/`
""",
}

View File

@@ -0,0 +1,21 @@
from loguru import logger
from auto_archiver.core.feeder import Feeder
from auto_archiver.core.metadata import Metadata
class CLIFeeder(Feeder):
    """Feeder that emits the URLs found under the `urls` config entry."""

    def setup(self) -> None:
        """Store the configured URLs and refuse to start when there are none."""
        self.urls = self.config['urls']
        if self.urls:
            return
        raise ValueError("No URLs provided. Please provide at least one URL via the command line, or set up an alternative feeder. Use --help for more information.")

    def __iter__(self) -> Metadata:
        """Yield one `Metadata` per configured URL, each tagged with the "cli" folder context."""
        cli_urls = self.config['urls']
        for cli_url in cli_urls:
            logger.debug(f"Processing {cli_url}")
            entry = Metadata().set_url(cli_url)
            entry.set_context("folder", "cli")
            yield entry

        # Only reached once the consumer has exhausted the generator
        logger.success(f"Processed {len(cli_urls)} URL(s)")

View File

@@ -10,7 +10,7 @@ class ConsoleDb(Database):
"""
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
logger.info(f"STARTED {item}")
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}: {reason}")

View File

@@ -6,7 +6,7 @@
},
'entry_point': 'csv_db::CSVDb',
"configs": {
"csv_file": {"default": "db.csv", "help": "CSV file name"}
"csv_file": {"default": "db.csv", "help": "CSV file name to save metadata to"},
},
"description": """
Handles exporting archival results to a CSV file.

View File

@@ -28,6 +28,13 @@ the broader archiving framework.
metadata objects. Some dropins are included in this generic_archiver by default, but
custom dropins can be created to handle additional websites and passed to the archiver
via the command line using the `--dropins` option (TODO!).
### Auto-Updates
The Generic Extractor will also automatically check for updates to `yt-dlp` (every 5 days by default).
This can be configured using the `ytdlp_update_interval` setting (or disabled by setting it to -1).
If you are having issues with the extractor, you can review the version of `yt-dlp` being used with `yt-dlp --version`.
""",
"configs": {
"subtitles": {"default": True, "help": "download subtitles if available", "type": "bool"},
@@ -69,5 +76,10 @@ via the command line using the `--dropins` option (TODO!).
"help": "Additional arguments to pass to the yt-dlp extractor. See https://github.com/yt-dlp/yt-dlp/blob/master/README.md#extractor-arguments.",
"type": "json_loader",
},
"ytdlp_update_interval": {
"default": 5,
"help": "How often to check for yt-dlp updates (days). If positive, will check and update yt-dlp every [num] days. Set it to -1 to disable, or 0 to always update on every run.",
"type": "int",
},
},
}

View File

@@ -1,7 +1,11 @@
import datetime, os, yt_dlp, pysubs2
import datetime, os
import importlib
import subprocess
from typing import Generator, Type
import yt_dlp
from yt_dlp.extractor.common import InfoExtractor
import pysubs2
from loguru import logger
@@ -11,6 +15,44 @@ from auto_archiver.core import Metadata, Media
class GenericExtractor(Extractor):
_dropins = {}
def setup(self):
# check for file .ytdlp-update in the secrets folder
if self.ytdlp_update_interval < 0:
return
use_secrets = os.path.exists('secrets')
path = os.path.join('secrets' if use_secrets else '', '.ytdlp-update')
next_update_check = None
if os.path.exists(path):
with open(path, "r") as f:
next_update_check = datetime.datetime.fromisoformat(f.read())
if not next_update_check or next_update_check < datetime.datetime.now():
self.update_ytdlp()
next_update_check = datetime.datetime.now() + datetime.timedelta(days=self.ytdlp_update_interval)
with open(path, "w") as f:
f.write(next_update_check.isoformat())
def update_ytdlp(self):
    """Upgrade yt-dlp with pip and reload the module when a new version was installed.

    Failures during the upgrade are logged and swallowed so that archiving
    can continue with the currently installed version.
    """
    import sys  # local import: only needed to locate the running interpreter
    from importlib.metadata import version as get_version

    logger.info("Checking and updating yt-dlp...")
    logger.info(f"Tip: change the 'ytdlp_update_interval' setting to control how often yt-dlp is updated. Set to -1 to disable or 0 to enable on every run. Current setting: {self.ytdlp_update_interval}")
    old_version = get_version("yt-dlp")
    try:
        # Invoke pip through the running interpreter so the upgrade lands in the
        # environment this process actually imports yt-dlp from (a bare `pip`
        # on PATH may belong to a different Python installation).
        result = subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "yt-dlp"], check=True, capture_output=True)

        if "Successfully installed yt-dlp" in result.stdout.decode():
            new_version = get_version("yt-dlp")
            logger.info(f"yt-dlp successfully updated (from {old_version} to {new_version})")
            importlib.reload(yt_dlp)
        else:
            logger.info("yt-dlp already up to date")

    except Exception as e:
        # Deliberately broad: updating is best effort and must never stop a run
        logger.error(f"Error updating yt-dlp: {e}")
def suitable_extractors(self, url: str) -> Generator[str, None, None]:
"""
Returns a list of valid extractors for the given URL"""

View File

@@ -1 +0,0 @@
from .gsheet_db import GsheetsDb

View File

@@ -1,38 +0,0 @@
{
"name": "Google Sheets Database",
"type": ["database"],
"entry_point": "gsheet_db::GsheetsDb",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "slugify"],
},
"configs": {
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"type": "bool",
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},
"description": """
GsheetsDatabase:
Handles integration with Google Sheets for tracking archival tasks.
### Features
- Updates a Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Notes
- Currently works only with metadata provided by GsheetFeeder.
- Requires configuration of a linked Google Sheet and appropriate API credentials.
"""
}

View File

@@ -1,114 +0,0 @@
from typing import Union, Tuple
from urllib.parse import quote
from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.gsheet_feeder import GWorksheet
from auto_archiver.utils.misc import get_current_timestamp
class GsheetsDb(Database):
    """
    Writes archiving progress and results back to the Google Sheet row an item came from.

    NB: only works if GsheetFeeder is used.
    could be updated in the future to support non-GsheetFeeder metadata
    """

    def started(self, item: Metadata) -> None:
        """Set the row's status cell to "Archive in progress"."""
        logger.warning(f"STARTED {item}")
        gw, row = self._retrieve_gsheet(item)
        gw.set_cell(row, "status", "Archive in progress")

    def failed(self, item: Metadata, reason: str) -> None:
        """Write the failure reason to the row's status cell (best effort)."""
        logger.error(f"FAILED {item}")
        self._safe_status_update(item, f"Archive failed {reason}")

    def aborted(self, item: Metadata) -> None:
        """Clear the row's status cell (best effort)."""
        logger.warning(f"ABORTED {item}")
        self._safe_status_update(item, "")

    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
        """check if the given item has been archived already"""
        return False

    def done(self, item: Metadata, cached: bool = False) -> None:
        """archival result ready - should be saved to DB

        Collects all cell updates for the item's row and writes them with a
        single `batch_set_cell` call. Apart from the status, a cell is only
        queued when its column exists and the cell is currently empty.
        """
        logger.success(f"DONE {item.get_url()}")
        gw, row = self._retrieve_gsheet(item)
        # self._safe_status_update(item, 'done')

        cell_updates = []
        row_values = gw.get_row(row)

        def batch_if_valid(col, val, final_value=None):
            # `val` gates the update; `final_value` (defaulting to `val`) is what gets written
            final_value = final_value or val
            try:
                if val and gw.col_exists(col) and gw.get_cell(row_values, col) == "":
                    cell_updates.append((row, col, final_value))
            except Exception as e:
                logger.error(f"Unable to batch {col}={final_value} due to {e}")

        status_message = item.status
        if cached:
            status_message = f"[cached] {status_message}"
        cell_updates.append((row, "status", status_message))

        media: Media = item.get_final_media()
        if hasattr(media, "urls"):
            batch_if_valid("archive", "\n".join(media.urls))
        batch_if_valid("date", True, get_current_timestamp())
        batch_if_valid("title", item.get_title())
        batch_if_valid("text", item.get("content", ""))
        batch_if_valid("timestamp", item.get_timestamp())
        if media:
            batch_if_valid("hash", media.get("hash", "not-calculated"))

        # merge all pdq hashes into a single string, if present
        pdq_hashes = []
        all_media = item.get_all_media()
        for m in all_media:
            if pdq := m.get("pdq_hash"):
                pdq_hashes.append(pdq)
        if len(pdq_hashes):
            batch_if_valid("pdq_hash", ",".join(pdq_hashes))

        if (screenshot := item.get_media_by_id("screenshot")) and hasattr(
            screenshot, "urls"
        ):
            batch_if_valid("screenshot", "\n".join(screenshot.urls))

        if thumbnail := item.get_first_image("thumbnail"):
            if hasattr(thumbnail, "urls"):
                batch_if_valid("thumbnail", f'=IMAGE("{thumbnail.urls[0]}")')

        if browsertrix := item.get_media_by_id("browsertrix"):
            batch_if_valid("wacz", "\n".join(browsertrix.urls))
            batch_if_valid(
                "replaywebpage",
                "\n".join(
                    [
                        f"https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}"
                        for wacz in browsertrix.urls
                    ]
                ),
            )

        gw.batch_set_cell(cell_updates)

    def _safe_status_update(self, item: Metadata, new_status: str) -> None:
        """Set the row's status cell, logging (not raising) any failure."""
        try:
            gw, row = self._retrieve_gsheet(item)
            gw.set_cell(row, "status", new_status)
        except Exception as e:
            logger.debug(f"Unable to update sheet: {e}")

    def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
        """Return the worksheet wrapper and row number stored in the item's "gsheet" context.

        Raises:
            ValueError: if the item carries no "gsheet" context, i.e. it did not
                come from the Google Sheets feeder.
        """
        gsheet = item.get_context("gsheet")
        if not gsheet:
            # Fail with an explicit error rather than returning unbound locals
            message = f"Unable to retrieve Gsheet for {item.get_url()}, GsheetDB must be used alongside GsheetFeeder."
            logger.error(message)
            raise ValueError(message)

        gw: GWorksheet = gsheet.get("worksheet")
        row: int = gsheet.get("row")
        return gw, row

View File

@@ -1,2 +0,0 @@
from .gworksheet import GWorksheet
from .gsheet_feeder import GsheetsFeeder

View File

@@ -1,95 +0,0 @@
"""
GsheetsFeeder: A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Key properties
- validates the sheet's structure and filters rows based on input configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included.
"""
import os
import gspread
from loguru import logger
from slugify import slugify
from auto_archiver.core import Feeder
from auto_archiver.core import Metadata
from . import GWorksheet
class GsheetsFeeder(Feeder):
    """Feeder that turns unprocessed rows of a Google Sheet into `Metadata` objects."""

    def setup(self) -> None:
        """Create the gspread client and check that a sheet name or sheet id is configured.

        Raises:
            ValueError: if neither `sheet` nor `sheet_id` is set.
        """
        self.gsheets_client = gspread.service_account(filename=self.service_account)
        # TODO mv to validators
        if not self.sheet and not self.sheet_id:
            raise ValueError("You need to define either a 'sheet' name or a 'sheet_id' in your manifest.")

    def open_sheet(self):
        """Open the spreadsheet by name when `sheet` is set, otherwise by `sheet_id`."""
        if self.sheet:
            return self.gsheets_client.open(self.sheet)
        else:  # self.sheet_id
            return self.gsheets_client.open_by_key(self.sheet_id)

    def __iter__(self) -> Metadata:
        """Yield a `Metadata` for every eligible row of every processable worksheet.

        Worksheets rejected by the allow/block rules or missing a required
        column are skipped.
        """
        sh = self.open_sheet()
        for ii, worksheet in enumerate(sh.worksheets()):
            if not self.should_process_sheet(worksheet.title):
                logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
                continue
            logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
            gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
            if len(missing_cols := self.missing_required_columns(gw)):
                logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
                continue

            # process and yield metadata here:
            yield from self._process_rows(gw)
            logger.success(f'Finished worksheet {worksheet.title}')

    def _process_rows(self, gw: GWorksheet):
        """Yield a `Metadata` for each row below the header that has a URL and an empty status."""
        for row in range(1 + self.header, gw.count_rows() + 1):
            url = gw.get_cell(row, 'url').strip()
            if not len(url): continue
            original_status = gw.get_cell(row, 'status')
            # Re-read the status with fresh=True only when the first read was empty
            status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
            # TODO: custom status parser(?) aka should_retry_from_status
            if status not in ['', None]: continue

            # All checks done - archival process starts here
            m = Metadata().set_url(url)
            self._set_context(m, gw, row)
            yield m

    def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
        """Attach the "gsheet" context (row and worksheet) and, if set, the "folder" context.

        Returns the same `Metadata` instance, as the annotation promises.
        """
        m.set_context("gsheet", {"row": row, "worksheet": gw})

        # Read the folder cell once; a None value counts as "no folder"
        raw_folder = gw.get_cell_or_default(row, 'folder', "")
        folder = '' if raw_folder is None else slugify(raw_folder.strip())
        if len(folder):
            if self.use_sheet_names_in_stored_paths:
                # `sheet` is unset when the workbook is addressed by id only;
                # fall back to the id so slugify never receives None.
                workbook = self.sheet or self.sheet_id
                m.set_context("folder", os.path.join(folder, slugify(workbook), slugify(gw.wks.title)))
            else:
                m.set_context("folder", folder)
        return m

    def should_process_sheet(self, sheet_name: str) -> bool:
        """Apply the allow rules first, then the block rules, to a worksheet name."""
        if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:
            # ALLOW rules exist AND sheet name not explicitly allowed
            return False
        if len(self.block_worksheets) and sheet_name in self.block_worksheets:
            # BLOCK rules exist AND sheet name is blocked
            return False
        return True

    def missing_required_columns(self, gw: GWorksheet) -> list:
        """Return which of the required columns ('url', 'status') the worksheet lacks."""
        return [required_col for required_col in ['url', 'status'] if not gw.col_exists(required_col)]

View File

@@ -0,0 +1,2 @@
from .gworksheet import GWorksheet
from .gsheet_feeder_db import GsheetsFeederDB

View File

@@ -1,7 +1,7 @@
{
"name": "Google Sheets Feeder",
"type": ["feeder"],
"entry_point": "gsheet_feeder::GsheetsFeeder",
"name": "Google Sheets Feeder Database",
"type": ["feeder", "database"],
"entry_point": "gsheet_feeder_db::GsheetsFeederDB",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "slugify"],
@@ -12,7 +12,9 @@
"default": None,
"help": "the id of the sheet to archive (alternative to 'sheet' config)",
},
"header": {"default": 1, "help": "index of the header row (starts at 1)", "type": "int"},
"header": {"default": 1,
"type": "int",
"help": "index of the header row (starts at 1)", "type": "int"},
"service_account": {
"default": "secrets/service_account.json",
"help": "service account JSON file path. Learn how to create one: https://gspread.readthedocs.io/en/latest/oauth2.html",
@@ -51,10 +53,23 @@
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
"type": "bool",
},
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"type": "bool",
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},
"description": """
GsheetsFeeder
A Google Sheets-based feeder for the Auto Archiver.
GsheetsFeederDatabase
A Google Sheets-based feeder and optional database for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
@@ -64,11 +79,16 @@
- Processes only worksheets allowed by the `allow_worksheets` and `block_worksheets` configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included for archival.
- Supports organizing stored files into folder paths based on sheet and worksheet names.
- If the database is enabled, this updates the Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Setup
- Requires a Google Service Account JSON file for authentication, which should be stored in `secrets/gsheets_service_account.json`.
To set up a service account, follow the instructions [here](https://gspread.readthedocs.io/en/latest/oauth2.html).
- Define the `sheet` or `sheet_id` configuration to specify the sheet to archive.
- Customize the column names in your Google sheet using the `columns` configuration.
- The Google Sheet can be used solely as a feeder or as a feeder and database, but note you can't currently feed into the database from an alternate feeder.
""",
}

View File

@@ -0,0 +1,196 @@
"""
GsheetsFeederDB: A Google Sheets-based feeder and optional database for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Key properties
- validates the sheet's structure and filters rows based on input configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included.
"""
import os
from typing import Iterator, Tuple, Union
from urllib.parse import quote

import gspread
from loguru import logger
from slugify import slugify

from auto_archiver.core import Feeder, Database, Media
from auto_archiver.core import Metadata
from auto_archiver.modules.gsheet_feeder_db import GWorksheet
from auto_archiver.utils.misc import calculate_file_hash, get_current_timestamp
class GsheetsFeederDB(Feeder, Database):
    """Feed URLs from a Google Sheet and write archive results back to it.

    As a feeder, iterating the instance yields one ``Metadata`` per sheet row
    that has a URL and an empty status. As a database, the ``started`` /
    ``failed`` / ``aborted`` / ``done`` hooks update that same row.

    Configuration attributes read by this class: ``service_account``,
    ``sheet``, ``sheet_id``, ``header``, ``columns``, ``allow_worksheets``,
    ``block_worksheets`` and ``use_sheet_names_in_stored_paths``.
    """

    def setup(self) -> None:
        """Create the gspread client and check that a sheet is configured.

        Raises:
            ValueError: if neither ``sheet`` nor ``sheet_id`` is set.
        """
        self.gsheets_client = gspread.service_account(filename=self.service_account)
        # TODO mv to validators
        if not self.sheet and not self.sheet_id:
            raise ValueError("You need to define either a 'sheet' name or a 'sheet_id' in your manifest.")

    def open_sheet(self):
        """Open the spreadsheet by name when ``sheet`` is set, otherwise by ``sheet_id``."""
        if self.sheet:
            return self.gsheets_client.open(self.sheet)
        return self.gsheets_client.open_by_key(self.sheet_id)

    def __iter__(self) -> Iterator[Metadata]:
        """Yield a ``Metadata`` for every archivable row of every permitted worksheet.

        Worksheets are skipped when the allow/block rules exclude them or when
        they lack one of the required columns.
        """
        sh = self.open_sheet()
        for ii, worksheet in enumerate(sh.worksheets()):
            if not self.should_process_sheet(worksheet.title):
                logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
                continue

            logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
            gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
            if len(missing_cols := self.missing_required_columns(gw)):
                logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
                continue

            # process and yield metadata here:
            yield from self._process_rows(gw)
            logger.success(f'Finished worksheet {worksheet.title}')

    def _process_rows(self, gw: GWorksheet) -> Iterator[Metadata]:
        """Yield a ``Metadata`` for each data row with a URL and an empty status.

        Rows are visited from the first row after the header to the last row
        reported by ``gw.count_rows()``.
        """
        for row in range(1 + self.header, gw.count_rows() + 1):
            url = gw.get_cell(row, 'url').strip()
            if not url:
                continue

            original_status = gw.get_cell(row, 'status')
            # When the first read is empty the status is read again with
            # fresh=True. NOTE(review): presumably this bypasses cached values
            # so a row claimed in the meantime is not picked up twice - confirm
            # against GWorksheet.get_cell.
            status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
            # TODO: custom status parser(?) aka should_retry_from_status
            if status not in ['', None]:
                continue

            # All checks done - archival process starts here
            m = Metadata().set_url(url)
            self._set_context(m, gw, row)
            yield m

    def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
        """Attach the sheet location, and optionally a storage folder, to ``m``.

        The "gsheet" context (row number and worksheet wrapper) is always set.
        The "folder" context is only set when the row's ``folder`` cell is
        non-empty after slugification; when ``use_sheet_names_in_stored_paths``
        is enabled the slugified sheet identifier and worksheet title are
        appended to it.

        Returns:
            The same ``Metadata`` instance, for convenience.
        """
        # TODO: Check folder value not being recognised
        m.set_context("gsheet", {"row": row, "worksheet": gw})

        folder_cell = gw.get_cell_or_default(row, 'folder', "")
        folder = "" if folder_cell is None else slugify(folder_cell.strip())
        if not folder:
            return m

        if self.use_sheet_names_in_stored_paths:
            # The sheet may have been configured via `sheet_id` only, in which
            # case `self.sheet` is empty and slugify() would fail on None.
            sheet_label = str(self.sheet or self.sheet_id or "")
            folder = os.path.join(folder, slugify(sheet_label), slugify(gw.wks.title))
        m.set_context("folder", folder)
        return m

    def should_process_sheet(self, sheet_name: str) -> bool:
        """Return True when ``sheet_name`` passes the allow and block lists.

        A non-empty allow list admits only the names it contains; the block
        list then excludes any name it contains.
        """
        if self.allow_worksheets and sheet_name not in self.allow_worksheets:
            # ALLOW rules exist AND sheet name not explicitly allowed
            return False
        if self.block_worksheets and sheet_name in self.block_worksheets:
            # BLOCK rules exist AND sheet name is blocked
            return False
        return True

    def missing_required_columns(self, gw: GWorksheet) -> list:
        """Return the required columns ('url', 'status') that ``gw`` does not have."""
        return [col for col in ('url', 'status') if not gw.col_exists(col)]

    def started(self, item: Metadata) -> None:
        """Mark the item's row as in progress."""
        logger.warning(f"STARTED {item}")
        gw, row = self._retrieve_gsheet(item)
        gw.set_cell(row, "status", "Archive in progress")

    def failed(self, item: Metadata, reason: str) -> None:
        """Record the failure reason in the item's status cell (best effort)."""
        logger.error(f"FAILED {item}")
        self._safe_status_update(item, f"Archive failed {reason}")

    def aborted(self, item: Metadata) -> None:
        """Clear the item's status cell (best effort)."""
        logger.warning(f"ABORTED {item}")
        self._safe_status_update(item, "")

    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
        """check if the given item has been archived already

        No lookup is implemented here: this always returns False.
        """
        return False

    def done(self, item: Metadata, cached: bool = False) -> None:
        """archival result ready - should be saved to DB

        Writes the final status plus any available result fields to the item's
        row in one batched update. Apart from the status, a cell is only
        written when its column exists and the cell is currently empty.
        """
        logger.success(f"DONE {item.get_url()}")
        gw, row = self._retrieve_gsheet(item)
        # self._safe_status_update(item, 'done')

        cell_updates = []
        row_values = gw.get_row(row)

        def batch_if_valid(col, val, final_value=None):
            """Queue ``final_value`` (default ``val``) for ``col`` if ``val`` is truthy and the cell is empty."""
            final_value = final_value or val
            try:
                if val and gw.col_exists(col) and gw.get_cell(row_values, col) == "":
                    cell_updates.append((row, col, final_value))
            except Exception as e:
                logger.error(f"Unable to batch {col}={final_value} due to {e}")

        status_message = item.status
        if cached:
            status_message = f"[cached] {status_message}"
        # The status is always overwritten, unlike the other columns.
        cell_updates.append((row, "status", status_message))

        media: Media = item.get_final_media()
        if hasattr(media, "urls"):
            batch_if_valid("archive", "\n".join(media.urls))
        # `True` only forces the emptiness check to run; the timestamp is the value written.
        batch_if_valid("date", True, get_current_timestamp())
        batch_if_valid("title", item.get_title())
        batch_if_valid("text", item.get("content", ""))
        batch_if_valid("timestamp", item.get_timestamp())
        if media:
            batch_if_valid("hash", media.get("hash", "not-calculated"))

        # merge all pdq hashes into a single string, if present
        pdq_hashes = [pdq for m in item.get_all_media() if (pdq := m.get("pdq_hash"))]
        if pdq_hashes:
            batch_if_valid("pdq_hash", ",".join(pdq_hashes))

        screenshot = item.get_media_by_id("screenshot")
        if screenshot and hasattr(screenshot, "urls"):
            batch_if_valid("screenshot", "\n".join(screenshot.urls))

        thumbnail = item.get_first_image("thumbnail")
        if thumbnail and hasattr(thumbnail, "urls"):
            # Written as a spreadsheet IMAGE formula pointing at the first thumbnail URL.
            batch_if_valid("thumbnail", f'=IMAGE("{thumbnail.urls[0]}")')

        if browsertrix := item.get_media_by_id("browsertrix"):
            batch_if_valid("wacz", "\n".join(browsertrix.urls))
            replay_links = [
                f"https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}"
                for wacz in browsertrix.urls
            ]
            batch_if_valid("replaywebpage", "\n".join(replay_links))

        gw.batch_set_cell(cell_updates)

    def _safe_status_update(self, item: Metadata, new_status: str) -> None:
        """Set the item's status cell, logging instead of raising on any error."""
        try:
            gw, row = self._retrieve_gsheet(item)
            gw.set_cell(row, "status", new_status)
        except Exception as e:
            logger.debug(f"Unable to update sheet: {e}")

    def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
        """Return the worksheet wrapper and row number stored in the item's "gsheet" context.

        Raises:
            ValueError: if the item carries no "gsheet" context, i.e. it was
                not produced by this feeder.
        """
        gsheet = item.get_context("gsheet")
        if not gsheet:
            message = f"Unable to retrieve Gsheet for {item.get_url()}, GsheetDB must be used alongside GsheetFeeder."
            logger.error(message)
            # Fail with an explicit error rather than an UnboundLocalError below.
            raise ValueError(message)

        gw: GWorksheet = gsheet.get("worksheet")
        row: int = gsheet.get("row")
        return gw, row

View File

@@ -7,7 +7,9 @@
"bin": [""]
},
"configs": {
"detect_thumbnails": {"default": True, "help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'"}
"detect_thumbnails": {"default": True,
"help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'",
"type": "bool"},
},
"description": """ """,
}

View File

@@ -10,25 +10,30 @@
"requires_setup": True,
"configs": {
"username": {"required": True,
"help": "a valid Instagram username"},
"help": "A valid Instagram username."},
"password": {
"required": True,
"help": "the corresponding Instagram account password",
"help": "The corresponding Instagram account password.",
},
"download_folder": {
"default": "instaloader",
"help": "name of a folder to temporarily download content to",
"help": "Name of a folder to temporarily download content to.",
},
"session_file": {
"default": "secrets/instaloader.session",
"help": "path to the instagram session which saves session credentials",
"help": "Path to the instagram session file which saves session credentials. If one doesn't exist this gives the path to store a new one.",
},
# TODO: fine-grain
# "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
},
"description": """
Uses the [Instaloader library](https://instaloader.github.io/as-module.html) to download content from Instagram. This class handles both individual posts
and user profiles, downloading as much information as possible, including images, videos, text, stories,
Uses the [Instaloader library](https://instaloader.github.io/as-module.html) to download content from Instagram.
> ⚠️ **Warning**
> This module is not actively maintained due to known issues with blocking.
> Prioritise usage of the [Instagram Tbot Extractor](./instagram_tbot_extractor.md) and [Instagram API Extractor](./instagram_api_extractor.md)
This class handles both individual posts and user profiles, downloading as much information as possible, including images, videos, text, stories,
highlights, and tagged posts.
Authentication is required via username/password or a session file.

View File

@@ -3,7 +3,7 @@
highlights, and tagged posts. Authentication is required via username/password or a session file.
"""
import re, os, shutil, traceback
import re, os, shutil
import instaloader
from loguru import logger
@@ -15,10 +15,9 @@ class InstagramExtractor(Extractor):
"""
Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...)
"""
# NB: post regex should be tested before profile
valid_url = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/")
# https://regex101.com/r/MGPquX/1
post_pattern = re.compile(r"{valid_url}(?:p|reel)\/(\w+)".format(valid_url=valid_url))
# https://regex101.com/r/6Wbsxa/1
@@ -28,19 +27,22 @@ class InstagramExtractor(Extractor):
def setup(self) -> None:
self.insta = instaloader.Instaloader(
download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"
download_geotags=True,
download_comments=True,
compress_json=False,
dirname_pattern=self.download_folder,
filename_pattern="{date_utc}_UTC_{target}__{typename}"
)
try:
self.insta.load_session_from_file(self.username, self.session_file)
except Exception as e:
logger.error(f"Unable to login from session file: {e}\n{traceback.format_exc()}")
try:
self.insta.login(self.username, config.instagram_self.password)
# TODO: wait for this issue to be fixed https://github.com/instaloader/instaloader/issues/1758
logger.debug(f"Session file failed", exc_info=True)
logger.info("No valid session file found - Attempting login with use and password.")
self.insta.login(self.username, self.password)
self.insta.save_session_to_file(self.session_file)
except Exception as e2:
logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
except Exception as e:
logger.error(f"Failed to setup Instagram Extractor with Instagrapi. {e}")
def download(self, item: Metadata) -> Metadata:

View File

@@ -104,7 +104,7 @@ class InstagramTbotExtractor(Extractor):
message = ""
time.sleep(3)
# media is added before text by the bot so it can be used as a stop-logic mechanism
while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
while attempts < max(self.timeout - 3, 3) and (not message or not len(seen_media)):
attempts += 1
time.sleep(1)
for post in self.client.iter_messages(chat, min_id=since_id):

View File

@@ -17,7 +17,9 @@
"choices": ["random", "static"],
},
"save_to": {"default": "./local_archive", "help": "folder where to save archived content"},
"save_absolute": {"default": False, "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"},
"save_absolute": {"default": False,
"type": "bool",
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"},
},
"description": """
LocalStorage: A storage module for saving archived content locally on the filesystem.

View File

@@ -6,13 +6,25 @@
"python": ["loguru", "selenium"],
},
"configs": {
"width": {"default": 1280, "help": "width of the screenshots"},
"height": {"default": 720, "help": "height of the screenshots"},
"timeout": {"default": 60, "help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
"width": {"default": 1280,
"type": "int",
"help": "width of the screenshots"},
"height": {"default": 1024,
"type": "int",
"help": "height of the screenshots"},
"timeout": {"default": 60,
"type": "int",
"help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4,
"type": "int",
"help": "seconds to wait for the pages to load before taking screenshot"},
"http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
"save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {}, "help": "options to pass to the pdf printer"}
"save_to_pdf": {"default": False,
"type": "bool",
"help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {},
"help": "options to pass to the pdf printer, in JSON format. See https://www.selenium.dev/documentation/webdriver/interactions/print_page/ for more information",
"type": "json_loader"},
},
"description": """
Captures screenshots and optionally saves web pages as PDFs using a WebDriver.

View File

@@ -7,7 +7,9 @@
},
'entry_point': 'ssl_enricher::SSLEnricher',
"configs": {
"skip_when_nothing_archived": {"default": True, "help": "if true, will skip enriching when no media is archived"},
"skip_when_nothing_archived": {"default": True,
"type": 'bool',
"help": "if true, will skip enriching when no media is archived"},
},
"description": """
Retrieves SSL certificate information for a domain and stores it as a file.

View File

@@ -14,7 +14,9 @@
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"bot_token": {"default": None, "help": "optional, but allows access to more content such as large videos, talk to @botfather"},
"session_file": {"default": "secrets/anon", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"join_channels": {"default": True, "help": "disables the initial setup with channel_invites config, useful if you have a lot and get stuck"},
"join_channels": {"default": True,
"type": "bool",
"help": "disables the initial setup with channel_invites config, useful if you have a lot and get stuck"},
"channel_invites": {
"default": {},
"help": "(JSON string) private channel invite links (format: t.me/joinchat/HASH OR t.me/+HASH) and (optional but important to avoid hanging for minutes on startup) channel id (format: CHANNEL_ID taken from a post url like https://t.me/c/CHANNEL_ID/1), the telegram account will join any new channels on setup",

View File

@@ -17,11 +17,19 @@
"configs": {
"profile": {"default": None, "help": "browsertrix-profile (for profile generation see https://github.com/webrecorder/browsertrix-crawler#creating-and-using-browser-profiles)."},
"docker_commands": {"default": None, "help":"if a custom docker invocation is needed"},
"timeout": {"default": 120, "help": "timeout for WACZ generation in seconds"},
"extract_media": {"default": False, "help": "If enabled all the images/videos/audio present in the WACZ archive will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."},
"extract_screenshot": {"default": True, "help": "If enabled the screenshot captured by browsertrix will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."},
"timeout": {"default": 120,
"type": "int",
"help": "timeout for WACZ generation in seconds", "type": "int"},
"extract_media": {"default": False,
"type": 'bool',
"help": "If enabled all the images/videos/audio present in the WACZ archive will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."
},
"extract_screenshot": {"default": True,
"type": 'bool',
"help": "If enabled the screenshot captured by browsertrix will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."
},
"socks_proxy_host": {"default": None, "help": "SOCKS proxy host for browsertrix-crawler, use in combination with socks_proxy_port. eg: user:password@host"},
"socks_proxy_port": {"default": None, "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"},
"socks_proxy_port": {"default": None, "type":"int", "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"},
"proxy_server": {"default": None, "help": "SOCKS server proxy URL, in development"},
},
"description": """

View File

@@ -9,6 +9,7 @@
"configs": {
"timeout": {
"default": 15,
"type": "int",
"help": "seconds to wait for successful archive confirmation from wayback, if more than this passes the result contains the job_id so the status can later be checked manually.",
},
"if_not_archived_within": {

View File

@@ -10,8 +10,12 @@
"help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},
"api_key": {"required": True,
"help": "WhisperApi api key for authentication"},
"include_srt": {"default": False, "help": "Whether to include a subtitle SRT (SubRip Subtitle file) for the video (can be used in video players)."},
"timeout": {"default": 90, "help": "How many seconds to wait at most for a successful job completion."},
"include_srt": {"default": False,
"type": "bool",
"help": "Whether to include a subtitle SRT (SubRip Subtitle file) for the video (can be used in video players)."},
"timeout": {"default": 90,
"type": "int",
"help": "How many seconds to wait at most for a successful job completion."},
"action": {"default": "translate",
"help": "which Whisper operation to execute",
"choices": ["transcribe", "translate", "language_detection"]},