Merge branch 'load_modules' into docs_update

This commit is contained in:
Patrick Robertson
2025-02-11 14:10:56 +00:00
37 changed files with 1577 additions and 202 deletions

View File

@@ -1,7 +1,7 @@
{
"name": "Auto-Archiver API Database",
"type": ["database"],
"entry_point": "api_db:AAApiDb",
"entry_point": "api_db::AAApiDb",
"requires_setup": True,
"dependencies": {
"python": ["requests", "loguru"],
@@ -23,7 +23,7 @@
"default": None,
"help": "which group of users have access to the archive in case public=false as author",
},
"allow_rearchive": {
"use_api_cache": {
"default": True,
"type": "bool",
"help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived",
@@ -43,7 +43,7 @@
### Features
- **API Integration**: Supports querying for existing archives and submitting results.
- **Duplicate Prevention**: Avoids redundant archiving when `allow_rearchive` is disabled.
- **Duplicate Prevention**: Avoids redundant archiving when `use_api_cache` is disabled.
- **Configurable**: Supports settings like API endpoint, authentication token, tags, and permissions.
- **Tagging and Metadata**: Adds tags and manages metadata for archives.
- **Optional Storage**: Archives results conditionally based on configuration.

View File

@@ -15,11 +15,11 @@ class AAApiDb(Database):
""" query the database for the existence of this item.
Helps avoid re-archiving the same URL multiple times.
"""
if not self.allow_rearchive: return
if not self.use_api_cache: return
params = {"url": item.get_url(), "limit": 15}
headers = {"Authorization": f"Bearer {self.api_token}", "accept": "application/json"}
response = requests.get(os.path.join(self.api_endpoint, "tasks/search-url"), params=params, headers=headers)
response = requests.get(os.path.join(self.api_endpoint, "url/search"), params=params, headers=headers)
if response.status_code == 200:
if len(response.json()):
@@ -30,21 +30,26 @@ class AAApiDb(Database):
logger.error(f"AA API FAIL ({response.status_code}): {response.json()}")
return False
def done(self, item: Metadata, cached: bool=False) -> None:
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not self.store_results: return
if cached:
if cached:
logger.debug(f"skipping saving archive of {item.get_url()} to the AA API because it was cached")
return
logger.debug(f"saving archive of {item.get_url()} to the AA API.")
payload = {'result': item.to_json(), 'public': self.public, 'author_id': self.author_id, 'group_id': self.group_id, 'tags': list(self.tags)}
payload = {
'author_id': self.author_id,
'url': item.get_url(),
'public': self.public,
'group_id': self.group_id,
'tags': list(self.tags),
'result': item.to_json(),
}
headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.post(os.path.join(self.api_endpoint, "submit-archive"), json=payload, headers=headers)
response = requests.post(os.path.join(self.api_endpoint, "interop/submit-archive"), json=payload, headers=headers)
if response.status_code == 200:
if response.status_code == 201:
logger.success(f"AA API: {response.json()}")
else:
logger.error(f"AA API FAIL ({response.status_code}): {response.json()}")

View File

@@ -1,7 +1,7 @@
{
"name": "Atlos Database",
"type": ["database"],
"entry_point": "atlos_db:AtlosDb",
"entry_point": "atlos_db::AtlosDb",
"requires_setup": True,
"dependencies":
{"python": ["loguru",

View File

@@ -19,9 +19,7 @@ from auto_archiver.core import Storage
class GDriveStorage(Storage):
def setup(self, config: dict) -> None:
# Step 1: Call the BaseModule setup to dynamically assign configs
super().setup(config)
def setup(self) -> None:
self.scopes = ['https://www.googleapis.com/auth/drive']
# Initialize Google Drive service
self._setup_google_drive_service()
@@ -72,9 +70,12 @@ class GDriveStorage(Storage):
for folder in path_parts[0:-1]:
folder_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=True)
parent_id = folder_id
# get id of file inside folder (or sub folder)
file_id = self._get_id_from_parent_and_name(folder_id, filename)
file_id = self._get_id_from_parent_and_name(folder_id, filename, raise_on_missing=True)
if not file_id:
#
logger.info(f"file {filename} not found in folder {folder_id}")
return None
return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
def upload(self, media: Media, **kwargs) -> bool:
@@ -106,7 +107,13 @@ class GDriveStorage(Storage):
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True, use_cache=False):
def _get_id_from_parent_and_name(self, parent_id: str,
name: str,
retries: int = 1,
sleep_seconds: int = 10,
use_mime_type: bool = False,
raise_on_missing: bool = True,
use_cache=False):
"""
Retrieves the id of a folder or file from its @name and the @parent_id folder
Optionally does multiple @retries and sleeps @sleep_seconds between them

View File

@@ -1,6 +1,4 @@
from typing import Union, Tuple
import datetime
from urllib.parse import quote
from loguru import logger
@@ -8,32 +6,33 @@ from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.gsheet_feeder import GWorksheet
from auto_archiver.utils.misc import get_current_timestamp
class GsheetsDb(Database):
"""
NB: only works if GsheetFeeder is used.
could be updated in the future to support non-GsheetFeeder metadata
NB: only works if GsheetFeeder is used.
could be updated in the future to support non-GsheetFeeder metadata
"""
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', 'Archive in progress')
gw.set_cell(row, "status", "Archive in progress")
def failed(self, item: Metadata, reason:str) -> None:
def failed(self, item: Metadata, reason: str) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, f'Archive failed {reason}')
self._safe_status_update(item, f"Archive failed {reason}")
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
self._safe_status_update(item, '')
self._safe_status_update(item, "")
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check if the given item has been archived already"""
return False
def done(self, item: Metadata, cached: bool=False) -> None:
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item.get_url()}")
gw, row = self._retrieve_gsheet(item)
@@ -45,23 +44,25 @@ class GsheetsDb(Database):
def batch_if_valid(col, val, final_value=None):
final_value = final_value or val
try:
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == '':
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == "":
cell_updates.append((row, col, final_value))
except Exception as e:
logger.error(f"Unable to batch {col}={final_value} due to {e}")
status_message = item.status
if cached:
status_message = f"[cached] {status_message}"
cell_updates.append((row, 'status', status_message))
cell_updates.append((row, "status", status_message))
media: Media = item.get_final_media()
if hasattr(media, "urls"):
batch_if_valid('archive', "\n".join(media.urls))
batch_if_valid('date', True, datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=datetime.timezone.utc).isoformat())
batch_if_valid('title', item.get_title())
batch_if_valid('text', item.get("content", ""))
batch_if_valid('timestamp', item.get_timestamp())
if media: batch_if_valid('hash', media.get("hash", "not-calculated"))
batch_if_valid("archive", "\n".join(media.urls))
batch_if_valid("date", True, get_current_timestamp())
batch_if_valid("title", item.get_title())
batch_if_valid("text", item.get("content", ""))
batch_if_valid("timestamp", item.get_timestamp())
if media:
batch_if_valid("hash", media.get("hash", "not-calculated"))
# merge all pdq hashes into a single string, if present
pdq_hashes = []
@@ -70,33 +71,44 @@ class GsheetsDb(Database):
if pdq := m.get("pdq_hash"):
pdq_hashes.append(pdq)
if len(pdq_hashes):
batch_if_valid('pdq_hash', ",".join(pdq_hashes))
batch_if_valid("pdq_hash", ",".join(pdq_hashes))
if (screenshot := item.get_media_by_id("screenshot")) and hasattr(screenshot, "urls"):
batch_if_valid('screenshot', "\n".join(screenshot.urls))
if (screenshot := item.get_media_by_id("screenshot")) and hasattr(
screenshot, "urls"
):
batch_if_valid("screenshot", "\n".join(screenshot.urls))
if (thumbnail := item.get_first_image("thumbnail")):
if thumbnail := item.get_first_image("thumbnail"):
if hasattr(thumbnail, "urls"):
batch_if_valid('thumbnail', f'=IMAGE("{thumbnail.urls[0]}")')
batch_if_valid("thumbnail", f'=IMAGE("{thumbnail.urls[0]}")')
if (browsertrix := item.get_media_by_id("browsertrix")):
batch_if_valid('wacz', "\n".join(browsertrix.urls))
batch_if_valid('replaywebpage', "\n".join([f'https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}' for wacz in browsertrix.urls]))
if browsertrix := item.get_media_by_id("browsertrix"):
batch_if_valid("wacz", "\n".join(browsertrix.urls))
batch_if_valid(
"replaywebpage",
"\n".join(
[
f"https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}"
for wacz in browsertrix.urls
]
),
)
gw.batch_set_cell(cell_updates)
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', new_status)
gw.set_cell(row, "status", new_status)
except Exception as e:
logger.debug(f"Unable to update sheet: {e}")
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
if gsheet := item.get_context("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
print(self.sheet_id)
logger.error(f"Unable to retrieve Gsheet for {item.get_url()}, GsheetDB must be used alongside GsheetFeeder.")
return gw, row

View File

@@ -21,8 +21,7 @@ from . import GWorksheet
class GsheetsFeeder(Feeder):
def setup(self, config: dict):
super().setup(config)
def setup(self) -> None:
self.gsheets_client = gspread.service_account(filename=self.service_account)
# TODO mv to validators
assert self.sheet or self.sheet_id, (
@@ -37,41 +36,48 @@ class GsheetsFeeder(Feeder):
def __iter__(self) -> Metadata:
sh = self.open_sheet()
for ii, wks in enumerate(sh.worksheets()):
if not self.should_process_sheet(wks.title):
logger.debug(f"SKIPPED worksheet '{wks.title}' due to allow/block rules")
for ii, worksheet in enumerate(sh.worksheets()):
if not self.should_process_sheet(worksheet.title):
logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
continue
logger.info(f'Opening worksheet {ii=}: {wks.title=} header={self.header}')
gw = GWorksheet(wks, header_row=self.header, columns=self.columns)
logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}")
logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
continue
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
# process and yield metadata here:
yield from self._process_rows(gw)
logger.success(f'Finished worksheet {worksheet.title}')
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
def _process_rows(self, gw: GWorksheet):
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
# All checks done - archival process starts here
m = Metadata().set_url(url)
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder) and self.use_sheet_names_in_stored_paths:
folder = os.path.join(folder, slugify(self.sheet), slugify(wks.title))
# All checks done - archival process starts here
m = Metadata().set_url(url)
self._set_context(m, gw, row)
yield m
m.set_context('folder', folder)
m.set_context('gsheet', {"row": row, "worksheet": gw})
yield m
def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
# TODO: Check folder value not being recognised
m.set_context("gsheet", {"row": row, "worksheet": gw})
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder):
if self.use_sheet_names_in_stored_paths:
m.set_context("folder", os.path.join(folder, slugify(self.sheet), slugify(gw.wks.title)))
else:
m.set_context("folder", folder)
logger.success(f'Finished worksheet {wks.title}')
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:

View File

@@ -17,9 +17,8 @@ class HtmlFormatter(Formatter):
environment: Environment = None
template: any = None
def setup(self, config: dict) -> None:
def setup(self) -> None:
"""Sets up the Jinja2 environment and loads the template."""
super().setup(config) # Ensure the base class logic is executed
template_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")
self.environment = Environment(loader=FileSystemLoader(template_dir), autoescape=True)

View File

@@ -32,8 +32,7 @@ class InstagramAPIExtractor(Extractor):
r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com)\/(stories(?:\/highlights)?|p|reel)?\/?([^\/\?]*)\/?(\d+)?"
)
def setup(self, config: dict) -> None:
super().setup(config)
def setup(self) -> None:
if self.api_endpoint[-1] == "/":
self.api_endpoint = self.api_endpoint[:-1]

