More manifests, base modules and rename from archiver to extractor.

This commit is contained in:
erinhmclark
2025-01-23 16:40:48 +00:00
parent 9db26cdfc2
commit 1274a1b231
93 changed files with 378 additions and 238 deletions

View File

@@ -2,7 +2,7 @@ from typing import Union
import requests, os
from loguru import logger
from auto_archiver.databases import Database
from auto_archiver.base_modules import Database
from auto_archiver.core import Metadata

View File

@@ -0,0 +1,38 @@
# Manifest for the Atlos storage module: a single dict expression declaring the
# module name, its type, its dependencies, its config options and a description.
{
"name": "atlos_storage",
"type": ["storage"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "requests"],
# NOTE(review): this list holds one empty string rather than being empty;
# confirm the manifest loader ignores blank entries.
"bin": [""]
},
"configs": {
# TODO: get base storage configs
# TODO also? get_atlos_config_options()
# NOTE(review): both options below repeat, key for key, what
# get_atlos_config_options() returns; keep the two in sync until the TODO is done.
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
# returns the CLI-provided value unchanged; the second argument is ignored
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
# returns the CLI-provided value unchanged; the second argument is ignored
"cli_set": lambda cli_val, _: cli_val
},
},
"description": """
AtlosStorage: A storage module for saving media files to the Atlos platform.
### Features
- Uploads media files to Atlos using Atlos-specific APIs.
- Automatically calculates SHA-256 hashes of media files for integrity verification.
- Skips uploads for files that already exist on Atlos with the same hash.
- Supports attaching metadata, such as `atlos_id`, to the uploaded files.
- Provides CDN-like URLs for accessing uploaded media.
### Notes
- Requires Atlos API configuration, including `atlos_url` and `api_token`.
- Files are linked to an `atlos_id` in the metadata, ensuring proper association with Atlos source materials.
"""
}

View File

@@ -0,0 +1,74 @@
import os
from typing import IO, List, Optional
from loguru import logger
import requests
import hashlib
from auto_archiver.core import Media, Metadata
from auto_archiver.base_modules import Storage
from auto_archiver.utils import get_atlos_config_options
class AtlosStorage(Storage):
    """Storage step that uploads archived media to an Atlos source material.

    The methods read ``self.atlos_url`` and ``self.api_token``; these are
    presumably populated by the ``Storage`` base class from the options
    returned by :meth:`configs` -- TODO confirm against the base class.
    """

    name = "atlos_storage"

    def __init__(self, config: dict) -> None:
        super().__init__(config)

    @staticmethod
    def configs() -> dict:
        """Return the base ``Storage`` options merged with the Atlos options."""
        return dict(Storage.configs(), **get_atlos_config_options())

    def get_cdn_url(self, _media: Media) -> str:
        """Return the Atlos instance URL as the location of any stored media."""
        # It's not always possible to provide an exact URL, because it's
        # possible that the media once uploaded could have been copied to
        # another project.
        return self.atlos_url

    def _hash(self, media: Media) -> str:
        """Return the hex SHA-256 digest of the file at ``media.filename``."""
        # Hash the media file using sha-256. We don't use the existing auto archiver
        # hash because there's no guarantee that the configurer is using sha-256, which
        # is how Atlos hashes files.
        sha256 = hashlib.sha256()
        with open(media.filename, "rb") as f:
            # iter() with a sentinel stops at the first empty read (end of file)
            for buf in iter(lambda: f.read(4096), b""):
                sha256.update(buf)
        return sha256.hexdigest()

    def upload(self, media: Media, metadata: Optional[Metadata] = None, **_kwargs) -> bool:
        """Upload ``media`` to the Atlos source material named in ``metadata``.

        Args:
            media: the media whose ``filename`` is hashed and uploaded.
            metadata: must provide an ``atlos_id`` via ``.get("atlos_id")``.

        Returns:
            ``False`` when no Atlos ID is available (including when
            ``metadata`` is omitted); ``True`` when the file was uploaded or
            an artifact with the same SHA-256 already exists.

        Raises:
            requests.HTTPError: if either Atlos API call returns an error status.
        """
        # `metadata` defaults to None, so guard before dereferencing it
        atlos_id = metadata.get("atlos_id") if metadata is not None else None
        if atlos_id is None:
            logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
            return False

        media_hash = self._hash(media)
        auth_headers = {"Authorization": f"Bearer {self.api_token}"}

        # Check whether the media has already been uploaded
        response = requests.get(
            f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
            headers=auth_headers,
        )
        # fail with a clear HTTP error instead of a confusing JSON/KeyError
        response.raise_for_status()
        source_material = response.json()["result"]
        existing_media = {x["file_hash_sha256"] for x in source_material.get("artifacts", [])}
        if media_hash in existing_media:
            logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
            return True

        # Upload the media to the Atlos API; the context manager guarantees the
        # file handle is closed even when the request fails.
        with open(media.filename, "rb") as file_handle:
            requests.post(
                f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
                headers=auth_headers,
                params={
                    # NOTE(review): the log line below reports media.key as the
                    # title while media.properties is what is sent -- confirm
                    # which one is intended.
                    "title": media.properties
                },
                files={"file": (os.path.basename(media.filename), file_handle)},
            ).raise_for_status()

        logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
        return True

    # must be implemented even if unused
    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
        """Unused placeholder; does nothing and returns ``None``."""
        pass

View File

@@ -1,11 +1,12 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from auto_archiver.databases import Database
from auto_archiver.base_modules import Database
from auto_archiver.core import Metadata
from auto_archiver.utils import get_atlos_config_options

View File

@@ -0,0 +1,13 @@
def get_atlos_config_options():
    """Build the config options shared by the Atlos modules.

    Returns a new dict on every call with two entries, ``api_token`` and
    ``atlos_url``. Each entry carries a ``default``, a ``help`` text and a
    ``cli_set`` callable that returns the CLI value unchanged.
    """
    options = {}
    options["api_token"] = dict(
        default=None,
        help="An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
        cli_set=lambda cli_val, _: cli_val,
    )
    options["atlos_url"] = dict(
        default="https://platform.atlos.org",
        help="The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
        cli_set=lambda cli_val, _: cli_val,
    )
    return options

View File

@@ -1,7 +1,7 @@
from loguru import logger
import requests
from auto_archiver.feeders import Feeder
from auto_archiver.base_modules import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import get_atlos_config_options

View File

@@ -1,6 +1,6 @@
from loguru import logger
from auto_archiver.feeders import Feeder
from auto_archiver.base_modules import Feeder
from auto_archiver.core import Metadata, ArchivingContext

View File

@@ -1,6 +1,6 @@
from loguru import logger
from auto_archiver.databases import Database
from auto_archiver.base_modules import Database
from auto_archiver.core import Metadata

View File

@@ -3,7 +3,7 @@ from loguru import logger
from csv import DictWriter
from dataclasses import asdict
from auto_archiver.databases import Database
from auto_archiver.base_modules import Database
from auto_archiver.core import Metadata

View File

@@ -1,7 +1,7 @@
from loguru import logger
import csv
from auto_archiver.feeders import Feeder
from auto_archiver.base_modules import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import url_or_none

View File

@@ -0,0 +1,34 @@
# Manifest for the Google Drive storage module: declares its display name,
# module type, Python dependencies, config options and a description.
m = {
"name": "Google Drive Storage",
"type": ["storage"],
"requires_setup": True,
"external_dependencies": {
# NOTE(review): unlike the other manifests in this change there is no "bin"
# key here -- confirm the loader treats it as optional.
"python": [
"loguru",
"google-api-python-client",
"google-auth",
"google-auth-oauthlib",
"google-auth-httplib2"
],
},
"configs": {
# TODO: get base storage configs
# NOTE(review): these three options repeat the ones still returned by
# GDriveStorage.configs(); keep them in sync.
"root_folder_id": {"default": None, "help": "root google drive folder ID to use as storage, found in URL: 'https://drive.google.com/drive/folders/FOLDER_ID'"},
# NOTE(review): the help text refers to `oauth_token_filename`, but the option key is `oauth_token`.
"oauth_token": {"default": None, "help": "JSON filename with Google Drive OAuth token: check auto-archiver repository scripts folder for create_update_gdrive_oauth_token.py. NOTE: storage used will count towards owner of GDrive folder, therefore it is best to use oauth_token_filename over service_account."},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path, same as used for Google Sheets. NOTE: storage used will count towards the developer account."},
},
"description": """
GDriveStorage: A storage module for saving archived content to Google Drive.
### Features
- Saves media files to Google Drive, organizing them into folders based on the provided path structure.
- Supports OAuth token-based authentication or service account credentials for API access.
- Automatically creates folders in Google Drive if they don't exist.
- Retrieves CDN URLs for stored files, enabling easy sharing and access.
### Notes
- Requires setup with either a Google OAuth token or a service account JSON file.
- Files are uploaded to the specified `root_folder_id` and organized by the `media.key` structure.
- Automatically handles Google Drive API token refreshes for long-running jobs.
"""
}