View File

@@ -25,8 +25,7 @@ class InstagramExtractor(Extractor):
profile_pattern = re.compile(r"{valid_url}(\w+)".format(valid_url=valid_url))
# TODO: links to stories
def setup(self, config: dict) -> None:
super().setup(config)
def setup(self) -> None:
self.insta = instaloader.Instaloader(
download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"

View File

@@ -27,26 +27,36 @@ class InstagramTbotExtractor(Extractor):
https://t.me/instagram_load_bot
"""
def setup(self, configs) -> None:
def setup(self) -> None:
"""
1. makes a copy of session_file that is removed in cleanup
2. checks if the session file is valid
"""
super().setup(configs)
logger.info(f"SETUP {self.name} checking login...")
self._prepare_session_file()
self._initialize_telegram_client()
# make a copy of the session that is used exclusively with this archiver instance
def _prepare_session_file(self):
"""
Creates a copy of the session file for exclusive use with this archiver instance.
Ensures that a valid session file exists before proceeding.
"""
new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
if not os.path.exists(f"{self.session_file}.session"):
raise FileNotFoundError(f"session file {self.session_file}.session not found, "
f"to set this up run the setup script in scripts/telegram_setup.py")
raise FileNotFoundError(f"Session file {self.session_file}.session not found.")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file.replace(".session", "")
def _initialize_telegram_client(self):
"""Initializes the Telegram client."""
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
except OperationalError as e:
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_extractor. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
logger.error(
f"Unable to access the {self.session_file} session. "
"Ensure that you don't use the same session file here and in telethon_extractor. "
"If you do, disable at least one of the archivers for the first-time setup of the telethon session: {e}"
)
with self.client.start():
logger.success(f"SETUP {self.name} login works.")
@@ -63,32 +73,49 @@ class InstagramTbotExtractor(Extractor):
result = Metadata()
tmp_dir = self.tmp_dir
with self.client.start():
chat = self.client.get_entity("instagram_load_bot")
since_id = self.client.send_message(entity=chat, message=url).id
attempts = 0
seen_media = []
message = ""
time.sleep(3)
# media is added before text by the bot so it can be used as a stop-logic mechanism
while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
attempts += 1
time.sleep(1)
for post in self.client.iter_messages(chat, min_id=since_id):
since_id = max(since_id, post.id)
if post.media and post.id not in seen_media:
filename_dest = os.path.join(tmp_dir, f'{chat.id}_{post.id}')
media = self.client.download_media(post.media, filename_dest)
if media:
result.add_media(Media(media))
seen_media.append(post.id)
if post.message: message += post.message
chat, since_id = self._send_url_to_bot(url)
message = self._process_messages(chat, since_id, tmp_dir, result)
if "You must enter a URL to a post" in message:
if "You must enter a URL to a post" in message:
logger.debug(f"invalid link {url=} for {self.name}: {message}")
return False
# # TODO: It currently returns this as a success - is that intentional?
# if "Media not found or unavailable" in message:
# logger.debug(f"invalid link {url=} for {self.name}: {message}")
# return False
if message:
result.set_content(message).set_title(message[:128])
return result.success("insta-via-bot")
def _send_url_to_bot(self, url: str):
"""
Sends the URL to the 'instagram_load_bot' and returns (chat, since_id).
"""
chat = self.client.get_entity("instagram_load_bot")
since_message = self.client.send_message(entity=chat, message=url)
return chat, since_message.id
def _process_messages(self, chat, since_id, tmp_dir, result):
attempts = 0
seen_media = []
message = ""
time.sleep(3)
# media is added before text by the bot so it can be used as a stop-logic mechanism
while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
attempts += 1
time.sleep(1)
for post in self.client.iter_messages(chat, min_id=since_id):
since_id = max(since_id, post.id)
# Skip known filler message:
if post.message == 'The bot receives information through https://hikerapi.com/p/hJqpppqi':
continue
if post.media and post.id not in seen_media:
filename_dest = os.path.join(tmp_dir, f'{chat.id}_{post.id}')
media = self.client.download_media(post.media, filename_dest)
if media:
result.add_media(Media(media))
seen_media.append(post.id)
if post.message: message += post.message
return message.strip()

View File

@@ -3,7 +3,7 @@
"type": ["storage"],
"requires_setup": True,
"dependencies": {
"python": ["boto3", "loguru"],
"python": ["hash_enricher", "boto3", "loguru"],
},
"configs": {
"path_generator": {
@@ -49,5 +49,6 @@
- Requires S3 credentials (API key and secret) and a bucket name to function.
- The `random_no_duplicate` option ensures no duplicate uploads by leveraging hash-based folder structures.
- Uses `boto3` for interaction with the S3 API.
- Depends on the `HashEnricher` module for hash calculation.
"""
}