View File

@@ -0,0 +1,186 @@
import shutil, os, time, json
from typing import IO
from loguru import logger
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from auto_archiver.core import Media
from auto_archiver.base_modules import Storage
class GDriveStorage(Storage):
    """Storage step that saves media files into a Google Drive folder tree.

    Authenticates with an OAuth token file when ``self.oauth_token`` is set,
    otherwise with the service account file in ``self.service_account``.
    Files are placed below ``self.root_folder_id`` following the path
    components of ``media.key``.
    """

    name = "gdrive_storage"

    def __init__(self, config: dict) -> None:
        super().__init__(config)

        SCOPES = ['https://www.googleapis.com/auth/drive']

        if self.oauth_token is not None:
            # Tokens are refreshed after 1 hour
            # however keep working for 7 days (tbc)
            # so as long as the job doesn't last for 7 days
            # then this method of refreshing only once per run will work
            # see this link for details on the token
            # https://davemateer.com/2022/04/28/google-drive-with-python#tokens
            logger.debug(f'Using GD OAuth token {self.oauth_token}')
            # workaround for missing 'refresh_token' in from_authorized_user_file
            with open(self.oauth_token, 'r') as stream:
                creds_json = json.load(stream)
                creds_json['refresh_token'] = creds_json.get("refresh_token", "")
            creds = Credentials.from_authorized_user_info(creds_json, SCOPES)
            # creds = Credentials.from_authorized_user_file(self.oauth_token, SCOPES)

            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    logger.debug('Requesting new GD OAuth token')
                    creds.refresh(Request())
                else:
                    raise Exception("Problem with creds - create the token again")

                # Save the credentials for the next run
                with open(self.oauth_token, 'w') as token:
                    logger.debug('Saving new GD OAuth token')
                    token.write(creds.to_json())
            else:
                logger.debug('GD OAuth Token valid')
        else:
            gd_service_account = self.service_account
            logger.debug(f'Using GD Service Account {gd_service_account}')
            creds = service_account.Credentials.from_service_account_file(gd_service_account, scopes=SCOPES)

        self.service = build('drive', 'v3', credentials=creds)

    @staticmethod
    def configs() -> dict:
        """Return the base ``Storage`` options merged with the Google Drive options."""
        return dict(
            Storage.configs(),
            ** {
                "root_folder_id": {"default": None, "help": "root google drive folder ID to use as storage, found in URL: 'https://drive.google.com/drive/folders/FOLDER_ID'"},
                "oauth_token": {"default": None, "help": "JSON filename with Google Drive OAuth token: check auto-archiver repository scripts folder for create_update_gdrive_oauth_token.py. NOTE: storage used will count towards owner of GDrive folder, therefore it is best to use oauth_token_filename over service_account."},
                "service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path, same as used for Google Sheets. NOTE: storage used will count towards the developer account."},
            })

    def get_cdn_url(self, media: Media) -> str:
        """Return the sharing URL of the Drive file stored for ``media.key``.

        Walks the folder components of ``media.key`` starting at
        ``self.root_folder_id``, then looks the file up in the last folder
        reached. A key with no folder component is looked up directly in the
        root folder.

        Raises:
            ValueError: if a folder or the file itself cannot be found.
        """
        # full_name = os.path.join(self.folder, media.key)
        parent_id = self.root_folder_id
        path_parts = media.key.split(os.path.sep)
        filename = path_parts[-1]
        logger.info(f"looking for folders for {path_parts[0:-1]} before getting url for {filename=}")
        for folder in path_parts[0:-1]:
            parent_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=True)

        # get id of file inside folder (or sub folder); parent_id is still the
        # root folder when the key has no folder component
        file_id = self._get_id_from_parent_and_name(parent_id, filename)
        return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"

    def upload(self, media: Media, **kwargs) -> bool:
        """Upload the file at ``media.filename`` to the Drive path ``media.key``.

        1. for each sub-folder in the path check if exists or create
        2. upload file to root_id/other_paths.../filename

        Returns:
            ``True`` once the Drive API has accepted the file.
        """
        logger.debug(f'[{self.__class__.name}] storing file {media.filename} with key {media.key}')
        parent_id = self.root_folder_id
        path_parts = media.key.split(os.path.sep)
        filename = path_parts[-1]
        logger.info(f"checking folders {path_parts[0:-1]} exist (or creating) before uploading {filename=}")
        for folder in path_parts[0:-1]:
            folder_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=False)
            if folder_id is None:
                folder_id = self._mkdir(folder, parent_id)
            parent_id = folder_id

        # upload file to gd; parent_id is the root folder when the key has no
        # folder component, so the parent is never None
        logger.debug(f'uploading {filename=} to folder id {parent_id}')
        file_metadata = {
            # NOTE(review): the name is sent as a one-element list, not a string -- confirm this is intended
            'name': [filename],
            'parents': [parent_id]
        }
        # separate local name so the `media` parameter is not shadowed
        media_body = MediaFileUpload(media.filename, resumable=True)
        gd_file = self.service.files().create(supportsAllDrives=True, body=file_metadata, media_body=media_body, fields='id').execute()
        logger.debug(f'uploadf: uploaded file {gd_file["id"]} successfully in folder={parent_id}')
        return True

    # must be implemented even if unused
    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
        """Unused placeholder; does nothing and returns ``None``."""
        pass

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape backslashes and single quotes for use inside a quoted Drive query value."""
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True, use_cache=False):
        """
        Retrieves the id of a folder or file from its @name and the @parent_id folder
        Optionally does multiple @retries and sleeps @sleep_seconds between them
        If @use_mime_type will restrict search to "mimeType='application/vnd.google-apps.folder'"
        If @raise_on_missing will throw error when not found, or returns None
        Will remember previous calls to avoid duplication if @use_cache - might not have all edge cases tested, so use at own risk
        Returns the id of the file or folder from its name as a string
        """
        # cache logic
        if use_cache:
            self.api_cache = getattr(self, "api_cache", {})
            cache_key = f"{parent_id}_{name}_{use_mime_type}"
            if cache_key in self.api_cache:
                logger.debug(f"cache hit for {cache_key=}")
                return self.api_cache[cache_key]

        # API logic
        debug_header: str = f"[searching {name=} in {parent_id=}]"
        # names may contain single quotes, which would otherwise end the quoted value early
        query_string = f"'{parent_id}' in parents and name = '{self._escape_query_value(name)}' and trashed = false "
        if use_mime_type:
            query_string += " and mimeType='application/vnd.google-apps.folder' "

        for attempt in range(retries):
            results = self.service.files().list(
                # both below for Google Shared Drives
                supportsAllDrives=True,
                includeItemsFromAllDrives=True,
                q=query_string,
                spaces='drive',  # ie not appDataFolder or photos
                fields='files(id, name)'
            ).execute()
            items = results.get('files', [])

            if items:
                logger.debug(f"{debug_header} found {len(items)} matches, returning last of {','.join([i['id'] for i in items])}")
                _id = items[-1]['id']
                if use_cache:
                    self.api_cache[cache_key] = _id
                return _id

            logger.debug(f'{debug_header} not found, attempt {attempt+1}/{retries}.')
            if attempt < retries - 1:
                logger.debug(f'sleeping for {sleep_seconds} second(s)')
                time.sleep(sleep_seconds)

        if raise_on_missing:
            raise ValueError(f'{debug_header} not found after {retries} attempt(s)')
        return None

    def _mkdir(self, name: str, parent_id: str):
        """
        Creates a new GDrive folder @name inside folder @parent_id
        Returns id of the created folder
        """
        logger.debug(f'Creating new folder with {name=} inside {parent_id=}')
        file_metadata = {
            'name': [name],
            'mimeType': 'application/vnd.google-apps.folder',
            'parents': [parent_id]
        }
        gd_folder = self.service.files().create(supportsAllDrives=True, body=file_metadata, fields='id').execute()
        return gd_folder.get('id')

    # def exists(self, key):
    #     try:
    #         self.get_cdn_url(key)
    #         return True
    #     except: return False

View File

@@ -1,17 +1,12 @@
import os
import mimetypes
import requests
from loguru import logger
from auto_archiver.core.context import ArchivingContext
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_modules.extractor import Extractor
from auto_archiver.core.metadata import Metadata, Media
from .dropin import GenericDropin, InfoExtractor
class Bluesky(GenericDropin):
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
result = Metadata()
result.set_url(url)
result.set_title(post["record"]["text"])
@@ -42,7 +37,7 @@ class Bluesky(GenericDropin):
def _download_bsky_embeds(self, post: dict, archiver: Archiver) -> list[Media]:
def _download_bsky_embeds(self, post: dict, archiver: Extractor) -> list[Media]:
"""
Iterates over image(s) or video in a Bluesky post and downloads them
"""

View File

@@ -1,6 +1,6 @@
from yt_dlp.extractor.common import InfoExtractor
from auto_archiver.core.metadata import Metadata
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_modules.extractor import Extractor
class GenericDropin:
"""Base class for dropins for the generic extractor.
@@ -30,7 +30,7 @@ class GenericDropin:
raise NotImplementedError("This method should be implemented in the subclass")
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
"""
This method should create a Metadata object from the post data.
"""

View File

@@ -5,10 +5,10 @@ from yt_dlp.extractor.common import InfoExtractor
from loguru import logger
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_modules.extractor import Extractor
from ...core import Metadata, Media, ArchivingContext
class GenericExtractor(Archiver):
class GenericExtractor(Extractor):
name = "youtubedl_archiver" #left as is for backwards compat
_dropins = {}

View File

@@ -2,7 +2,7 @@ from typing import Type
from auto_archiver.utils import traverse_obj
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_modules.extractor import Extractor
from yt_dlp.extractor.common import InfoExtractor
from dateutil.parser import parse as parse_dt
@@ -19,7 +19,7 @@ class Truth(GenericDropin):
def skip_ytdlp_download(self, url, ie_instance: Type[InfoExtractor]) -> bool:
return True
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
"""
Creates metadata from a truth social post

View File

@@ -6,7 +6,7 @@ from slugify import slugify
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.utils import UrlUtil
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_modules.extractor import Extractor
from .dropin import GenericDropin, InfoExtractor
@@ -32,7 +32,7 @@ class Twitter(GenericDropin):
twid = ie_instance._match_valid_url(url).group('id')
return ie_instance._extract_status(twid=twid)
def create_metadata(self, tweet: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, tweet: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
result = Metadata()
try:
if not tweet.get("user") or not tweet.get("created_at"):

View File

@@ -1,21 +0,0 @@
# TODO merge with feeder manifest?
# Manifest for the Google Sheets database module (this hunk shows the file
# being removed; the same description text is added to the feeder manifest).
{
"name": "gsheet_db",
"type": ["database"],
"requires_setup": True,
# NOTE(review): the dependency name below has a leading space (" loguru");
# confirm whether any lookup on this value strips whitespace.
"external_dependencies": {"python": [" loguru"],
},
"description": """
Handles integration with Google Sheets for tracking archival tasks.
### Features
- Updates a Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Notes
- Currently works only with metadata provided by GsheetFeeder.
- Requires configuration of a linked Google Sheet and appropriate API credentials.
""",
}

View File

@@ -1,5 +1,5 @@
{
"name": "Google Sheets Feeder",
"name": "Google Sheets Procesor",
"type": ["feeder"],
"requires_setup": True,
"external_dependencies": {
@@ -22,7 +22,12 @@
}
},
"description": """
GsheetsFeeder: A Google Sheets-based feeder for the Auto Archiver.
Google Sheets Module.
Handles feeding from a google sheet as well as an optional write back to the sheet.
## GsheetsFeeder
A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
@@ -36,5 +41,18 @@
### Notes
- Requires a Google Service Account JSON file for authentication. Suggested location is `secrets/gsheets_service_account.json`.
- Create the sheet using the template provided in the docs.
## GsheetsDatabase:
Handles integration with Google Sheets for tracking archival tasks.
### Features
- Updates a Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Notes
- Currently works only with metadata provided by GsheetFeeder.
- Requires configuration of a linked Google Sheet and appropriate API credentials.
"""
}

View File

@@ -1,10 +1,11 @@
from typing import Union, Tuple
import datetime
from urllib.parse import quote
from loguru import logger
from auto_archiver.databases import Database
from auto_archiver.base_modules import Database
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.utils import GWorksheet

View File

@@ -13,8 +13,7 @@ import gspread, os
from loguru import logger
from slugify import slugify
# from . import Enricher
from auto_archiver.feeders import Feeder
from auto_archiver.base_modules import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import Gsheets, GWorksheet

View File

@@ -0,0 +1 @@
# Explicit relative import: re-export HashEnricher from the sibling module of
# this package. A bare `from hash_enricher import ...` is an absolute import
# and only resolves if a top-level `hash_enricher` happens to be on sys.path.
from .hash_enricher import HashEnricher

View File

@@ -7,7 +7,7 @@
},
"configs": {
"algorithm": {"default": "SHA-256", "help": "hash algorithm to use", "choices": ["SHA-256", "SHA3-512"]},
"chunksize": {"default": int(1.6e7), "help": "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"},
"chunksize": {"default": 1.6e7, "help": "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"},
},
"description": """
Generates cryptographic hashes for media files to ensure data integrity and authenticity.

View File

@@ -10,7 +10,7 @@ making it suitable for handling large files efficiently.
import hashlib
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Metadata, ArchivingContext
@@ -40,7 +40,11 @@ class HashEnricher(Enricher):
else:
self.chunksize = self.configs()["chunksize"]["default"]
self.chunksize = int(self.chunksize)
try:
self.chunksize = int(self.chunksize)
except ValueError:
raise ValueError(f"Invalid chunksize value: {self.chunksize}. Must be an integer.")
assert self.chunksize >= -1, "read length must be non-negative or -1"
ArchivingContext.set("hash_enricher.algorithm", self.algorithm, keep_on_reset=True)

View File

@@ -0,0 +1,13 @@
# Manifest for the HTML formatter module: declares its display name, module
# type, dependencies and its single config option.
m = {
"name": "HTML Formatter",
"type": ["formatter"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru", "jinja2"],
# NOTE(review): this list holds one empty string rather than being empty;
# confirm the manifest loader ignores blank entries.
"bin": [""]
},
"configs": {
"detect_thumbnails": {"default": True, "help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'"}
},
# NOTE(review): the description is a single space, i.e. still to be written.
"description": """ """,
}

View File

@@ -0,0 +1,99 @@
from __future__ import annotations
from dataclasses import dataclass
import mimetypes, os, pathlib
from jinja2 import Environment, FileSystemLoader
from urllib.parse import quote
from loguru import logger
import json
import base64
from auto_archiver.version import __version__
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.base_modules import Formatter
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.utils.misc import random_str
@dataclass
class HtmlFormatter(Formatter):
    """Formatter that renders an archived item into a standalone HTML report.

    The report is produced from ``templates/html_template.html`` (located next
    to this file) and returned as a ``Media`` object carrying a hash.
    """

    name = "html_formatter"

    def __init__(self, config: dict) -> None:
        # without this STEP.__init__ is not called
        super().__init__(config)

        templates_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")
        self.environment = Environment(loader=FileSystemLoader(templates_dir), autoescape=True)

        # JinjaHelper class static methods are added as filters
        for filter_name, candidate in JinjaHelpers.__dict__.items():
            if isinstance(candidate, staticmethod):
                self.environment.filters[filter_name] = candidate.__func__

        self.template = self.environment.get_template("html_template.html")

    # @staticmethod
    # def configs() -> dict:
    #     return {
    #         "detect_thumbnails": {"default": True, "help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'"}
    #     }

    def format(self, item: Metadata) -> Media:
        """Render ``item`` to an HTML file in the temporary directory.

        Returns the resulting ``Media`` (mimetype ``text/html``) with its
        ``hash`` set when a digest could be computed, or ``None`` when the
        item is empty.
        """
        url = item.get_url()
        if item.is_empty():
            logger.debug(f"[SKIP] FORMAT there is no media or metadata to format: {url=}")
            return

        rendered = self.template.render(
            url=url,
            title=item.get_title(),
            media=item.media,
            metadata=item.metadata,
            version=__version__
        )

        # random suffix in the file name
        html_path = os.path.join(ArchivingContext.get_tmp_dir(), f"formatted{random_str(24)}.html")
        with open(html_path, mode="w", encoding="utf-8") as outf:
            outf.write(rendered)
        final_media = Media(filename=html_path, _mimetype="text/html")

        # hash the report with the algorithm stored in the archiving context
        hasher_config = {"hash_enricher": {"algorithm": ArchivingContext.get("hash_enricher.algorithm"), "chunksize": 1.6e7}}
        he = HashEnricher(hasher_config)
        digest = he.calculate_hash(final_media.filename)
        if len(digest):
            final_media.set("hash", f"{he.algorithm}:{digest}")

        return final_media