View File

@@ -13,8 +13,7 @@ NO_DUPLICATES_FOLDER = "no-dups/"
class S3Storage(Storage):
def setup(self, config: dict) -> None:
super().setup(config)
def setup(self) -> None:
self.s3 = boto3.client(
's3',
region_name=self.region,

View File

@@ -13,7 +13,7 @@
The `TelegramExtractor` retrieves publicly available media content from Telegram message links without requiring login credentials.
It processes URLs to fetch images and videos embedded in Telegram messages, ensuring a structured output using `Metadata`
and `Media` objects. Recommended for scenarios where login-based archiving is not viable, although `telethon_archiver`
is advised for more comprehensive functionality.
is advised for more comprehensive functionality, and higher quality media extraction.
### Features
- Extracts images and videos from public Telegram message links (`t.me`).

View File

@@ -19,6 +19,7 @@ class TelethonExtractor(Extractor):
def setup(self) -> None:
"""
1. makes a copy of session_file that is removed in cleanup
2. trigger login process for telegram or proceed if already saved in a session file

View File

@@ -15,9 +15,7 @@ class TwitterApiExtractor(Extractor):
valid_url: re.Pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
def setup(self, config: dict) -> None:
super().setup(config)
def setup(self) -> None:
self.api_index = 0
self.apis = []
if len(self.bearer_tokens):

View File

@@ -12,8 +12,7 @@ class VkExtractor(Extractor):
Currently only works for /wall posts
"""
def setup(self, config: dict) -> None:
super().setup(config)
def setup(self) -> None:
self.vks = VkScraper(self.username, self.password, session_file=self.session_file)
def download(self, item: Metadata) -> Metadata:

View File

@@ -18,8 +18,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
When used as an archiver it will extract the media from the .WACZ archive so it can be enriched.
"""
def setup(self, configs) -> None:
super().setup(configs)
def setup(self) -> None:
self.use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER')
self.docker_in_docker = os.environ.get('WACZ_ENABLE_DOCKER') and os.environ.get('RUNNING_IN_DOCKER')

View File

@@ -6,11 +6,15 @@
"python": ["s3_storage", "loguru", "requests"],
},
"configs": {
"api_endpoint": {"default": None, "help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},
"api_key": {"default": None, "help": "WhisperApi api key for authentication"},
"api_endpoint": {"required": True,
"help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},
"api_key": {"required": True,
"help": "WhisperApi api key for authentication"},
"include_srt": {"default": False, "help": "Whether to include a subtitle SRT (SubRip Subtitle file) for the video (can be used in video players)."},
"timeout": {"default": 90, "help": "How many seconds to wait at most for a successful job completion."},
"action": {"default": "translate", "help": "which Whisper operation to execute", "choices": ["transcribe", "translate", "language_detection"]},
"action": {"default": "translate",
"help": "which Whisper operation to execute",
"choices": ["transcribe", "translate", "language_detection"]},
},
"description": """
Integrates with a Whisper API service to transcribe, translate, or detect the language of audio and video files.
@@ -25,6 +29,7 @@
### Notes
- Requires a Whisper API endpoint and API key for authentication.
- Only compatible with S3-compatible storage systems for media file accessibility.
- ** This stores the media files in S3 prior to enriching them as Whisper requires public URLs to access the media files.
- Handles multiple jobs and retries for failed or incomplete processing.
"""
}

View File

@@ -4,7 +4,6 @@ from loguru import logger
from auto_archiver.core import Enricher
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.s3_storage import S3Storage
from auto_archiver.core.module import get_module
class WhisperEnricher(Enricher):
@@ -14,18 +13,25 @@ class WhisperEnricher(Enricher):
Only works if an S3 compatible storage is used
"""
def enrich(self, to_enrich: Metadata) -> None:
if not self._get_s3_storage():
def setup(self) -> None:
self.stores = self.config['steps']['storages']
self.s3 = get_module("s3_storage", self.config)
if not "s3_storage" in self.stores:
logger.error("WhisperEnricher: To use the WhisperEnricher you need to use S3Storage so files are accessible publicly to the whisper service being called.")
return
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"WHISPER[{self.action}]: iterating media items for {url=}.")
job_results = {}
for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio():
m.store(url=url, metadata=to_enrich, storages=self.storages)
# TODO: this used to pass all storage items to store now
# Now only passing S3, the rest will get added later in the usual order (?)
m.store(url=url, metadata=to_enrich, storages=[self.s3])
try:
job_id = self.submit_job(m)
job_results[job_id] = False
@@ -53,8 +59,8 @@ class WhisperEnricher(Enricher):
to_enrich.set_content(f"\n[automatic video transcript]: {v}")
def submit_job(self, media: Media):
s3 = get_module("s3_storage", self.config)
s3_url = s3.get_cdn_url(media)
s3_url = self.s3.get_cdn_url(media)
assert s3_url in media.urls, f"Could not find S3 url ({s3_url}) in list of stored media urls "
payload = {
"url": s3_url,
@@ -107,10 +113,3 @@ class WhisperEnricher(Enricher):
logger.debug(f"DELETE whisper {job_id=} result: {r_del.status_code}")
return result
return False
def _get_s3_storage(self) -> S3Storage:
try:
return next(s for s in self.storages if s.__class__ == S3Storage)
except:
logger.warning("No S3Storage instance found in storages")
return