# JINJA helper filters
class JinjaHelpers:
    """Namespace of small predicates and converters used as Jinja filters.

    Only static methods belong here: the formatter registers every
    staticmethod found on this class as a template filter under its own name.
    """

    @staticmethod
    def is_list(v) -> bool:
        """Tell whether ``v`` is a ``list`` instance."""
        return isinstance(v, list)

    @staticmethod
    def is_video(s: str) -> bool:
        """Tell whether the MIME type guessed from ``s`` contains "video"."""
        guessed, _ = mimetypes.guess_type(s)
        return guessed is not None and "video" in guessed

    @staticmethod
    def is_image(s: str) -> bool:
        """Tell whether the MIME type guessed from ``s`` contains "image"."""
        guessed, _ = mimetypes.guess_type(s)
        return guessed is not None and "image" in guessed

    @staticmethod
    def is_audio(s: str) -> bool:
        """Tell whether the MIME type guessed from ``s`` contains "audio"."""
        guessed, _ = mimetypes.guess_type(s)
        return guessed is not None and "audio" in guessed

    @staticmethod
    def is_media(v) -> bool:
        """Tell whether ``v`` is a ``Media`` instance."""
        return isinstance(v, Media)

    @staticmethod
    def get_extension(filename: str) -> str:
        """Return the final extension of ``filename`` including the dot, or ""."""
        _stem, extension = os.path.splitext(filename)
        return extension

    @staticmethod
    def quote(s: str) -> str:
        """Percent-encode ``s`` with ``urllib.parse.quote``."""
        # inside the method body `quote` resolves to the module-level import,
        # not to this staticmethod
        return quote(s)

    @staticmethod
    def json_dump_b64(d: dict) -> str:
        """Serialise ``d`` to indented JSON and return it base64-encoded as text."""
        as_json = json.dumps(d, indent=4, default=str)
        encoded = base64.b64encode(as_json.encode())
        return encoded.decode()

View File

@@ -0,0 +1,332 @@
{# templates/results.html #}
{% import 'macros.html' as macros %}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto:300,300italic,700,700italic">
<title>{{ url }}</title>
<style>
html {
font-family: 'Roboto', sans-serif;
}
table {
table-layout: fixed;
width: 90%;
}
table td {
word-wrap: break-word;
overflow-wrap: break-word;
padding: 5px;
}
table,
th,
td {
margin: auto;
border: 1px solid;
border-collapse: collapse;
vertical-align: top;
}
table.metadata td:first-child {
text-align: center;
}
table.content td:nth-child(2),
.center {
text-align: center;
}
.copy:hover {
background: aliceblue;
cursor: copy;
}
#notification {
position: fixed;
right: 20px;
top: 20px;
background: aquamarine;
box-shadow: 6px 8px 5px 0px #000000;
padding: 10px;
font-size: large;
display: none;
}
img,
video {
filter: gray;
-webkit-filter: grayscale(1);
filter: grayscale(1);
}
/* Disable grayscale on hover */
/* img:hover,
video:hover {
-webkit-filter: grayscale(0);
filter: none;
} */
.collapsible {
background-color: #777;
color: white;
cursor: pointer;
padding: 5px;
margin: 10px;
width: 100%;
border: none;
text-align: left;
outline: none;
font-size: 15px;
}
.active,
.collapsible:hover {
background-color: #555;
}
.collapsible-content {
padding: 0 18px;
display: none;
overflow: hidden;
background-color: #f1f1f1;
}
.pem-certificate, .text-preview {
text-align: left;
font-size: small;
}
.text-preview{
padding-left: 10px;
padding-right: 10px;
white-space: pre-wrap;
}
</style>
</head>
<body>
<div id="notification"></div>
<h2>Archived media for <span class="copy">{{ url }}</span> - <a href="{{ url }}">open</a></h2>
{% if title | string | length > 0 %}
<p><b>title:</b> '<span class="copy">{{ title }}</span>'</p>
{% endif %}
<h2 class="center">content {{ media | length }} item(s)</h2>
<form class="center">
<label>
<input type="checkbox" id="safe-media-view" checked>
Safe Media View
</label>
</form>
<table class="content">
<tr>
<th>about</th>
<th>files and preview</th>
</tr>
<tbody>
{% for m in media %}
<tr>
<td>
{{ macros.display_recursive(m, true) }}
</td>
<td>
{{ macros.display_media(m, true, url) }}
</td>
</tr>
{% endfor %}
</tbody>
</table>
<h2 class="center">metadata</h2>
<table class="metadata">
<tr>
<th>key</th>
<th>value</th>
</tr>
{% for key in metadata %}
<tr>
<td>{{ key }}</td>
<td>
{% if metadata[key] is mapping %}
<div class="center copy" copy-value64='{{metadata[key] | json_dump_b64}}'>Copy as JSON</div>
{% endif %}
{{ macros.copy_urlize(metadata[key]) }}
</td>
</tr>
{% endfor %}
</table>
<p class="center">Made with <a href="https://github.com/bellingcat/auto-archiver">bellingcat/auto-archiver</a>
v{{ version }}</p>
</body>
<script src="https://cdnjs.cloudflare.com/ajax/libs/forge/0.10.0/forge.min.js"></script>
<script defer>
// partial decode of SSL certificates
/**
 * Builds an HTML summary (subject, issuer, validity, serial number) of a
 * PEM-encoded certificate, parsed with forge.
 *
 * The returned markup is assigned to innerHTML by the caller, and the
 * certificate fields are whatever the fetched certificate contains, so every
 * interpolated value is HTML-escaped first.
 *
 * @param {string} sslCert PEM text of the certificate.
 * @returns {string} HTML fragment describing the certificate.
 */
function decodeCertificate(sslCert) {
    const escapeHtml = (value) => String(value)
        .replace(/&/g, "&amp;")
        .replace(/</g, "&lt;")
        .replace(/>/g, "&gt;")
        .replace(/"/g, "&quot;")
        .replace(/'/g, "&#39;");
    // "shortName: value" pairs joined by commas, escaped as one string
    const describe = (attributes) => escapeHtml(
        attributes.map(attr => `${attr.shortName}: ${attr.value}`).join(", ")
    );
    var cert = forge.pki.certificateFromPem(sslCert);
    return `SSL CERTIFICATE PREVIEW:<br/><ul>
<li><b>Subject:</b> <span class="copy">${describe(cert.subject.attributes)}</span></li>
<li><b>Issuer:</b> <span class="copy">${describe(cert.issuer.attributes)}</span></li>
<li><b>Valid From:</b> <span class="copy">${escapeHtml(cert.validity.notBefore)}</span></li>
<li><b>Valid To:</b> <span class="copy">${escapeHtml(cert.validity.notAfter)}</span></li>
<li><b>Serial Number:</b> <span class="copy">${escapeHtml(cert.serialNumber)}</span></li>
</ul>`;
}
/**
 * Starts every page setup step. Steps are launched one after another without
 * waiting for the previous one to finish; an error in a step is logged to the
 * console and does not prevent the remaining steps from starting.
 */
async function run() {
    const steps = [
        previewCertificates,
        previewText,
        enableCopyLogic,
        enableCollapsibleLogic,
        setupSafeView
    ];
    const guarded = async (step) => {
        try {
            await step();
        } catch (e) {
            console.error(`Error in ${step.name}: ${e}`);
        }
    };
    // deliberately not awaited: each step runs independently
    for (const step of steps) {
        guarded(step);
    }
}
/**
 * For every ".pem-certificate" element: fetches the file named by its "pem"
 * attribute, replaces the element content with the decoded summary, and
 * appends a CyberChef link with the full certificate to the element's parent.
 */
async function previewCertificates() {
    const holders = Array.from(document.querySelectorAll(".pem-certificate"));
    const renderOne = async (holder) => {
        const response = await fetch(holder.getAttribute("pem"));
        const pemText = await response.text();
        holder.innerHTML = decodeCertificate(pemText);
        // create a new anchor with the CyberChef url and append after the code
        const link = document.createElement("a");
        link.href =
            `https://gchq.github.io/CyberChef/#recipe=Parse_X.509_certificate('PEM')&input=${btoa(pemText)}`;
        link.textContent = "Full certificate details";
        holder.parentElement.appendChild(link);
    };
    await Promise.all(holders.map(holder => renderOne(holder)));
    console.log("certificate preview done");
}
/**
 * For every ".text-preview" element: fetches the resource named by its "url"
 * attribute and shows the response body as the element's text.
 */
async function previewText() {
    const previews = Array.from(document.querySelectorAll(".text-preview"));
    const fillOne = async (preview) => {
        const response = await fetch(preview.getAttribute("url"));
        preview.textContent = await response.text();
    };
    await Promise.all(previews.map(preview => fillOne(preview)));
    console.log("text preview done");
}
// notification logic
const notification = document.getElementById("notification");
/**
 * Shows `message` in the notification box, then hides and clears the box
 * after `miliseconds` (1000 when omitted or falsy).
 */
function showNotification(message, miliseconds) {
    const hide = () => {
        notification.style.display = "none";
        notification.innerText = "";
    };
    notification.style.display = "block";
    notification.innerText = message;
    setTimeout(hide, miliseconds || 1000);
}
// copy logic
/**
 * Makes every ".copy" element copy text to the clipboard when clicked.
 * The copied text is, in order of preference: the "copy-value" attribute, the
 * base64-decoded "copy-value64" attribute, or the element's text content.
 */
async function enableCopyLogic() {
    const textFor = (el) => {
        if (el.hasAttribute("copy-value")) {
            return el.getAttribute("copy-value");
        }
        if (el.hasAttribute("copy-value64")) {
            // TODO: figure out how to decode unicode chars into utf-8
            return new String(atob(el.getAttribute("copy-value64")));
        }
        return el.textContent;
    };
    const wire = (el) => {
        // clicking triggers a "copy" event, handled by the listener below
        el.onclick = () => {
            document.execCommand("copy");
        };
        el.addEventListener("copy", (e) => {
            e.preventDefault();
            if (!e.clipboardData) {
                return;
            }
            e.clipboardData.setData("text/plain", textFor(el));
            console.log(e.clipboardData.getData("text"))
            showNotification("copied!")
        });
    };
    await Promise.all(Array.from(document.querySelectorAll(".copy")).map(el => wire(el)));
    console.log("copy logic enabled");
}
// collapsibles
/**
 * Wires every ".collapsible" element so that clicking it toggles its "active"
 * class and shows/hides the ".collapsible-content" element found under the
 * same parent.
 */
async function enableCollapsibleLogic() {
    const toggles = document.getElementsByClassName("collapsible");
    const onToggle = function () {
        this.classList.toggle("active");
        // let content = this.nextElementSibling;
        const content = this.parentElement.querySelector(".collapsible-content");
        content.style.display = content.style.display === "block" ? "none" : "block";
    };
    for (let index = 0; index < toggles.length; index++) {
        await new Promise(resolve => {
            toggles[index].addEventListener("click", onToggle);
            resolve();
        });
    }
    console.log("collapsible logic enabled");
}
/**
 * Wire up the "safe media view": while the #safe-media-view checkbox is checked,
 * every img/video/embed is rendered in greyscale, except while it is hovered.
 */
async function setupSafeView() {
  const safeImageViewCheckbox = document.getElementById('safe-media-view');
  const visualPreviews = document.querySelectorAll('img, video,embed');

  // set both the standard and the webkit-prefixed filter to the same value
  const applyFilter = (element, value) => {
    element.style.filter = value;
    element.style.webkitFilter = value;
  };

  // apply the filter matching the current checkbox state to every preview
  function toggleGrayscale() {
    const value = safeImageViewCheckbox.checked ? 'grayscale(1)' : 'none';
    visualPreviews.forEach(element => applyFilter(element, value));
  }

  safeImageViewCheckbox.addEventListener('change', toggleGrayscale);

  visualPreviews.forEach(element => {
    // hovering always reveals the original colours
    element.addEventListener('mouseenter', () => applyFilter(element, 'none'));
    // leaving restores greyscale only when safe view is switched on
    element.addEventListener('mouseleave', () => {
      if (safeImageViewCheckbox.checked) {
        applyFilter(element, 'grayscale(1)');
      }
    });
  });

  // sync the previews with the checkbox's initial state
  toggleGrayscale();
  console.log("grayscale logic enabled");
}
run();
</script>
</html>

View File

@@ -0,0 +1,151 @@
{% macro display_media(m, links, main_url) -%}
    {# Render one preview per entry of `m.urls`, picked by `m.mimetype` or by the
       extension of `m.filename`.
       links:    when truthy, open / download / copy links follow each preview.
       main_url: forwarded to the replayweb.page link used for .wacz files. #}
    {% for url in m.urls %}
        {% if url | length == 0 %}
            No URL available for {{ m.key }}.
        {% elif 'http://' in url or 'https://' in url or url.startswith('/') %}
            {% if 'image' in m.mimetype %}
                <div>
                    <a href="{{ url }}">
                        <img src="{{ url }}" style="max-height:400px;max-width:400px;"></img>
                    </a>
                    <div>
                        Reverse Image Search:&nbsp;
                        <a href="https://www.google.com/searchbyimage?sbisrc=4chanx&image_url={{ url | quote }}&safe=off">Google</a>,&nbsp;
                        <a href="https://lens.google.com/uploadbyurl?url={{ url | quote }}">Google Lens</a>,&nbsp;
                        <a href="https://yandex.ru/images/touch/search?rpt=imageview&url={{ url | quote }}">Yandex</a>,&nbsp;
                        <a href="https://www.bing.com/images/search?view=detailv2&iss=sbi&form=SBIVSP&sbisrc=UrlPaste&q=imgurl:{{ url | quote }}">Bing</a>,&nbsp;
                        <a href="https://www.tineye.com/search/?url={{ url | quote }}">Tineye</a>
                    </div>
                    <div>
                        Image Forensics:&nbsp;
                        <a href="https://fotoforensics.com/?url={{ url | quote }}">FotoForensics</a>,&nbsp;
                        <a href="https://mever.iti.gr/forensics/?image={{ url }}">Media Verification Assistant</a>
                    </div>
                    <p></p>
                </div>
            {% elif 'video' in m.mimetype %}
                <div>
                    <video src="{{ url }}" controls style="max-height:400px;max-width:600px;">
                        Your browser does not support the video element.
                    </video>
                </div>
            {% elif 'application/pdf' in m.mimetype %}
                <div>
                    <embed src="{{ url }}" width="100%" height="400px"/>
                </div>
            {% elif 'audio' in m.mimetype %}
                <div>
                    <audio controls>
                        <source src="{{ url }}" type="{{ m.mimetype }}">
                        Your browser does not support the audio element.
                    </audio>
                </div>
            {% elif m.filename | get_extension == ".wacz" %}
                <a href="https://replayweb.page/?source={{ url | quote }}#view=pages&url={{ main_url }}">replayweb</a>
            {% elif m.filename | get_extension == ".pem" %}
                {# filled in client-side from the `pem` attribute #}
                <code class="pem-certificate" pem="{{url}}"></code>
            {% elif 'text' in m.mimetype %}
                {# filled in client-side from the `url` attribute #}
                <div>PREVIEW:<br/><code><pre class="text-preview" url="{{url}}"></pre></code></div>
            {% else %}
                No preview available for <code>{{ m.key }}</code>.
            {% endif %}
        {% else %}
            {# neither an http(s) nor a root-relative URL: show the value being iterated
               (`url`), not `m.url`, which is not the value tested above #}
            {{ url | urlize }}
        {% endif %}
        {% if links %}
            <a href="{{ url }}">open</a> or
            <a href="{{ url }}" download="">download</a> or
            {{ copy_urlize(url, "copy") }}
            <br>
        {% endif %}
    {% endfor %}
{%- endmacro -%}
{% macro copy_urlize(val, href_text) -%}
    {# Render `val` as click-to-copy markup. Lists and mappings are walked
       recursively; a leaf becomes a `.copy` span. When `href_text` is non-empty
       it is the visible label and `val` is stored in the `copy-value` attribute. #}
    {% if val | is_list %}
        {% for element in val %}
            {{ copy_urlize(element) }}
        {% endfor %}
    {% elif val is mapping %}
        <ul>
            {% for name in val %}
                <li>
                    <b>{{ name }}:</b> {{ copy_urlize(val[name]) }}
                </li>
            {% endfor %}
        </ul>
    {% else %}
        {% if href_text | length > 0 %}
            <span class="copy" copy-value="{{val}}">{{ href_text | string | urlize }}</span>
        {% else %}
            <span class="copy">{{ val | string | urlize }}</span>
        {% endif %}
    {% endif %}
{%- endmacro -%}
{% macro display_recursive(prop, skip_display) -%}
    {# Render `prop` recursively: mappings become a key/value list with a
       "Copy as JSON" control, lists become list items, media objects get a
       preview (unless `skip_display` is truthy) plus their key, type and
       properties, and anything else goes through copy_urlize. #}
    {% if prop is mapping %}
        <div class="center copy" copy-value64='{{prop | json_dump_b64}}'>Copy as JSON</div>
        <ul>
            {% for name in prop %}
                <li>
                    <b>{{ name }}:</b>
                    {{ display_recursive(prop[name]) }}
                </li>
            {% endfor %}
        </ul>
    {% elif prop | is_list %}
        {% for element in prop %}
            <li>
                {{ display_recursive(element) }}
            </li>
        {% endfor %}
    {% elif prop | is_media %}
        {% if not skip_display %}
            {{ display_media(prop, true) }}
        {% endif %}
        <ul>
            <li><b>key:</b> <span class="copy">{{ prop.key }}</span></li>
            <li><b>type:</b> <span class="copy">{{ prop.mimetype }}</span></li>
            {% for name in prop.properties %}
                {% set value = prop.properties[name] %}
                {% if value | is_list %}
                    {# list-valued properties are shown as a collapsible section #}
                    <p></p>
                    <div>
                        <b class="collapsible" title="expand">{{ name }} ({{ value | length }}):</b>
                        <p></p>
                        <div class="collapsible-content">
                            {% for entry in value %}
                                {{ display_recursive(entry) }}
                            {% endfor %}
                        </div>
                    </div>
                    <p></p>
                {% elif value | string | length > 1 %}
                    <li><b>{{ name }}:</b> {{ copy_urlize(value) }}</li>
                {% endif %}
            {% endfor %}
        </ul>
    {% else %}
        {{ copy_urlize(prop) }}
    {% endif %}
{%- endmacro -%}

View File

@@ -1,7 +1,6 @@
{
"name": "Instagram API Archiver",
"name": "Instagram API Extractor",
"type": ["extractor"],
"entry_point": "instagram_api_archiver:InstagramApiArchiver",
"external_dependencies":
{"python": ["requests",
"loguru",

View File

@@ -1,5 +1,5 @@
"""
The `instagram_api_archiver` module provides tools for archiving various types of Instagram content
The `instagram_api_extractor` module provides tools for archiving various types of Instagram content
using the [Instagrapi API](https://github.com/subzeroid/instagrapi).
Connects to an Instagrapi API deployment and allows for downloading Instagram user profiles,
@@ -16,19 +16,19 @@ from loguru import logger
from retrying import retry
from tqdm import tqdm
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
from auto_archiver.core import Media
from auto_archiver.core import Metadata
class InstagramAPIArchiver(Archiver):
class InstagramAPIExtractor(Extractor):
"""
Uses an https://github.com/subzeroid/instagrapi API deployment to fetch instagram posts data
# TODO: improvement collect aggregates of locations[0].location and mentions for all posts
"""
name = "instagram_api_archiver"
name = "instagram_api_extractor"
global_pattern = re.compile(
r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com)\/(stories(?:\/highlights)?|p|reel)?\/?([^\/\?]*)\/?(\d+)?"

View File

@@ -1,7 +1,6 @@
{
"name": "Instagram Archiver",
"name": "Instagram Extractor",
"type": ["extractor"],
"entry_point": "instagram_archiver:InstagramArchiver",
"external_dependencies": {
"python": [
"instaloader",

View File

@@ -7,15 +7,15 @@ import re, os, shutil, traceback
import instaloader # https://instaloader.github.io/as-module.html
from loguru import logger
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
from auto_archiver.core import Metadata
from auto_archiver.core import Media
class InstagramArchiver(Archiver):
class InstagramExtractor(Extractor):
"""
Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...)
"""
name = "instagram_archiver"
name = "instagram_extractor"
# NB: post regex should be tested before profile
# https://regex101.com/r/MGPquX/1
@@ -67,7 +67,7 @@ class InstagramArchiver(Archiver):
elif len(profile_matches):
result = self.download_profile(url, profile_matches[0])
except Exception as e:
logger.error(f"Failed to download with instagram archiver due to: {e}, make sure your account credentials are valid.")
logger.error(f"Failed to download with instagram extractor due to: {e}, make sure your account credentials are valid.")
finally:
shutil.rmtree(self.download_folder, ignore_errors=True)
return result

View File

@@ -1,7 +1,6 @@
{
"name": "Instagram Telegram Bot Archiver",
"name": "Instagram Telegram Bot Extractor",
"type": ["extractor"],
"entry_point": "instagram_tbot_archiver:InstagramTbotArchiver",
"external_dependencies": {"python": ["loguru",
"telethon",],
},
@@ -13,7 +12,7 @@
"timeout": {"default": 45, "help": "timeout to fetch the instagram content in seconds."},
},
"description": """
The `InstagramTbotArchiver` module uses a Telegram bot (`instagram_load_bot`) to fetch and archive Instagram content,
The `InstagramTbotExtractor` module uses a Telegram bot (`instagram_load_bot`) to fetch and archive Instagram content,
such as posts and stories. It leverages the Telethon library to interact with the Telegram API, sending Instagram URLs
to the bot and downloading the resulting media and metadata. The downloaded content is stored as `Media` objects and
returned as part of a `Metadata` object.
@@ -26,7 +25,7 @@ returned as part of a `Metadata` object.
### Setup
To use the `InstagramTbotArchiver`, you need to provide the following configuration settings:
To use the `InstagramTbotExtractor`, you need to provide the following configuration settings:
- **API ID and Hash**: Telegram API credentials obtained from [my.telegram.org/apps](https://my.telegram.org/apps).
- **Session File**: Optional path to store the Telegram session file for future use.

View File

@@ -1,5 +1,5 @@
"""
InstagramTbotArchiver Module
InstagramTbotExtractor Module
This module provides functionality to archive Instagram content (posts, stories, etc.) using a Telegram bot (`instagram_load_bot`).
It interacts with the Telegram API via the Telethon library to send Instagram URLs to the bot, which retrieves the
@@ -15,18 +15,18 @@ from sqlite3 import OperationalError
from loguru import logger
from telethon.sync import TelegramClient
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.utils import random_str
class InstagramTbotArchiver(Archiver):
class InstagramTbotExtractor(Extractor):
"""
calls a telegram bot to fetch instagram posts/stories... and gets available media from it
https://github.com/adw0rd/instagrapi
https://t.me/instagram_load_bot
"""
name = "instagram_tbot_archiver"
name = "instagram_tbot_extractor"
def __init__(self, config: dict) -> None:
super().__init__(config)
@@ -49,7 +49,7 @@ class InstagramTbotArchiver(Archiver):
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
except OperationalError as e:
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_archiver. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_extractor. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
with self.client.start():
logger.success(f"SETUP {self.name} login works.")

View File

@@ -0,0 +1,26 @@
# Manifest describing the local-filesystem storage module.
m = {
    "name": "Local Storage",
    "type": ["storage"],
    "requires_setup": False,
    "external_dependencies": {"python": ["loguru"]},
    "configs": {
        # TODO: get base storage configs
        "save_to": {
            "default": "./archived",
            "help": "folder where to save archived content",
        },
        "save_absolute": {
            "default": False,
            "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)",
        },
    },
    "description": """
LocalStorage: A storage module for saving archived content locally on the filesystem.
### Features
- Saves archived media files to a specified folder on the local filesystem.
- Maintains file metadata during storage using `shutil.copy2`.
- Supports both absolute and relative paths for stored files, configurable via `save_absolute`.
- Automatically creates directories as needed for storing files.
### Notes
- Default storage folder is `./archived`, but this can be changed via the `save_to` configuration.
- The `save_absolute` option can reveal the file structure in output formats; use with caution.
"""
}

View File

@@ -0,0 +1,44 @@
import shutil
from typing import IO
import os
from loguru import logger
from auto_archiver.core import Media
from auto_archiver.base_modules import Storage
class LocalStorage(Storage):
    """Storage backend that copies archived media into a folder on the local filesystem."""

    name = "local_storage"

    def __init__(self, config: dict) -> None:
        super().__init__(config)
        # make sure the destination root exists before anything is stored
        os.makedirs(self.save_to, exist_ok=True)

    @staticmethod
    def configs() -> dict:
        """Return the base `Storage` options extended with the local-storage ones."""
        own_options = {
            "save_to": {"default": "./archived", "help": "folder where to save archived content"},
            "save_absolute": {"default": False, "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"},
        }
        merged = dict(Storage.configs())
        merged.update(own_options)
        return merged

    def get_cdn_url(self, media: Media) -> str:
        """Return the path of `media` under `save_to`, made absolute when `save_absolute` is set."""
        # TODO: is this viable with Storage.configs on path/filename?
        location = os.path.join(self.save_to, media.key)
        return os.path.abspath(location) if self.save_absolute else location

    def upload(self, media: Media, **kwargs) -> bool:
        """Copy `media.filename` to `save_to/media.key`, creating parent folders as needed.

        Always returns True; any error raised by the copy propagates to the caller.
        """
        # override parent so that we can use shutil.copy2 and keep metadata
        target = os.path.join(self.save_to, media.key)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        logger.debug(f'[{self.__class__.name}] storing file {media.filename} with key {media.key} to {target}')
        copied_to = shutil.copy2(media.filename, target)
        logger.info(copied_to)
        return True

    # must be implemented even if unused
    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
        """Intentionally a no-op: `upload` is overridden and does the work itself."""
        pass

View File

@@ -2,7 +2,7 @@ import datetime
import os
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Metadata

View File

@@ -2,7 +2,7 @@ import subprocess
import traceback
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Metadata

View File

@@ -0,0 +1,9 @@
# Manifest describing the mute formatter module; it declares no external dependencies.
m = {
    "name": "Mute Formatter",
    "type": ["formatter"],
    "requires_setup": False,
    "external_dependencies": {},
    "description": """ Default formatter.
""",
}

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
from dataclasses import dataclass
from ..core import Metadata, Media
from . import Formatter
@dataclass
class MuteFormatter(Formatter):
    """Formatter that produces no output: `format` returns None for every item."""

    name = "mute_formatter"

    def __init__(self, config: dict) -> None:
        # without this STEP.__init__ is not called
        super().__init__(config)

    def format(self, item: Metadata) -> Media:
        """Ignore `item` and return None."""
        return None

View File

@@ -16,7 +16,7 @@ import numpy as np
from PIL import Image, UnidentifiedImageError
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Metadata

View File

View File

@@ -0,0 +1,40 @@
# Manifest describing the S3 storage module.
m = {
    "name": "S3 Storage",
    "type": ["storage"],
    "requires_setup": True,
    "external_dependencies": {
        "python": ["boto3", "loguru"],
    },
    "configs": {
        # TODO: get base storage configs
        "bucket": {"default": None, "help": "S3 bucket name"},
        "region": {"default": None, "help": "S3 region name"},
        "key": {"default": None, "help": "S3 API key"},
        "secret": {"default": None, "help": "S3 API secret"},
        # The folder name is spelled out because this manifest defines and imports
        # no names: interpolating NO_DUPLICATES_FOLDER here raised NameError.
        # Keep it in sync with NO_DUPLICATES_FOLDER in the s3 storage module.
        "random_no_duplicate": {"default": False, "help": "if set, it will override `path_generator`, `filename_generator` and `folder`. It will check if the file already exists and if so it will not upload it again. Creates a new root folder path `no-dups/`"},
        "endpoint_url": {
            "default": 'https://{region}.digitaloceanspaces.com',
            "help": "S3 bucket endpoint, {region} are inserted at runtime"
        },
        "cdn_url": {
            "default": 'https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}',
            "help": "S3 CDN url, {bucket}, {region} and {key} are inserted at runtime"
        },
        "private": {"default": False, "help": "if true S3 files will not be readable online"},
    },
    "description": """
S3Storage: A storage module for saving media files to an S3-compatible object storage.
### Features
- Uploads media files to an S3 bucket with customizable configurations.
- Supports `random_no_duplicate` mode to avoid duplicate uploads by checking existing files based on SHA-256 hashes.
- Automatically generates unique paths for files when duplicates are found.
- Configurable endpoint and CDN URL for different S3-compatible providers.
- Supports both private and public file storage, with public files being readable online.
### Notes
- Requires S3 credentials (API key and secret) and a bucket name to function.
- The `random_no_duplicate` option ensures no duplicate uploads by leveraging hash-based folder structures.
- Uses `boto3` for interaction with the S3 API.
"""
}

View File

@@ -0,0 +1,96 @@
from typing import IO
import boto3, os
from auto_archiver.utils.misc import random_str
from auto_archiver.core import Media
from auto_archiver.base_modules import Storage
# TODO
from auto_archiver.modules.hash_enricher import HashEnricher
from loguru import logger
NO_DUPLICATES_FOLDER = "no-dups/"
class S3Storage(Storage):
    """Storage backend that uploads media to an S3-compatible bucket through boto3."""

    name = "s3_storage"

    def __init__(self, config: dict) -> None:
        super().__init__(config)
        self.s3 = boto3.client(
            's3',
            region_name=self.region,
            # the configured endpoint may contain a {region} placeholder
            endpoint_url=self.endpoint_url.format(region=self.region),
            aws_access_key_id=self.key,
            aws_secret_access_key=self.secret
        )
        self.random_no_duplicate = bool(self.random_no_duplicate)
        if self.random_no_duplicate:
            logger.warning("random_no_duplicate is set to True, this will override `path_generator`, `filename_generator` and `folder`.")

    @staticmethod
    def configs() -> dict:
        """Return the base `Storage` options extended with the S3-specific ones."""
        return dict(
            Storage.configs(),
            ** {
                "bucket": {"default": None, "help": "S3 bucket name"},
                "region": {"default": None, "help": "S3 region name"},
                "key": {"default": None, "help": "S3 API key"},
                "secret": {"default": None, "help": "S3 API secret"},
                "random_no_duplicate": {"default": False, "help": f"if set, it will override `path_generator`, `filename_generator` and `folder`. It will check if the file already exists and if so it will not upload it again. Creates a new root folder path `{NO_DUPLICATES_FOLDER}`"},
                "endpoint_url": {
                    "default": 'https://{region}.digitaloceanspaces.com',
                    "help": "S3 bucket endpoint, {region} are inserted at runtime"
                },
                "cdn_url": {
                    "default": 'https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}',
                    "help": "S3 CDN url, {bucket}, {region} and {key} are inserted at runtime"
                },
                "private": {"default": False, "help": "if true S3 files will not be readable online"},
            })

    def get_cdn_url(self, media: Media) -> str:
        """Return the configured CDN url with bucket, region and `media.key` filled in."""
        return self.cdn_url.format(bucket=self.bucket, region=self.region, key=media.key)

    def uploadf(self, file: IO[bytes], media: Media, **kwargs: dict) -> bool:
        """Upload `file` to the bucket under `media.key`.

        An optional `extra_args` keyword is forwarded to boto3 as `ExtraArgs`.
        Unless already present, `ACL` is set to public-read when the storage is
        not private, and `ContentType` is set from `media.mimetype` when available.

        Returns True both when the file was uploaded and when the upload was
        skipped because `is_upload_needed` found an existing copy.
        """
        if not self.is_upload_needed(media):
            return True

        # work on a copy so the caller's dict is never mutated; tolerate extra_args=None
        extra_args = dict(kwargs.get("extra_args") or {})
        if not self.private and 'ACL' not in extra_args:
            extra_args['ACL'] = 'public-read'

        if 'ContentType' not in extra_args:
            try:
                if media.mimetype:
                    extra_args['ContentType'] = media.mimetype
            except Exception as e:
                # best effort: upload without a content type rather than failing
                logger.warning(f"Unable to get mimetype for {media.key=}, error: {e}")
        self.s3.upload_fileobj(file, Bucket=self.bucket, Key=media.key, ExtraArgs=extra_args)
        return True

    def is_upload_needed(self, media: Media) -> bool:
        """Decide whether `media` must be uploaded, rewriting `media.key` in no-duplicate mode.

        Outside `random_no_duplicate` mode this always returns True. In that mode
        the file is hashed and looked up in a folder named after the first 24
        characters of the hash under NO_DUPLICATES_FOLDER:
        - if that folder already holds a file, `media.key` is pointed at it, the
          media is flagged "previously archived" and False is returned;
        - otherwise `media.key` becomes a new random name in that folder (the
          extension is kept) and True is returned.
        """
        if not self.random_no_duplicate:
            return True

        # checks if a folder with the hash already exists, if so it skips the upload
        he = HashEnricher({"hash_enricher": {"algorithm": "SHA-256", "chunksize": 1.6e7}})
        hd = he.calculate_hash(media.filename)
        path = os.path.join(NO_DUPLICATES_FOLDER, hd[:24])

        if existing_key := self.file_in_folder(path):
            media.key = existing_key
            media.set("previously archived", True)
            logger.debug(f"skipping upload of {media.filename} because it already exists in {media.key}")
            return False

        _, ext = os.path.splitext(media.key)
        media.key = os.path.join(path, f"{random_str(24)}{ext}")
        return True

    def file_in_folder(self, path: str) -> str:
        """Return the key of one object stored under `path`, or False when there is none.

        NOTE: despite the `str` annotation the "not found" result is the boolean
        False; callers rely on its falsiness.
        """
        # checks if path exists and is not an empty folder
        if not path.endswith('/'):
            path = path + '/'
        # a single key is enough to know the folder is not empty
        resp = self.s3.list_objects(Bucket=self.bucket, Prefix=path, Delimiter='/', MaxKeys=1)
        if 'Contents' in resp:
            return resp['Contents'][0]['Key']
        return False

View File

@@ -5,7 +5,7 @@ import base64
from selenium.common.exceptions import TimeoutException
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.utils import Webdriver, UrlUtil, random_str
from auto_archiver.core import Media, Metadata, ArchivingContext

View File

@@ -3,7 +3,7 @@ from slugify import slugify
from urllib.parse import urlparse
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Metadata, ArchivingContext, Media

View File

@@ -1,7 +1,6 @@
{
"name": "Telegram Archiver",
"name": "Telegram Extractor",
"type": ["extractor"],
"entry_point": "telegram_archiver:TelegramArchiver",
"requires_setup": False,
"external_dependencies": {
"python": [
@@ -11,7 +10,7 @@
],
},
"description": """
The `TelegramArchiver` retrieves publicly available media content from Telegram message links without requiring login credentials.
The `TelegramExtractor` retrieves publicly available media content from Telegram message links without requiring login credentials.
It processes URLs to fetch images and videos embedded in Telegram messages, ensuring a structured output using `Metadata`
and `Media` objects. Recommended for scenarios where login-based archiving is not viable, although `telethon_archiver`
is advised for more comprehensive functionality.

View File

@@ -2,16 +2,16 @@ import requests, re, html
from bs4 import BeautifulSoup
from loguru import logger
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
from auto_archiver.core import Metadata, Media
class TelegramArchiver(Archiver):
class TelegramExtractor(Extractor):
"""
Archiver for telegram that does not require login, but the telethon_archiver is much more advised,
Extractor for telegram that does not require login, but the telethon_extractor is much more advised,
will only return if at least one image or one video is found
"""
name = "telegram_archiver"
name = "telegram_extractor"
def __init__(self, config: dict) -> None:
super().__init__(config)

View File

@@ -1,8 +1,7 @@
# TODO rm dependency on json
{
"name": "telethon_archiver",
"name": "telethon_extractor",
"type": ["extractor"],
"entry_point": "telethon_archiver:TelethonArchiver",
"requires_setup": True,
"external_dependencies": {
"python": ["telethon",
@@ -25,7 +24,7 @@
}
},
"description": """
The `TelethonArchiver` uses the Telethon library to archive posts and media from Telegram channels and groups.
The `TelethonExtractor` uses the Telethon library to archive posts and media from Telegram channels and groups.
It supports private and public channels, downloading grouped posts with media, and can join channels using invite links
if provided in the configuration.
@@ -37,7 +36,7 @@ if provided in the configuration.
- Outputs structured metadata and media using `Metadata` and `Media` objects.
### Setup
To use the `TelethonArchiver`, you must configure the following:
To use the `TelethonExtractor`, you must configure the following:
- **API ID and API Hash**: Obtain these from [my.telegram.org](https://my.telegram.org/apps).
- **Session File**: Optional, but records login sessions for future use (default: `secrets/anon.session`).
- **Bot Token**: Optional, allows access to additional content (e.g., large videos) but limits private channel archiving.

View File

@@ -8,13 +8,13 @@ from loguru import logger
from tqdm import tqdm
import re, time, json, os
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.utils import random_str
class TelethonArchiver(Archiver):
name = "telethon_archiver"
class TelethonArchiver(Extractor):
name = "telethon_extractor"
link_pattern = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)")

View File

@@ -9,7 +9,7 @@ and identify important moments without watching the entire video.
import ffmpeg, os
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Media, Metadata, ArchivingContext
from auto_archiver.utils.misc import random_str

View File

@@ -8,9 +8,9 @@ from certvalidator import CertificateValidator, ValidationContext
from asn1crypto import pem
import certifi
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Metadata, ArchivingContext, Media
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
class TimestampingEnricher(Enricher):

View File

@@ -1,7 +1,6 @@
{
"name": "Twitter API Archiver",
"name": "Twitter API Extractor",
"type": ["extractor"],
"entry_point": "twitter_api_archiver:TwitterApiArchiver",
"requires_setup": True,
"external_dependencies": {
"python": ["requests",
@@ -20,7 +19,7 @@
"access_secret": {"default": None, "help": "twitter API access_secret"},
},
"description": """
The `TwitterApiArchiver` fetches tweets and associated media using the Twitter API.
The `TwitterApiExtractor` fetches tweets and associated media using the Twitter API.
It supports multiple API configurations for extended rate limits and reliable access.
Features include URL expansion, media downloads (e.g., images, videos), and structured output
via `Metadata` and `Media` objects. Requires Twitter API credentials such as bearer tokens
@@ -34,7 +33,7 @@
- Outputs structured metadata and media using `Metadata` and `Media` objects.
### Setup
To use the `TwitterApiArchiver`, you must provide valid Twitter API credentials via configuration:
To use the `TwitterApiExtractor`, you must provide valid Twitter API credentials via configuration:
- **Bearer Token(s)**: A single token or a list for rate-limited API access.
- **Consumer Key and Secret**: Required for user-authenticated API access.
- **Access Token and Secret**: Complements the consumer key for enhanced API capabilities.

View File

@@ -8,11 +8,11 @@ from loguru import logger
from pytwitter import Api
from slugify import slugify
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
from auto_archiver.core import Metadata,Media
class TwitterApiArchiver(Archiver):
name = "twitter_api_archiver"
class TwitterApiExtractor(Extractor):
name = "twitter_api_extractor"
link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
def __init__(self, config: dict) -> None:

View File

@@ -1,7 +1,6 @@
{
"name": "VKontakte Archiver",
"name": "VKontakte Extractor",
"type": ["extractor"],
"entry_point": "vk_archiver:VKArchiver",
"requires_setup": True,
"depends": ["core", "utils"],
"external_dependencies": {
@@ -14,7 +13,7 @@
"session_file": {"default": "secrets/vk_config.v2.json", "help": "valid VKontakte password"},
},
"description": """
The `VkArchiver` fetches posts, text, and images from VK (VKontakte) social media pages.
The `VkExtractor` fetches posts, text, and images from VK (VKontakte) social media pages.
This archiver is specialized for `/wall` posts and uses the `VkScraper` library to extract
and download content. Note that VK videos are handled separately by the `YTDownloader`.

View File

@@ -2,16 +2,16 @@ from loguru import logger
from vk_url_scraper import VkScraper
from auto_archiver.utils.misc import dump_payload
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor
from auto_archiver.core import Metadata, Media, ArchivingContext
class VkArchiver(Archiver):
class VkExtractor(Extractor):
""""
VK videos are handled by YTDownloader, this archiver gets posts text and images.
Currently only works for /wall posts
"""
name = "vk_archiver"
name = "vk_extractor"
def __init__(self, config: dict) -> None:
super().__init__(config)

View File

@@ -6,12 +6,11 @@ from loguru import logger
from warcio.archiveiterator import ArchiveIterator
from auto_archiver.core import Media, Metadata, ArchivingContext
from auto_archiver.enrichers import Enricher
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor, Enricher
from auto_archiver.utils import UrlUtil, random_str
class WaczArchiverEnricher(Enricher, Archiver):
class WaczExtractorEnricher(Enricher, Extractor):
"""
Uses https://github.com/webrecorder/browsertrix-crawler to generate a .WACZ archive of the URL
If used with [profiles](https://github.com/webrecorder/browsertrix-crawler#creating-and-using-browser-profiles)

View File

@@ -2,12 +2,11 @@ import json
from loguru import logger
import time, requests
from auto_archiver.enrichers import Enricher
from auto_archiver.archivers import Archiver
from auto_archiver.base_modules import Extractor, Enricher
from auto_archiver.utils import UrlUtil
from auto_archiver.core import Metadata
class WaybackArchiverEnricher(Enricher, Archiver):
class WaybackExtractorEnricher(Enricher, Extractor):
"""
Submits the current URL to the webarchive and returns a job_id or completed archive.

View File

@@ -2,9 +2,9 @@ import traceback
import requests, time
from loguru import logger
from auto_archiver.enrichers import Enricher
from auto_archiver.base_modules import Enricher
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.storages import S3Storage
from auto_archiver.modules import S3Storage
class WhisperEnricher(Enricher):