Merge branch 'main' into opentimestamps

This commit is contained in:
Patrick Robertson
2025-03-11 17:21:34 +00:00
193 changed files with 61842 additions and 3011 deletions

View File

@@ -3,7 +3,7 @@ from auto_archiver.core.orchestrator import ArchivingOrchestrator
import sys
def main():
ArchivingOrchestrator().run(sys.argv[1:])
for _ in ArchivingOrchestrator()._command_line_run(sys.argv[1:]): pass
if __name__ == "__main__":
main()

View File

@@ -3,7 +3,7 @@
"""
from .metadata import Metadata
from .media import Media
from .module import BaseModule
from .base_module import BaseModule
# cannot import ArchivingOrchestrator/Config to avoid circular dep
# from .orchestrator import ArchivingOrchestrator

View File

@@ -1,13 +1,18 @@
from urllib.parse import urlparse
from typing import Mapping, Any
from __future__ import annotations
from typing import Mapping, Any, Type, TYPE_CHECKING
from abc import ABC
from copy import deepcopy, copy
from tempfile import TemporaryDirectory
from auto_archiver.utils import url as UrlUtil
from auto_archiver.core.consts import MODULE_TYPES as CONF_MODULE_TYPES
from loguru import logger
if TYPE_CHECKING:
from .module import ModuleFactory
class BaseModule(ABC):
"""
@@ -17,41 +22,24 @@ class BaseModule(ABC):
however modules can have a .setup() method to run any setup code
(e.g. logging in to a site, spinning up a browser etc.)
See BaseModule.MODULE_TYPES for the types of modules you can create, noting that
See consts.MODULE_TYPES for the types of modules you can create, noting that
a subclass can be of multiple types. For example, a module that extracts data from
a website and stores it in a database would be both an 'extractor' and a 'database' module.
Each module is a python package, and should have a __manifest__.py file in the
same directory as the module file. The __manifest__.py specifies the module information
like name, author, version, dependencies etc. See BaseModule._DEFAULT_MANIFEST for the
like name, author, version, dependencies etc. See DEFAULT_MANIFEST for the
default manifest structure.
"""
MODULE_TYPES = [
'feeder',
'extractor',
'enricher',
'database',
'storage',
'formatter'
]
_DEFAULT_MANIFEST = {
'name': '', # the display name of the module
'author': 'Bellingcat', # creator of the module, leave this as Bellingcat or set your own name!
'type': [], # the type of the module, can be one or more of BaseModule.MODULE_TYPES
'requires_setup': True, # whether or not this module requires additional setup such as setting API Keys or installing additional softare
'description': '', # a description of the module
'dependencies': {}, # external dependencies, e.g. python packages or binaries, in dictionary format
'entry_point': '', # the entry point for the module, in the format 'module_name::ClassName'. This can be left blank to use the default entry point of module_name::ModuleName
'version': '1.0', # the version of the module
'configs': {} # any configuration options this module has, these will be exposed to the user in the config file or via the command line
}
MODULE_TYPES = CONF_MODULE_TYPES
# NOTE: these here are declard as class variables, but they are overridden by the instance variables in the __init__ method
config: Mapping[str, Any]
authentication: Mapping[str, Mapping[str, str]]
name: str
module_factory: ModuleFactory
# this is set by the orchestrator prior to archiving
tmp_dir: TemporaryDirectory = None
@@ -62,14 +50,6 @@ class BaseModule(ABC):
def config_setup(self, config: dict):
authentication = config.get('authentication', {})
# extract out concatenated sites
for key, val in copy(authentication).items():
if "," in key:
for site in key.split(","):
authentication[site] = val
del authentication[key]
# this is important. Each instance is given its own deepcopied config, so modules cannot
# change values to affect other modules
config = deepcopy(config)
@@ -89,21 +69,28 @@ class BaseModule(ABC):
Returns the authentication information for a given site. This is used to authenticate
with a site before extracting data. The site should be the domain of the site, e.g. 'twitter.com'
extract_cookies: bool - whether or not to extract cookies from the given browser and return the
cookie jar (disabling can speed up) processing if you don't actually need the cookies jar
:param site: the domain of the site to get authentication information for
:param extract_cookies: whether or not to extract cookies from the given browser/file and return the cookie jar (disabling can speed up processing if you don't actually need the cookies jar).
Currently, the dict can have keys of the following types:
- username: str - the username to use for login
- password: str - the password to use for login
- api_key: str - the API key to use for login
- api_secret: str - the API secret to use for login
- cookie: str - a cookie string to use for login (specific to this site)
- cookies_jar: YoutubeDLCookieJar | http.cookiejar.MozillaCookieJar - a cookie jar compatible with requests (e.g. `requests.get(cookies=cookie_jar)`)
:returns: authdict dict of login information for the given site
**Global options:**\n
* cookies_from_browser: str - the name of the browser to extract cookies from (e.g. 'chrome', 'firefox' - uses ytdlp under the hood to extract\n
* cookies_file: str - the path to a cookies file to use for login\n
**Currently, the sites dict can have keys of the following types:**\n
* username: str - the username to use for login\n
* password: str - the password to use for login\n
* api_key: str - the API key to use for login\n
* api_secret: str - the API secret to use for login\n
* cookie: str - a cookie string to use for login (specific to this site)\n
* cookies_file: str - the path to a cookies file to use for login (specific to this site)\n
* cookies_from_browser: str - the name of the browser to extract cookies from (specitic for this site)\n
"""
# TODO: think about if/how we can deal with sites that have multiple domains (main one is x.com/twitter.com)
# for now the user must enter them both, like "x.com,twitter.com" in their config. Maybe we just hard-code?
site = UrlUtil.domain_for_url(site)
site = UrlUtil.domain_for_url(site).lstrip("www.")
# add the 'www' version of the site to the list of sites to check
authdict = {}
@@ -118,8 +105,8 @@ class BaseModule(ABC):
for key in self.authentication.keys():
if key in site or site in key:
logger.debug(f"Could not find exact authentication information for site '{site}'. \
did find information for '{key}' which is close, is this what you meant? \
If so, edit your authentication settings to make sure it exactly matches.")
did find information for '{key}' which is close, is this what you meant? \
If so, edit your authentication settings to make sure it exactly matches.")
def get_ytdlp_cookiejar(args):
import yt_dlp
@@ -130,16 +117,29 @@ class BaseModule(ABC):
ytdlp_opts = getattr(parse_options(args), 'ydl_opts')
return yt_dlp.YoutubeDL(ytdlp_opts).cookiejar
# get the cookies jar, prefer the browser cookies than the file
if 'cookies_from_browser' in self.authentication:
get_cookiejar_options = None
# order of priority:
# 1. cookies_from_browser setting in site config
# 2. cookies_file setting in site config
# 3. cookies_from_browser setting in global config
# 4. cookies_file setting in global config
if 'cookies_from_browser' in authdict:
get_cookiejar_options = ['--cookies-from-browser', authdict['cookies_from_browser']]
elif 'cookies_file' in authdict:
get_cookiejar_options = ['--cookies', authdict['cookies_file']]
elif 'cookies_from_browser' in self.authentication:
authdict['cookies_from_browser'] = self.authentication['cookies_from_browser']
if extract_cookies:
authdict['cookies_jar'] = get_ytdlp_cookiejar(['--cookies-from-browser', self.authentication['cookies_from_browser']])
get_cookiejar_options = ['--cookies-from-browser', self.authentication['cookies_from_browser']]
elif 'cookies_file' in self.authentication:
authdict['cookies_file'] = self.authentication['cookies_file']
if extract_cookies:
authdict['cookies_jar'] = get_ytdlp_cookiejar(['--cookies', self.authentication['cookies_file']])
get_cookiejar_options = ['--cookies', self.authentication['cookies_file']]
if get_cookiejar_options:
authdict['cookies_jar'] = get_ytdlp_cookiejar(get_cookiejar_options)
return authdict
def repr(self):

View File

@@ -7,21 +7,23 @@ flexible setup in various environments.
import argparse
from ruamel.yaml import YAML, CommentedMap, add_representer
import json
from loguru import logger
from copy import deepcopy
from .module import BaseModule
from auto_archiver.core.consts import MODULE_TYPES
from typing import Any, List, Type, Tuple
_yaml: YAML = YAML()
DEFAULT_CONFIG_FILE = "secrets/orchestration.yaml"
EMPTY_CONFIG = _yaml.load("""
# Auto Archiver Configuration
# Steps are the modules that will be run in the order they are defined
steps:""" + "".join([f"\n {module}s: []" for module in BaseModule.MODULE_TYPES]) + \
# Steps are the modules that will be run in the order they are defined
steps:""" + "".join([f"\n {module}s: []" for module in MODULE_TYPES]) + \
"""
# Global configuration
@@ -48,9 +50,61 @@ authentication: {}
logging:
level: INFO
""")
# note: 'logging' is explicitly added above in order to better format the config file
# Arg Parse Actions/Classes
class AuthenticationJsonParseAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
try:
auth_dict = json.loads(values)
setattr(namespace, self.dest, auth_dict)
except json.JSONDecodeError as e:
raise argparse.ArgumentTypeError(f"Invalid JSON input for argument '{self.dest}': {e}")
def load_from_file(path):
try:
with open(path, 'r') as f:
try:
auth_dict = json.load(f)
except json.JSONDecodeError:
f.seek(0)
# maybe it's yaml, try that
auth_dict = _yaml.load(f)
if auth_dict.get('authentication'):
auth_dict = auth_dict['authentication']
auth_dict['load_from_file'] = path
return auth_dict
except:
return None
if isinstance(auth_dict, dict) and auth_dict.get('from_file'):
auth_dict = load_from_file(auth_dict['from_file'])
elif isinstance(auth_dict, str):
# if it's a string
auth_dict = load_from_file(auth_dict)
if not isinstance(auth_dict, dict):
raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods")
global_options = ['cookies_from_browser', 'cookies_file', 'load_from_file']
for key, auth in auth_dict.items():
if key in global_options:
continue
if not isinstance(key, str) or not isinstance(auth, dict):
raise argparse.ArgumentTypeError(f"Authentication must be a dictionary of site names and their authentication methods. Valid global configs are {global_options}")
setattr(namespace, self.dest, auth_dict)
class UniqueAppendAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
for value in values:
if value not in getattr(namespace, self.dest):
getattr(namespace, self.dest).append(value)
class DefaultValidatingParser(argparse.ArgumentParser):
def error(self, message):
@@ -81,6 +135,7 @@ class DefaultValidatingParser(argparse.ArgumentParser):
return super().parse_known_args(args, namespace)
# Config Utils
def to_dot_notation(yaml_conf: CommentedMap | dict) -> dict:
dotdict = {}
@@ -128,6 +183,11 @@ def merge_dicts(dotdict: dict, yaml_dict: CommentedMap) -> CommentedMap:
yaml_subdict[key] = value
continue
if key == 'steps':
for module_type, modules in value.items():
# overwrite the 'steps' from the config file with the ones from the CLI
yaml_subdict[key][module_type] = modules
if is_dict_type(value):
update_dict(value, yaml_subdict[key])
elif is_list_type(value):
@@ -136,7 +196,6 @@ def merge_dicts(dotdict: dict, yaml_dict: CommentedMap) -> CommentedMap:
yaml_subdict[key] = value
update_dict(from_dot_notation(dotdict), yaml_dict)
return yaml_dict
def read_yaml(yaml_filename: str) -> CommentedMap:
@@ -148,8 +207,8 @@ def read_yaml(yaml_filename: str) -> CommentedMap:
pass
if not config:
config = EMPTY_CONFIG
config = deepcopy(EMPTY_CONFIG)
return config
# TODO: make this tidier/find a way to notify of which keys should not be stored
@@ -158,6 +217,14 @@ def read_yaml(yaml_filename: str) -> CommentedMap:
def store_yaml(config: CommentedMap, yaml_filename: str) -> None:
config_to_save = deepcopy(config)
auth_dict = config_to_save.get("authentication", {})
if auth_dict and auth_dict.get('load_from_file'):
# remove all other values from the config, don't want to store it in the config file
auth_dict = {"load_from_file": auth_dict["load_from_file"]}
config_to_save.pop('urls', None)
with open(yaml_filename, "w", encoding="utf-8") as outf:
_yaml.dump(config_to_save, outf)
_yaml.dump(config_to_save, outf)
def is_valid_config(config: CommentedMap) -> bool:
return config and config != EMPTY_CONFIG

View File

@@ -0,0 +1,25 @@
class SetupError(ValueError):
pass
MODULE_TYPES = [
'feeder',
'extractor',
'enricher',
'database',
'storage',
'formatter'
]
MANIFEST_FILE = "__manifest__.py"
DEFAULT_MANIFEST = {
'name': '', # the display name of the module
'author': 'Bellingcat', # creator of the module, leave this as Bellingcat or set your own name!
'type': [], # the type of the module, can be one or more of MODULE_TYPES
'requires_setup': True, # whether or not this module requires additional setup such as setting API Keys or installing additional software
'description': '', # a description of the module
'dependencies': {}, # external dependencies, e.g. python packages or binaries, in dictionary format
'entry_point': '', # the entry point for the module, in the format 'module_name::ClassName'. This can be left blank to use the default entry point of module_name::ModuleName
'version': '1.0', # the version of the module
'configs': {} # any configuration options this module has, these will be exposed to the user in the config file or via the command line
}

View File

@@ -1,3 +1,8 @@
"""
Database module for the auto-archiver that defines the interface for implementing database modules
in the media archiving framework.
"""
from __future__ import annotations
from abc import abstractmethod
from typing import Union
@@ -5,6 +10,11 @@ from typing import Union
from auto_archiver.core import Metadata, BaseModule
class Database(BaseModule):
"""
Base class for implementing database modules in the media archiving framework.
Subclasses must implement the `fetch` and `done` methods to define platform-specific behavior.
"""
def started(self, item: Metadata) -> None:
"""signals the DB that the given item archival has started"""

View File

@@ -1,5 +1,5 @@
"""
Enrichers are modular components that enhance archived content by adding
Base module for Enrichers modular components that enhance archived content by adding
context, metadata, or additional processing.
These add additional information to the context, such as screenshots, hashes, and metadata.
@@ -13,7 +13,16 @@ from abc import abstractmethod
from auto_archiver.core import Metadata, BaseModule
class Enricher(BaseModule):
"""Base classes and utilities for enrichers in the Auto-Archiver system."""
"""Base classes and utilities for enrichers in the Auto Archiver system.
Enricher modules must implement the `enrich` method to define their behavior.
"""
@abstractmethod
def enrich(self, to_enrich: Metadata) -> None: pass
def enrich(self, to_enrich: Metadata) -> None:
"""
Enriches a Metadata object with additional information or context.
Takes the metadata object to enrich as an argument and modifies it in place, returning None.
"""
pass

View File

@@ -29,14 +29,24 @@ class Extractor(BaseModule):
valid_url: re.Pattern = None
def cleanup(self) -> None:
# called when extractors are done, or upon errors, cleanup any resources
"""
Called when extractors are done, or upon errors, cleanup any resources
"""
pass
def sanitize_url(self, url: str) -> str:
# used to clean unnecessary URL parameters OR unfurl redirect links
"""
Used to clean unnecessary URL parameters OR unfurl redirect links
"""
return url
def match_link(self, url: str) -> re.Match:
"""
Returns a match object if the given URL matches the valid_url pattern or False/None if not.
Normally used in the `suitable` method to check if the URL is supported by this extractor.
"""
return self.valid_url.match(url)
def suitable(self, url: str) -> bool:
@@ -71,7 +81,8 @@ class Extractor(BaseModule):
if len(to_filename) > 64:
to_filename = to_filename[-64:]
to_filename = os.path.join(self.tmp_dir, to_filename)
if verbose: logger.debug(f"downloading {url[0:50]=} {to_filename=}")
if verbose:
logger.debug(f"downloading {url[0:50]=} {to_filename=}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
@@ -80,8 +91,8 @@ class Extractor(BaseModule):
d.raise_for_status()
# get mimetype from the response headers
if not Path(to_filename).suffix:
content_type = d.headers.get('Content-Type')
if not mimetypes.guess_type(to_filename)[0]:
content_type = d.headers.get('Content-Type') or self._guess_file_type(url)
extension = mimetypes.guess_extension(content_type)
if extension:
to_filename += extension

View File

@@ -1,3 +1,7 @@
"""
The feeder base module defines the interface for implementing feeders in the media archiving framework.
"""
from __future__ import annotations
from abc import abstractmethod
from auto_archiver.core import Metadata
@@ -5,5 +9,17 @@ from auto_archiver.core import BaseModule
class Feeder(BaseModule):
"""
Base class for implementing feeders in the media archiving framework.
Subclasses must implement the `__iter__` method to define platform-specific behavior.
"""
@abstractmethod
def __iter__(self) -> Metadata: return None
def __iter__(self) -> Metadata:
"""
Returns an iterator (use `yield`) over the items to be archived.
These should be instances of Metadata, typically created with Metadata().set_url(url).
"""
return None

View File

@@ -1,9 +1,24 @@
"""
Base module for formatters modular components that format metadata into media objects for storage.
The most commonly used formatter is the HTML formatter, which takes metadata and formats it into an HTML file for storage.
"""
from __future__ import annotations
from abc import abstractmethod
from auto_archiver.core import Metadata, Media, BaseModule
class Formatter(BaseModule):
"""
Base class for implementing formatters in the media archiving framework.
Subclasses must implement the `format` method to define their behavior.
"""
@abstractmethod
def format(self, item: Metadata) -> Media: return None
def format(self, item: Metadata) -> Media:
"""
Formats a Metadata object into a user-viewable format (e.g. HTML) and stores it if needed.
"""
return None

View File

@@ -6,7 +6,7 @@ nested media retrieval, and type validation.
from __future__ import annotations
import os
import traceback
from typing import Any, List
from typing import Any, List, Iterator
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json, config
import mimetypes
@@ -21,14 +21,13 @@ class Media:
Represents a media file with associated properties and storage details.
Attributes:
- filename: The file path of the media.
- key: An optional identifier for the media.
- filename: The file path of the media as saved locally (temporarily, before uploading to the storage).
- urls: A list of URLs where the media is stored or accessible.
- properties: Additional metadata or transformations for the media.
- _mimetype: The media's mimetype (e.g., image/jpeg, video/mp4).
"""
filename: str
key: str = None
_key: str = None
urls: List[str] = field(default_factory=list)
properties: dict = field(default_factory=dict)
_mimetype: str = None # eg: image/jpeg
@@ -47,7 +46,7 @@ class Media:
for any_media in self.all_inner_media(include_self=True):
s.store(any_media, url, metadata=metadata)
def all_inner_media(self, include_self=False):
def all_inner_media(self, include_self=False) -> Iterator[Media]:
"""Retrieves all media, including nested media within properties or transformations on original media.
This function returns a generator for all the inner media.
@@ -67,6 +66,10 @@ class Media:
# checks if the media is already stored in the given storage
return len(self.urls) > 0 and len(self.urls) == len(in_storage.config["steps"]["storages"])
@property
def key(self) -> str:
return self._key
def set(self, key: str, value: Any) -> Media:
self.properties[key] = value
return self

View File

@@ -6,7 +6,7 @@ by handling user configuration, validating the steps properties, and implementin
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from typing import List, TYPE_CHECKING
import shutil
import ast
import copy
@@ -16,99 +16,116 @@ import os
from os.path import join
from loguru import logger
import auto_archiver
from .base_module import BaseModule
from auto_archiver.core.consts import DEFAULT_MANIFEST, MANIFEST_FILE
_LAZY_LOADED_MODULES = {}
MANIFEST_FILE = "__manifest__.py"
if TYPE_CHECKING:
from .base_module import BaseModule
def setup_paths(paths: list[str]) -> None:
"""
Sets up the paths for the modules to be loaded from
This is necessary for the modules to be imported correctly
"""
for path in paths:
# check path exists, if it doesn't, log a warning
if not os.path.exists(path):
logger.warning(f"Path '{path}' does not exist. Skipping...")
continue
HAS_SETUP_PATHS = False
# see odoo/module/module.py -> initialize_sys_path
if path not in auto_archiver.modules.__path__:
auto_archiver.modules.__path__.append(path)
class ModuleFactory:
# sort based on the length of the path, so that the longest path is last in the list
auto_archiver.modules.__path__ = sorted(auto_archiver.modules.__path__, key=len, reverse=True)
def __init__(self):
self._lazy_modules = {}
def get_module(module_name: str, config: dict) -> BaseModule:
"""
Gets and sets up a module using the provided config
This will actually load and instantiate the module, and load all its dependencies (i.e. not lazy)
"""
return get_module_lazy(module_name).load(config)
def setup_paths(self, paths: list[str]) -> None:
"""
Sets up the paths for the modules to be loaded from
This is necessary for the modules to be imported correctly
"""
global HAS_SETUP_PATHS
def get_module_lazy(module_name: str, suppress_warnings: bool = False) -> LazyBaseModule:
"""
Lazily loads a module, returning a LazyBaseModule
This has all the information about the module, but does not load the module itself or its dependencies
To load an actual module, call .setup() on a lazy module
"""
if module_name in _LAZY_LOADED_MODULES:
return _LAZY_LOADED_MODULES[module_name]
available = available_modules(limit_to_modules=[module_name], suppress_warnings=suppress_warnings)
if not available:
raise IndexError(f"Module '{module_name}' not found. Are you sure it's installed/exists?")
return available[0]
def available_modules(with_manifest: bool=False, limit_to_modules: List[str]= [], suppress_warnings: bool = False) -> List[LazyBaseModule]:
# search through all valid 'modules' paths. Default is 'modules' in the current directory
# see odoo/modules/module.py -> get_modules
def is_really_module(module_path):
if os.path.isfile(join(module_path, MANIFEST_FILE)):
return True
all_modules = []
for module_folder in auto_archiver.modules.__path__:
# walk through each module in module_folder and check if it has a valid manifest
try:
possible_modules = os.listdir(module_folder)
except FileNotFoundError:
logger.warning(f"Module folder {module_folder} does not exist")
continue
for possible_module in possible_modules:
if limit_to_modules and possible_module not in limit_to_modules:
for path in paths:
# check path exists, if it doesn't, log a warning
if not os.path.exists(path):
logger.warning(f"Path '{path}' does not exist. Skipping...")
continue
possible_module_path = join(module_folder, possible_module)
if not is_really_module(possible_module_path):
# see odoo/module/module.py -> initialize_sys_path
if path not in auto_archiver.modules.__path__:
if HAS_SETUP_PATHS == True:
logger.warning(f"You are attempting to re-initialise the module paths with: '{path}' for a 2nd time. \
This could lead to unexpected behaviour. It is recommended to only use a single modules path. \
If you wish to load modules from different paths then load a 2nd python interpreter (e.g. using multiprocessing).")
auto_archiver.modules.__path__.append(path)
# sort based on the length of the path, so that the longest path is last in the list
auto_archiver.modules.__path__ = sorted(auto_archiver.modules.__path__, key=len, reverse=True)
HAS_SETUP_PATHS = True
def get_module(self, module_name: str, config: dict) -> BaseModule:
"""
Gets and sets up a module using the provided config
This will actually load and instantiate the module, and load all its dependencies (i.e. not lazy)
"""
return self.get_module_lazy(module_name).load(config)
def get_module_lazy(self, module_name: str, suppress_warnings: bool = False) -> LazyBaseModule:
"""
Lazily loads a module, returning a LazyBaseModule
This has all the information about the module, but does not load the module itself or its dependencies
To load an actual module, call .setup() on a lazy module
"""
if module_name in self._lazy_modules:
return self._lazy_modules[module_name]
available = self.available_modules(limit_to_modules=[module_name], suppress_warnings=suppress_warnings)
if not available:
message = f"Module '{module_name}' not found. Are you sure it's installed/exists?"
if 'archiver' in module_name:
message += f" Did you mean {module_name.replace('archiver', 'extractor')}?"
raise IndexError(message)
return available[0]
def available_modules(self, limit_to_modules: List[str]= [], suppress_warnings: bool = False) -> List[LazyBaseModule]:
# search through all valid 'modules' paths. Default is 'modules' in the current directory
# see odoo/modules/module.py -> get_modules
def is_really_module(module_path):
if os.path.isfile(join(module_path, MANIFEST_FILE)):
return True
all_modules = []
for module_folder in auto_archiver.modules.__path__:
# walk through each module in module_folder and check if it has a valid manifest
try:
possible_modules = os.listdir(module_folder)
except FileNotFoundError:
logger.warning(f"Module folder {module_folder} does not exist")
continue
if _LAZY_LOADED_MODULES.get(possible_module):
continue
lazy_module = LazyBaseModule(possible_module, possible_module_path)
_LAZY_LOADED_MODULES[possible_module] = lazy_module
for possible_module in possible_modules:
if limit_to_modules and possible_module not in limit_to_modules:
continue
all_modules.append(lazy_module)
if not suppress_warnings:
for module in limit_to_modules:
if not any(module == m.name for m in all_modules):
logger.warning(f"Module '{module}' not found. Are you sure it's installed?")
possible_module_path = join(module_folder, possible_module)
if not is_really_module(possible_module_path):
continue
if self._lazy_modules.get(possible_module):
continue
lazy_module = LazyBaseModule(possible_module, possible_module_path, factory=self)
return all_modules
self._lazy_modules[possible_module] = lazy_module
all_modules.append(lazy_module)
if not suppress_warnings:
for module in limit_to_modules:
if not any(module == m.name for m in all_modules):
logger.warning(f"Module '{module}' not found. Are you sure it's installed?")
return all_modules
@dataclass
class LazyBaseModule:
@@ -120,17 +137,22 @@ class LazyBaseModule:
"""
name: str
type: list
description: str
path: str
module_factory: ModuleFactory
_manifest: dict = None
_instance: BaseModule = None
_entry_point: str = None
def __init__(self, module_name, path):
def __init__(self, module_name, path, factory: ModuleFactory):
self.name = module_name
self.path = path
self.module_factory = factory
@property
def type(self):
return self.manifest['type']
@property
def entry_point(self):
@@ -161,16 +183,15 @@ class LazyBaseModule:
return self._manifest
# print(f"Loading manifest for module {module_path}")
# load the manifest file
manifest = copy.deepcopy(BaseModule._DEFAULT_MANIFEST)
manifest = copy.deepcopy(DEFAULT_MANIFEST)
with open(join(self.path, MANIFEST_FILE)) as f:
try:
manifest.update(ast.literal_eval(f.read()))
except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
logger.error(f"Error loading manifest from file {self.path}/{MANIFEST_FILE}: {e}")
raise ValueError(f"Error loading manifest from file {self.path}/{MANIFEST_FILE}: {e}")
self._manifest = manifest
self.type = manifest['type']
self._entry_point = manifest['entry_point']
self.description = manifest['description']
self.version = manifest['version']
@@ -189,13 +210,14 @@ class LazyBaseModule:
# clear out any empty strings that a user may have erroneously added
continue
if not check(dep):
logger.error(f"Module '{self.name}' requires external dependency '{dep}' which is not available/setup. Have you installed the required dependencies for the '{self.name}' module? See the README for more information.")
logger.error(f"Module '{self.name}' requires external dependency '{dep}' which is not available/setup. \
Have you installed the required dependencies for the '{self.name}' module? See the README for more information.")
exit(1)
def check_python_dep(dep):
# first check if it's a module:
try:
m = get_module_lazy(dep, suppress_warnings=True)
m = self.module_factory.get_module_lazy(dep, suppress_warnings=True)
try:
# we must now load this module and set it up with the config
m.load(config)
@@ -230,19 +252,21 @@ class LazyBaseModule:
__import__(f'{qualname}.{file_name}', fromlist=[self.entry_point])
# finally, get the class instance
instance: BaseModule = getattr(sys.modules[sub_qualname], class_name)()
if not getattr(instance, 'name', None):
instance.name = self.name
if not getattr(instance, 'display_name', None):
instance.display_name = self.display_name
self._instance = instance
# set the name, display name and module factory
instance.name = self.name
instance.display_name = self.display_name
instance.module_factory = self.module_factory
# merge the default config with the user config
default_config = dict((k, v['default']) for k, v in self.configs.items() if v.get('default'))
default_config = dict((k, v['default']) for k, v in self.configs.items() if 'default' in v)
config[self.name] = default_config | config.get(self.name, {})
instance.config_setup(config)
instance.setup()
# save the instance for future easy loading
self._instance = instance
return instance
def __repr__(self):

View File

@@ -5,88 +5,61 @@
"""
from __future__ import annotations
from typing import Generator, Union, List, Type
from urllib.parse import urlparse
from ipaddress import ip_address
from typing import Generator, Union, List, Type, TYPE_CHECKING
import argparse
import os
import sys
import json
from tempfile import TemporaryDirectory
import traceback
from copy import copy
from rich_argparse import RichHelpFormatter
from loguru import logger
import requests
from .metadata import Metadata, Media
from auto_archiver.version import __version__
from .config import _yaml, read_yaml, store_yaml, to_dot_notation, merge_dicts, EMPTY_CONFIG, DefaultValidatingParser
from .module import available_modules, LazyBaseModule, get_module, setup_paths
from .config import read_yaml, store_yaml, to_dot_notation, merge_dicts, is_valid_config, \
DefaultValidatingParser, UniqueAppendAction, AuthenticationJsonParseAction, DEFAULT_CONFIG_FILE
from .module import ModuleFactory, LazyBaseModule
from . import validators, Feeder, Extractor, Database, Storage, Formatter, Enricher
from .module import BaseModule
from .consts import MODULE_TYPES, SetupError
from auto_archiver.utils.url import check_url_or_raise
from loguru import logger
DEFAULT_CONFIG_FILE = "orchestration.yaml"
class JsonParseAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
try:
setattr(namespace, self.dest, json.loads(values))
except json.JSONDecodeError as e:
raise argparse.ArgumentTypeError(f"Invalid JSON input for argument '{self.dest}': {e}")
class AuthenticationJsonParseAction(JsonParseAction):
def __call__(self, parser, namespace, values, option_string=None):
super().__call__(parser, namespace, values, option_string)
auth_dict = getattr(namespace, self.dest)
if isinstance(auth_dict, str):
# if it's a string
try:
with open(auth_dict, 'r') as f:
try:
auth_dict = json.load(f)
except json.JSONDecodeError:
# maybe it's yaml, try that
auth_dict = _yaml.load(f)
except:
pass
if not isinstance(auth_dict, dict):
raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods")
for site, auth in auth_dict.items():
if not isinstance(site, str) or not isinstance(auth, dict):
raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods")
setattr(namespace, self.dest, auth_dict)
class UniqueAppendAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if not hasattr(namespace, self.dest):
setattr(namespace, self.dest, [])
for value in values:
if value not in getattr(namespace, self.dest):
getattr(namespace, self.dest).append(value)
if TYPE_CHECKING:
from .base_module import BaseModule
from .module import LazyBaseModule
class ArchivingOrchestrator:
# instance variables
module_factory: ModuleFactory
setup_finished: bool
logger_id: int
# instance variables, used for convenience to access modules by step
feeders: List[Type[Feeder]]
extractors: List[Type[Extractor]]
enrichers: List[Type[Enricher]]
databases: List[Type[Database]]
storages: List[Type[Storage]]
formatters: List[Type[Formatter]]
def __init__(self):
self.module_factory = ModuleFactory()
self.setup_finished = False
self.logger_id = None
def setup_basic_parser(self):
parser = argparse.ArgumentParser(
prog="auto-archiver",
add_help=False,
description="""
prog="auto-archiver",
add_help=False,
description="""
Auto Archiver is a CLI tool to archive media/metadata from online URLs;
it can read URLs from many sources (Google Sheets, Command Line, ...); and write results to many destinations too (CSV, Google Sheets, MongoDB, ...)!
""",
epilog="Check the code at https://github.com/bellingcat/auto-archiver",
formatter_class=RichHelpFormatter,
epilog="Check the code at https://github.com/bellingcat/auto-archiver",
formatter_class=RichHelpFormatter,
)
parser.add_argument('--help', '-h', action='store_true', dest='help', help='show a full help message and exit')
parser.add_argument('--version', action='version', version=__version__)
@@ -98,105 +71,136 @@ class ArchivingOrchestrator:
self.basic_parser = parser
return parser
def check_steps(self, config):
for module_type in MODULE_TYPES:
if not config['steps'].get(f"{module_type}s", []):
if module_type == 'feeder' or module_type == 'formatter' and config['steps'].get(f"{module_type}"):
raise SetupError(f"It appears you have '{module_type}' set under 'steps' in your configuration file, but as of version 0.13.0 of Auto Archiver, you must use '{module_type}s'. Change this in your configuration file and try again. \
Here's how that would look: \n\nsteps:\n {module_type}s:\n - [your_{module_type}_name_here]\n {'extractors:...' if module_type == 'feeder' else '...'}\n")
if module_type == 'extractor' and config['steps'].get('archivers'):
raise SetupError(f"As of version 0.13.0 of Auto Archiver, the 'archivers' step name has been changed to 'extractors'. Change this in your configuration file and try again. \
Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_here]\n enrichers:...\n")
raise SetupError(f"No {module_type}s were configured. Make sure to set at least one {module_type} in your configuration file or on the command line (using --{module_type}s)")
def setup_complete_parser(self, basic_config: dict, yaml_config: dict, unused_args: list[str]) -> None:
# modules parser to get the overridden 'steps' values
modules_parser = argparse.ArgumentParser(
add_help=False,
)
self.add_modules_args(modules_parser)
cli_modules, unused_args = modules_parser.parse_known_args(unused_args)
for module_type in MODULE_TYPES:
yaml_config['steps'][f"{module_type}s"] = getattr(cli_modules, f"{module_type}s", []) or yaml_config['steps'].get(f"{module_type}s", [])
parser = DefaultValidatingParser(
add_help=False,
)
self.add_additional_args(parser)
# merge command line module args (--feeders, --enrichers etc.) and add them to the config
# check what mode we're in
# if we have a config file, use that to decide which modules to load
# if simple, we'll load just the modules that has requires_setup = False
# if full, we'll load all modules
# TODO: BUG** - basic_config won't have steps in it, since these args aren't added to 'basic_parser'
# but should we add them? Or should we just add them to the 'complete' parser?
if yaml_config != EMPTY_CONFIG:
if is_valid_config(yaml_config):
self.check_steps(yaml_config)
# only load the modules enabled in config
# TODO: if some steps are empty (e.g. 'feeders' is empty), should we default to the 'simple' ones? Or only if they are ALL empty?
enabled_modules = []
# first loads the modules from the config file, then from the command line
for config in [yaml_config['steps'], basic_config.__dict__]:
for module_type in BaseModule.MODULE_TYPES:
enabled_modules.extend(config.get(f"{module_type}s", []))
for module_type in MODULE_TYPES:
enabled_modules.extend(yaml_config['steps'].get(f"{module_type}s", []))
# clear out duplicates, but keep the order
enabled_modules = list(dict.fromkeys(enabled_modules))
avail_modules = available_modules(with_manifest=True, limit_to_modules=enabled_modules, suppress_warnings=True)
self.add_module_args(avail_modules, parser)
avail_modules = self.module_factory.available_modules(limit_to_modules=enabled_modules, suppress_warnings=True)
self.add_individual_module_args(avail_modules, parser)
elif basic_config.mode == 'simple':
simple_modules = [module for module in available_modules(with_manifest=True) if not module.requires_setup]
self.add_module_args(simple_modules, parser)
simple_modules = [module for module in self.module_factory.available_modules() if not module.requires_setup]
self.add_individual_module_args(simple_modules, parser)
# for simple mode, we use the cli_feeder and any modules that don't require setup
yaml_config['steps']['feeders'] = ['cli_feeder']
# add them to the config
for module in simple_modules:
for module_type in module.type:
yaml_config['steps'].setdefault(f"{module_type}s", []).append(module.name)
else:
# load all modules, they're not using the 'simple' mode
self.add_module_args(available_modules(with_manifest=True), parser)
all_modules = self.module_factory.available_modules()
# add all the modules to the steps
for module in all_modules:
for module_type in module.type:
yaml_config['steps'].setdefault(f"{module_type}s", []).append(module.name)
self.add_individual_module_args(all_modules, parser)
parser.set_defaults(**to_dot_notation(yaml_config))
# reload the parser with the new arguments, now that we have them
parsed, unknown = parser.parse_known_args(unused_args)
# merge the new config with the old one
self.config = merge_dicts(vars(parsed), yaml_config)
config = merge_dicts(vars(parsed), yaml_config)
# set up the authentication dict as needed
config = self.setup_authentication(config)
# clean out args from the base_parser that we don't want in the config
for key in vars(basic_config):
self.config.pop(key, None)
config.pop(key, None)
# setup the logging
self.setup_logging()
self.setup_logging(config)
if unknown:
logger.warning(f"Ignoring unknown/unused arguments: {unknown}\nPerhaps you don't have this module enabled?")
if (self.config != yaml_config and basic_config.store) or not os.path.isfile(basic_config.config_file):
if (config != yaml_config and basic_config.store) or not os.path.isfile(basic_config.config_file):
logger.info(f"Storing configuration file to {basic_config.config_file}")
store_yaml(self.config, basic_config.config_file)
return self.config
store_yaml(config, basic_config.config_file)
return config
def add_modules_args(self, parser: argparse.ArgumentParser = None):
if not parser:
parser = self.parser
# Module loading from the command line
for module_type in MODULE_TYPES:
parser.add_argument(f'--{module_type}s', dest=f'{module_type}s', nargs='+', help=f'the {module_type}s to use', default=[], action=UniqueAppendAction)
def add_additional_args(self, parser: argparse.ArgumentParser = None):
if not parser:
parser = self.parser
# allow passing URLs directly on the command line
parser.add_argument('urls', nargs='*', default=[], help='URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml')
parser.add_argument('--feeders', dest='steps.feeders', nargs='+', default=['cli_feeder'], help='the feeders to use', action=UniqueAppendAction)
parser.add_argument('--enrichers', dest='steps.enrichers', nargs='+', help='the enrichers to use', action=UniqueAppendAction)
parser.add_argument('--extractors', dest='steps.extractors', nargs='+', help='the extractors to use', action=UniqueAppendAction)
parser.add_argument('--databases', dest='steps.databases', nargs='+', help='the databases to use', action=UniqueAppendAction)
parser.add_argument('--storages', dest='steps.storages', nargs='+', help='the storages to use', action=UniqueAppendAction)
parser.add_argument('--formatters', dest='steps.formatters', nargs='+', help='the formatter to use', action=UniqueAppendAction)
parser.add_argument('--authentication', dest='authentication', help='A dictionary of sites and their authentication methods \
(token, username etc.) that extractors can use to log into \
a website. If passing this on the command line, use a JSON string. \
You may also pass a path to a valid JSON/YAML file which will be parsed.',\
You may also pass a path to a valid JSON/YAML file which will be parsed.',
default={},
nargs="?",
action=AuthenticationJsonParseAction)
# logging arguments
parser.add_argument('--logging.level', action='store', dest='logging.level', choices=['INFO', 'DEBUG', 'ERROR', 'WARNING'], help='the logging level to use', default='INFO', type=str.upper)
parser.add_argument('--logging.file', action='store', dest='logging.file', help='the logging file to write to', default=None)
parser.add_argument('--logging.rotation', action='store', dest='logging.rotation', help='the logging rotation to use', default=None)
def add_module_args(self, modules: list[LazyBaseModule] = None, parser: argparse.ArgumentParser = None) -> None:
def add_individual_module_args(self, modules: list[LazyBaseModule] = None, parser: argparse.ArgumentParser = None) -> None:
if not modules:
modules = available_modules(with_manifest=True)
module: LazyBaseModule
modules = self.module_factory.available_modules()
for module in modules:
if module.name == 'cli_feeder':
# special case. For the CLI feeder, allow passing URLs directly on the command line without setting --cli_feeder.urls=
parser.add_argument('urls', nargs='*', default=[], help='URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml')
continue
if not module.configs:
# this module has no configs, don't show anything in the help
# (TODO: do we want to show something about this module though, like a description?)
@@ -224,21 +228,35 @@ class ArchivingOrchestrator:
arg.should_store = should_store
def show_help(self, basic_config: dict):
# for the help message, we want to load *all* possible modules and show the help
# add configs as arg parser arguments
# for the help message, we want to load manifests from *all* possible modules and show their help/settings
# add configs as arg parser arguments
self.add_modules_args(self.basic_parser)
self.add_additional_args(self.basic_parser)
self.add_module_args(parser=self.basic_parser)
self.add_individual_module_args(parser=self.basic_parser)
self.basic_parser.print_help()
self.basic_parser.exit()
def setup_logging(self):
def setup_logging(self, config):
logging_config = config['logging']
if logging_config.get('enabled', True) is False:
# disabled logging settings, they're set on a higher level
logger.disable('auto_archiver')
return
# setup loguru logging
logger.remove(0) # remove the default logger
logging_config = self.config['logging']
logger.add(sys.stderr, level=logging_config['level'])
if log_file := logging_config['file']:
logger.add(log_file) if not logging_config['rotation'] else logger.add(log_file, rotation=logging_config['rotation'])
try:
logger.remove(0) # remove the default logger
except ValueError:
pass
# add other logging info
if self.logger_id is None: # note - need direct comparison to None since need to consider falsy value 0
self.logger_id = logger.add(sys.stderr, level=logging_config['level'])
if log_file := logging_config['file']:
logger.add(log_file) if not logging_config['rotation'] else logger.add(log_file, rotation=logging_config['rotation'])
def install_modules(self, modules_by_type):
"""
@@ -246,59 +264,38 @@ class ArchivingOrchestrator:
orchestrator's attributes (self.feeders, self.extractors etc.). If no modules of a certain type
are loaded, the program will exit with an error message.
"""
invalid_modules = []
for module_type in BaseModule.MODULE_TYPES:
for module_type in MODULE_TYPES:
step_items = []
modules_to_load = modules_by_type[f"{module_type}s"]
assert modules_to_load, f"No {module_type}s were configured. Make sure to set at least one {module_type} in your configuration file or on the command line (using --{module_type}s)"
if not modules_to_load:
raise SetupError(f"No {module_type}s were configured. Make sure to set at least one {module_type} in your configuration file or on the command line (using --{module_type}s)")
def check_steps_ok():
if not len(step_items):
logger.error(f"NO {module_type.upper()}S LOADED. Please check your configuration and try again.")
if len(modules_to_load):
logger.error(f"Tried to load the following modules, but none were available: {modules_to_load}")
exit()
logger.error(f"Unable to load any {module_type}s. Tried the following, but none were available: {modules_to_load}")
raise SetupError(f"NO {module_type.upper()}S LOADED. Please check your configuration and try again.")
if (module_type == 'feeder' or module_type == 'formatter') and len(step_items) > 1:
logger.error(f"Only one {module_type} is allowed, found {len(step_items)} {module_type}s. Please remove one of the following from your configuration file: {modules_to_load}")
exit()
raise SetupError(f"Only one {module_type} is allowed, found {len(step_items)} {module_type}s. Please remove one of the following from your configuration file: {modules_to_load}")
for module in modules_to_load:
if module == 'cli_feeder':
# pseudo module, don't load it
urls = self.config['urls']
if not urls:
logger.error("No URLs provided. Please provide at least one URL via the command line, or set up an alternative feeder. Use --help for more information.")
exit()
# cli_feeder is a pseudo module, it just takes the command line args
def feed(self) -> Generator[Metadata]:
for url in urls:
logger.debug(f"Processing URL: '{url}'")
yield Metadata().set_url(url)
pseudo_module = type('CLIFeeder', (Feeder,), {
'name': 'cli_feeder',
'display_name': 'CLI Feeder',
'__iter__': feed
})()
pseudo_module.__iter__ = feed
step_items.append(pseudo_module)
continue
if module in invalid_modules:
continue
loaded_module = None
try:
loaded_module: BaseModule = get_module(module, self.config)
loaded_module: BaseModule = self.module_factory.get_module(module, self.config)
except (KeyboardInterrupt, Exception) as e:
logger.error(f"Error during setup of modules: {e}\n{traceback.format_exc()}")
if module_type == 'extractor' and loaded_module.name == module:
if loaded_module and module_type == 'extractor':
loaded_module.cleanup()
exit()
raise e
if not loaded_module:
invalid_modules.append(module)
@@ -308,48 +305,107 @@ class ArchivingOrchestrator:
check_steps_ok()
setattr(self, f"{module_type}s", step_items)
def load_config(self, config_file: str) -> dict:
if not os.path.exists(config_file) and config_file != DEFAULT_CONFIG_FILE:
logger.error(f"The configuration file {config_file} was not found. Make sure the file exists and try again, or run without the --config file to use the default settings.")
exit()
raise FileNotFoundError(f"Configuration file {config_file} not found")
return read_yaml(config_file)
def setup_config(self, args: list) -> dict:
"""
Sets up the configuration file, merging the default config with the user's config
This function should only ever be run once.
"""
def run(self, args: list) -> None:
self.setup_basic_parser()
# parse the known arguments for now (basically, we want the config file)
basic_config, unused_args = self.basic_parser.parse_known_args(args)
# setup any custom module paths, so they'll show in the help and for arg parsing
setup_paths(basic_config.module_paths)
self.module_factory.setup_paths(basic_config.module_paths)
# if help flag was called, then show the help
if basic_config.help:
self.show_help(basic_config)
# merge command line --feeder etc. args with what's in the yaml config
yaml_config = self.load_config(basic_config.config_file)
self.setup_complete_parser(basic_config, yaml_config, unused_args)
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
def check_for_updates(self):
response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
latest_version = response['info']['version']
# check version compared to current version
if latest_version != __version__:
if os.environ.get('RUNNING_IN_DOCKER'):
update_cmd = "`docker pull bellingcat/auto-archiver:latest`"
else:
update_cmd = "`pip install --upgrade auto-archiver`"
logger.warning("")
logger.warning("********* IMPORTANT: UPDATE AVAILABLE ********")
logger.warning(f"A new version of auto-archiver is available (v{latest_version}, you have {__version__})")
logger.warning(f"Make sure to update to the latest version using: {update_cmd}")
logger.warning("")
def setup(self, args: list):
"""
Function to configure all setup of the orchestrator: setup configs and load modules.
This method should only ever be called once
"""
self.check_for_updates()
if self.setup_finished:
logger.warning("The `setup_config()` function should only ever be run once. \
If you need to re-run the setup, please re-instantiate a new instance of the orchestrator. \
For code implementatations, you should call .setup_config() once then you may call .feed() \
multiple times to archive multiple URLs.")
return
self.setup_basic_parser()
self.config = self.setup_config(args)
logger.info(f"======== Welcome to the AUTO ARCHIVER ({__version__}) ==========")
self.install_modules(self.config['steps'])
# log out the modules that were loaded
for module_type in BaseModule.MODULE_TYPES:
for module_type in MODULE_TYPES:
logger.info(f"{module_type.upper()}S: " + ", ".join(m.display_name for m in getattr(self, f"{module_type}s")))
self.setup_finished = True
for _ in self.feed():
pass
def _command_line_run(self, args: list) -> Generator[Metadata]:
"""
This is the main entry point for the orchestrator, when run from the command line.
def cleanup(self)->None:
:param args: list of arguments to pass to the orchestrator - these are the command line args
You should not call this method from code implementations.
This method sets up the configuration, loads the modules, and runs the feed.
If you wish to make code invocations yourself, you should use the 'setup' and 'feed' methods separately.
To test configurations, without loading any modules you can also first call 'setup_configs'
"""
try:
self.setup(args)
return self.feed()
except Exception as e:
logger.error(e)
exit(1)
def cleanup(self) -> None:
logger.info("Cleaning up")
for e in self.extractors:
e.cleanup()
def feed(self) -> Generator[Metadata]:
url_count = 0
for feeder in self.feeders:
for item in feeder:
@@ -393,7 +449,6 @@ class ArchivingOrchestrator:
m.tmp_dir = None
tmp_dir.cleanup()
def archive(self, result: Metadata) -> Union[Metadata, None]:
"""
Runs the archiving process for a single URL
@@ -407,8 +462,8 @@ class ArchivingOrchestrator:
original_url = result.get_url().strip()
try:
self.assert_valid_url(original_url)
except AssertionError as e:
check_url_or_raise(original_url)
except ValueError as e:
logger.error(f"Error archiving URL {original_url}: {e}")
raise e
@@ -440,13 +495,13 @@ class ArchivingOrchestrator:
try:
result.merge(a.download(result))
if result.is_success(): break
except Exception as e:
except Exception as e:
logger.error(f"ERROR archiver {a.name}: {e}: {traceback.format_exc()}")
# 4 - call enrichers to work with archived content
for e in self.enrichers:
try: e.enrich(result)
except Exception as exc:
except Exception as exc:
logger.error(f"ERROR enricher {e.name}: {exc}: {traceback.format_exc()}")
# 5 - store all downloaded/generated media
@@ -468,30 +523,30 @@ class ArchivingOrchestrator:
logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
return result
def assert_valid_url(self, url: str) -> bool:
def setup_authentication(self, config: dict) -> dict:
"""
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
Setup authentication for all modules that require it
Split up strings into multiple sites if they are comma separated
"""
assert url.startswith("http://") or url.startswith("https://"), f"Invalid URL scheme"
authentication = config.get('authentication', {})
# extract out concatenated sites
for key, val in copy(authentication).items():
if "," in key:
for site in key.split(","):
site = site.strip()
authentication[site] = val
del authentication[key]
parsed = urlparse(url)
assert parsed.scheme in ["http", "https"], f"Invalid URL scheme"
assert parsed.hostname, f"Invalid URL hostname"
assert parsed.hostname != "localhost", f"Invalid URL"
try: # special rules for IP addresses
ip = ip_address(parsed.hostname)
except ValueError: pass
else:
assert ip.is_global, f"Invalid IP used"
assert not ip.is_reserved, f"Invalid IP used"
assert not ip.is_link_local, f"Invalid IP used"
assert not ip.is_private, f"Invalid IP used"
config['authentication'] = authentication
return config
# Helper Properties
@property
def all_modules(self) -> List[Type[BaseModule]]:
return self.feeders + self.extractors + self.enrichers + self.databases + self.storages + self.formatters
return self.feeders + self.extractors + self.enrichers + self.databases + self.storages + self.formatters

View File

@@ -1,7 +1,29 @@
"""
Base module for Storage modules modular components that store media objects in various locations.
If you are looking to implement a new storage module, you should subclass the `Storage` class and
implement the `get_cdn_url` and `uploadf` methods.
Your module **must** also have two config variables 'path_generator' and 'filename_generator' which
determine how the key is generated for the media object. The 'path_generator' and 'filename_generator'
variables can be set to one of the following values:
- 'flat': A flat structure with no subfolders
- 'url': A structure based on the URL of the media object
- 'random': A random structure
The 'filename_generator' variable can be set to one of the following values:
- 'random': A random string
- 'static': A replicable strategy such as a hash
If you don't want to use this naming convention, you can override the `set_key` method in your subclass.
"""
from __future__ import annotations
from abc import abstractmethod
from typing import IO
import os
import platform
from loguru import logger
from slugify import slugify
@@ -10,56 +32,85 @@ from auto_archiver.utils.misc import random_str
from auto_archiver.core import Media, BaseModule, Metadata
from auto_archiver.modules.hash_enricher.hash_enricher import HashEnricher
from auto_archiver.core.module import get_module
class Storage(BaseModule):
"""
Base class for implementing storage modules in the media archiving framework.
Subclasses must implement the `get_cdn_url` and `uploadf` methods to define their behavior.
"""
def store(self, media: Media, url: str, metadata: Metadata=None) -> None:
if media.is_stored(in_storage=self):
logger.debug(f"{media.key} already stored, skipping")
return
self.set_key(media, url, metadata)
self.upload(media, metadata=metadata)
media.add_url(self.get_cdn_url(media))
@abstractmethod
def get_cdn_url(self, media: Media) -> str: pass
def get_cdn_url(self, media: Media) -> str:
"""
Returns the URL of the media object stored in the CDN.
"""
pass
@abstractmethod
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
"""
Uploads (or saves) a file to the storage service/location.
This method should not be called directly, but instead through the 'store' method,
which sets up the media for storage.
"""
pass
def upload(self, media: Media, **kwargs) -> bool:
"""
Uploads a media object to the storage service.
This method should not be called directly, but instead be called through the 'store' method,
which sets up the media for storage.
"""
logger.debug(f'[{self.__class__.__name__}] storing file {media.filename} with key {media.key}')
with open(media.filename, 'rb') as f:
return self.uploadf(f, media, **kwargs)
def set_key(self, media: Media, url, metadata: Metadata) -> None:
def set_key(self, media: Media, url: str, metadata: Metadata) -> None:
"""takes the media and optionally item info and generates a key"""
if media.key is not None and len(media.key) > 0: return
if media.key is not None and len(media.key) > 0:
# media key is already set
return
folder = metadata.get_context('folder', '')
filename, ext = os.path.splitext(media.filename)
# Handle path_generator logic
path_generator = self.config.get("path_generator", "url")
path_generator = self.path_generator
if path_generator == "flat":
path = ""
filename = slugify(filename) # Ensure filename is slugified
elif path_generator == "url":
path = slugify(url)
path = slugify(url)[:70]
elif path_generator == "random":
path = self.config.get("random_path", random_str(24), True)
path = random_str(24)
else:
raise ValueError(f"Invalid path_generator: {path_generator}")
# Handle filename_generator logic
filename_generator = self.config.get("filename_generator", "random")
filename_generator = self.filename_generator
if filename_generator == "random":
filename = random_str(24)
elif filename_generator == "static":
# load the hash_enricher module
he = get_module(HashEnricher, self.config)
he = self.module_factory.get_module("hash_enricher", self.config)
hd = he.calculate_hash(media.filename)
filename = hd[:24]
else:
raise ValueError(f"Invalid filename_generator: {filename_generator}")
key = os.path.join(folder, path, f"{filename}{ext}")
media.key = os.path.join(folder, path, f"{filename}{ext}")
media._key = key

View File

@@ -1,6 +1,7 @@
# used as validators for config values. Should raise an exception if the value is invalid.
from pathlib import Path
import argparse
import json
def example_validator(value):
if "example" not in value:
@@ -16,4 +17,7 @@ def positive_number(value):
def valid_file(value):
if not Path(value).is_file():
raise argparse.ArgumentTypeError(f"File '{value}' does not exist.")
return value
return value
def json_loader(cli_val):
return json.loads(cli_val)

View File

@@ -1,5 +1,5 @@
{
"name": "Auto-Archiver API Database",
"name": "Auto Archiver API Database",
"type": ["database"],
"entry_point": "api_db::AAApiDb",
"requires_setup": True,
@@ -24,9 +24,9 @@
"help": "which group of users have access to the archive in case public=false as author",
},
"use_api_cache": {
"default": True,
"default": False,
"type": "bool",
"help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived",
"help": "if True then the API database will be queried prior to any archiving operations and stop if the link has already been archived",
},
"store_results": {
"default": True,
@@ -39,7 +39,7 @@
},
},
"description": """
Provides integration with the Auto-Archiver API for querying and storing archival data.
Provides integration with the Auto Archiver API for querying and storing archival data.
### Features
- **API Integration**: Supports querying for existing archives and submitting results.
@@ -49,6 +49,6 @@
- **Optional Storage**: Archives results conditionally based on configuration.
### Setup
Requires access to an Auto-Archiver API instance and a valid API token.
Requires access to an Auto Archiver API instance and a valid API token.
""",
}

View File

@@ -1 +0,0 @@
from atlos_db import AtlosDb

View File

@@ -1,36 +0,0 @@
{
"name": "Atlos Database",
"type": ["database"],
"entry_point": "atlos_db::AtlosDb",
"requires_setup": True,
"dependencies":
{"python": ["loguru",
""],
"bin": [""]},
"configs": {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
Handles integration with the Atlos platform for managing archival results.
### Features
- Outputs archival results to the Atlos API for storage and tracking.
- Updates failure status with error details when archiving fails.
- Processes and formats metadata, including ISO formatting for datetime fields.
- Skips processing for items without an Atlos ID.
### Setup
Required configs:
- atlos_url: Base URL for the Atlos API.
- api_token: Authentication token for API access.
"""
,
}

View File

@@ -1,66 +0,0 @@
from typing import Union
import requests
from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Metadata
class AtlosDb(Database):
"""
Outputs results to Atlos
"""
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={"metadata": {"processed": True, "status": "error", "error": reason}},
).raise_for_status()
logger.info(
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
)
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={
"metadata": dict(
processed=True,
status="success",
results=self._process_metadata(item),
)
},
).raise_for_status()
logger.info(
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
)

View File

@@ -1,13 +0,0 @@
def get_atlos_config_options():
return {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"type": str
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": str
},
}

View File

@@ -1 +0,0 @@
from .atlos_feeder import AtlosFeeder

View File

@@ -1,34 +0,0 @@
{
"name": "Atlos Feeder",
"type": ["feeder"],
"requires_setup": True,
"dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_token": {
"type": "str",
"required": True,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
AtlosFeeder: A feeder module that integrates with the Atlos API to fetch source material URLs for archival.
### Features
- Connects to the Atlos API to retrieve a list of source material URLs.
- Filters source materials based on visibility, processing status, and metadata.
- Converts filtered source materials into `Metadata` objects with the relevant `atlos_id` and URL.
- Iterates through paginated results using a cursor for efficient API interaction.
### Notes
- Requires an Atlos API endpoint and a valid API token for authentication.
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
- Handles pagination transparently when retrieving data from the Atlos API.
"""
}

View File

@@ -1,42 +0,0 @@
import requests
from loguru import logger
from auto_archiver.core import Feeder
from auto_archiver.core import Metadata
class AtlosFeeder(Feeder):
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0
cursor = None
while True:
response = requests.get(
f"{self.atlos_url}/api/v2/source_material",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"cursor": cursor},
)
data = response.json()
response.raise_for_status()
cursor = data["next"]
for item in data["results"]:
if (
item["source_url"] not in [None, ""]
and (
item["metadata"]
.get("auto_archiver", {})
.get("processed", False)
!= True
)
and item["visibility"] == "visible"
and item["status"] not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set(
"atlos_id", item["id"]
)
count += 1
if len(data["results"]) == 0 or cursor is None:
break

View File

@@ -0,0 +1 @@
from .atlos_feeder_db_storage import AtlosFeederDbStorage

View File

@@ -0,0 +1,46 @@
{
"name": "Atlos Feeder Database Storage",
"type": ["feeder", "database", "storage"],
"entry_point": "atlos_feeder_db_storage::AtlosFeederDbStorage",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_token": {
"type": "str",
"required": True,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
A module that integrates with the Atlos API to fetch source material URLs for archival, uplaod extracted media,
[Atlos](https://www.atlos.org/) is a visual investigation and archiving platform designed for investigative research, journalism, and open-source intelligence (OSINT).
It helps users organize, analyze, and store media from various sources, making it easier to track and investigate digital evidence.
To get started create a new project and obtain an API token from the settings page. You can group event's into Atlos's 'incidents'.
Here you can add 'source material' by URLn and the Atlos feeder will fetch these URLs for archival.
You can use Atlos only as a 'feeder', however you can also implement the 'database' and 'storage' features to store the media files in Atlos which is recommended.
The Auto Archiver will retain the Atlos ID for each item, ensuring that the media and database outputs are uplaoded back into the relevant media item.
### Features
- Connects to the Atlos API to retrieve a list of source material URLs.
- Iterates through the URLs from all source material items which are unprocessed, visible, and ready to archive.
- If the storage option is selected, it will store the media files alongside the original source material item in Atlos.
- Is the database option is selected it will output the results to the media item, as well as updating failure status with error details when archiving fails.
- Skips Storege/ database upload for items without an Atlos ID - restricting that you must use the Atlos feeder so that it has the Atlos ID to store the results with.
### Notes
- Requires an Atlos account with a project and a valid API token for authentication.
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
- Feches any media items within an Atlos project, regardless of separation into incidents.
"""
}

View File

@@ -0,0 +1,153 @@
import hashlib
import os
from typing import IO, Iterator, Optional, Union
import requests
from loguru import logger
from auto_archiver.core import Database, Feeder, Media, Metadata, Storage
from auto_archiver.utils import calculate_file_hash
class AtlosFeederDbStorage(Feeder, Database, Storage):
def setup(self) -> requests.Session:
"""create and return a persistent session."""
self.session = requests.Session()
def _get(self, endpoint: str, params: Optional[dict] = None) -> dict:
"""Wrapper for GET requests to the Atlos API."""
url = f"{self.atlos_url}{endpoint}"
response = self.session.get(
url, headers={"Authorization": f"Bearer {self.api_token}"}, params=params
)
response.raise_for_status()
return response.json()
def _post(
self,
endpoint: str,
json: Optional[dict] = None,
params: Optional[dict] = None,
files: Optional[dict] = None,
) -> dict:
"""Wrapper for POST requests to the Atlos API."""
url = f"{self.atlos_url}{endpoint}"
response = self.session.post(
url,
headers={"Authorization": f"Bearer {self.api_token}"},
json=json,
params=params,
files=files,
)
response.raise_for_status()
return response.json()
# ! Atlos Module - Feeder Methods
def __iter__(self) -> Iterator[Metadata]:
"""Iterate over unprocessed, visible source materials from Atlos."""
cursor = None
while True:
data = self._get("/api/v2/source_material", params={"cursor": cursor})
cursor = data.get("next")
results = data.get("results", [])
for item in results:
if (
item.get("source_url") not in [None, ""]
and not item.get("metadata", {}).get("auto_archiver", {}).get("processed", False)
and item.get("visibility") == "visible"
and item.get("status") not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set("atlos_id", item["id"])
if not results or cursor is None:
break
# ! Atlos Module - Database Methods
def failed(self, item: Metadata, reason: str) -> None:
"""Mark an item as failed in Atlos, if the ID exists."""
atlos_id = item.metadata.get("atlos_id")
if not atlos_id:
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
self._post(
f"/api/v2/source_material/metadata/{atlos_id}/auto_archiver",
json={"metadata": {"processed": True, "status": "error", "error": reason}},
)
logger.info(f"Stored failure for {item.get_url()} (ID {atlos_id}) on Atlos: {reason}")
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""Mark an item as successfully archived in Atlos."""
atlos_id = item.metadata.get("atlos_id")
if not atlos_id:
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
self._post(
f"/api/v2/source_material/metadata/{atlos_id}/auto_archiver",
json={
"metadata": {
"processed": True,
"status": "success",
"results": self._process_metadata(item),
}
},
)
logger.info(f"Stored success for {item.get_url()} (ID {atlos_id}) on Atlos")
# ! Atlos Module - Storage Methods
def get_cdn_url(self, _media: Media) -> str:
"""Return the base Atlos URL as the CDN URL."""
return self.atlos_url
def upload(self, media: Media, metadata: Optional[Metadata] = None, **_kwargs) -> bool:
"""Upload a media file to Atlos if it has not been uploaded already."""
if metadata is None:
logger.error(f"No metadata provided for {media.filename}")
return False
atlos_id = metadata.get("atlos_id")
if not atlos_id:
logger.error(f"No Atlos ID found in metadata; can't store {media.filename} in Atlos.")
return False
media_hash = calculate_file_hash(media.filename, hash_algo=hashlib.sha256, chunksize=4096)
# Check whether the media has already been uploaded
source_material = self._get(f"/api/v2/source_material/{atlos_id}")["result"]
existing_media = [
artifact.get("file_hash_sha256")
for artifact in source_material.get("artifacts", [])
]
if media_hash in existing_media:
logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
return True
# Upload the media to the Atlos API
with open(media.filename, "rb") as file_obj:
self._post(
f"/api/v2/source_material/upload/{atlos_id}",
params={"title": media.properties},
files={"file": (os.path.basename(media.filename), file_obj)},
)
logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
return True
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
"""Upload a file-like object; not implemented."""
pass

View File

@@ -1,66 +0,0 @@
import hashlib
import os
from typing import IO, Optional
import requests
from loguru import logger
from auto_archiver.core import Media, Metadata
from auto_archiver.core import Storage
class AtlosStorage(Storage):
def get_cdn_url(self, _media: Media) -> str:
# It's not always possible to provide an exact URL, because it's
# possible that the media once uploaded could have been copied to
# another project.
return self.atlos_url
def _hash(self, media: Media) -> str:
# Hash the media file using sha-256. We don't use the existing auto archiver
# hash because there's no guarantee that the configuerer is using sha-256, which
# is how Atlos hashes files.
sha256 = hashlib.sha256()
with open(media.filename, "rb") as f:
while True:
buf = f.read(4096)
if not buf: break
sha256.update(buf)
return sha256.hexdigest()
def upload(self, media: Media, metadata: Optional[Metadata]=None, **_kwargs) -> bool:
atlos_id = metadata.get("atlos_id")
if atlos_id is None:
logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
return False
media_hash = self._hash(media)
# Check whether the media has already been uploaded
source_material = requests.get(
f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
).json()["result"]
existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
if media_hash in existing_media:
logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
return True
# Upload the media to the Atlos API
requests.post(
f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
params={
"title": media.properties
},
files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
).raise_for_status()
logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
return True
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass

View File

@@ -0,0 +1,23 @@
{
'name': 'Command Line Feeder',
'type': ['feeder'],
'entry_point': 'cli_feeder::CLIFeeder',
'requires_setup': False,
'description': 'Feeds URLs to orchestrator from the command line',
'configs': {
'urls': {
'default': None,
'help': 'URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml',
},
},
'description': """
The Command Line Feeder is the default enabled feeder for the Auto Archiver. It allows you to pass URLs directly to the orchestrator from the command line
without the need to specify any additional configuration or command line arguments:
`auto-archiver --feeder cli_feeder -- "https://example.com/1/,https://example.com/2/"`
You can pass multiple URLs by separating them with a space. The URLs will be processed in the order they are provided.
`auto-archiver --feeder cli_feeder -- https://example.com/1/ https://example.com/2/`
""",
}

View File

@@ -0,0 +1,20 @@
from loguru import logger
from auto_archiver.core.feeder import Feeder
from auto_archiver.core.metadata import Metadata
class CLIFeeder(Feeder):
def setup(self) -> None:
self.urls = self.config['urls']
if not self.urls:
raise ValueError("No URLs provided. Please provide at least one URL via the command line, or set up an alternative feeder. Use --help for more information.")
def __iter__(self) -> Metadata:
urls = self.config['urls']
for url in urls:
logger.debug(f"Processing {url}")
m = Metadata().set_url(url)
yield m
logger.success(f"Processed {len(urls)} URL(s)")

View File

@@ -10,7 +10,7 @@ class ConsoleDb(Database):
"""
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
logger.info(f"STARTED {item}")
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}: {reason}")

View File

@@ -6,7 +6,7 @@
},
'entry_point': 'csv_db::CSVDb',
"configs": {
"csv_file": {"default": "db.csv", "help": "CSV file name"}
"csv_file": {"default": "db.csv", "help": "CSV file name to save metadata to"},
},
"description": """
Handles exporting archival results to a CSV file.

View File

@@ -32,7 +32,6 @@
GDriveStorage: A storage module for saving archived content to Google Drive.
Author: Dave Mateer, (And maintained by: )
Source Documentation: https://davemateer.com/2022/04/28/google-drive-with-python
### Features

View File

@@ -28,6 +28,13 @@ the broader archiving framework.
metadata objects. Some dropins are included in this generic_archiver by default, but
custom dropins can be created to handle additional websites and passed to the archiver
via the command line using the `--dropins` option (TODO!).
### Auto-Updates
The Generic Extractor will also automatically check for updates to `yt-dlp` (every 5 days by default).
This can be configured using the `ytdlp_update_interval` setting (or disabled by setting it to -1).
If you are having issues with the extractor, you can review the version of `yt-dlp` being used with `yt-dlp --version`.
""",
"configs": {
"subtitles": {"default": True, "help": "download subtitles if available", "type": "bool"},
@@ -64,5 +71,10 @@ via the command line using the `--dropins` option (TODO!).
"default": "inf",
"help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit.",
},
"ytdlp_update_interval": {
"default": 5,
"help": "How often to check for yt-dlp updates (days). If positive, will check and update yt-dlp every [num] days. Set it to -1 to disable, or 0 to always update on every run.",
"type": "int",
},
},
}

View File

@@ -39,11 +39,11 @@ class Bluesky(GenericDropin):
for image_media in image_medias:
url = media_url.format(image_media['image']['ref']['$link'], post['author']['did'])
image_media = archiver.download_from_url(url)
media.append(image_media)
media.append(Media(image_media))
for video_media in video_medias:
url = media_url.format(video_media['ref']['$link'], post['author']['did'])
video_media = archiver.download_from_url(url)
media.append(video_media)
media.append(Media(video_media))
return media

View File

@@ -8,7 +8,8 @@ class Facebook(GenericDropin):
url.replace('://m.facebook.com/', '://www.facebook.com/'), video_id)
webpage = ie_instance._download_webpage(url, ie_instance._match_valid_url(url).group('id'))
post_data = ie_instance._extract_from_url.extract_metadata(webpage)
# TODO: fix once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
post_data = ie_instance._extract_metadata(webpage)
return post_data
def create_metadata(self, post: dict, ie_instance, archiver, url):

View File

@@ -1,7 +1,11 @@
import datetime, os, yt_dlp, pysubs2
import datetime, os
import importlib
from typing import Type
import subprocess
from typing import Generator, Type
import yt_dlp
from yt_dlp.extractor.common import InfoExtractor
import pysubs2
from loguru import logger
@@ -11,7 +15,45 @@ from auto_archiver.core import Metadata, Media
class GenericExtractor(Extractor):
_dropins = {}
def suitable_extractors(self, url: str) -> list[str]:
def setup(self):
# check for file .ytdlp-update in the secrets folder
if self.ytdlp_update_interval < 0:
return
use_secrets = os.path.exists('secrets')
path = os.path.join('secrets' if use_secrets else '', '.ytdlp-update')
next_update_check = None
if os.path.exists(path):
with open(path, "r") as f:
next_update_check = datetime.datetime.fromisoformat(f.read())
if not next_update_check or next_update_check < datetime.datetime.now():
self.update_ytdlp()
next_update_check = datetime.datetime.now() + datetime.timedelta(days=self.ytdlp_update_interval)
with open(path, "w") as f:
f.write(next_update_check.isoformat())
def update_ytdlp(self):
logger.info("Checking and updating yt-dlp...")
logger.info(f"Tip: change the 'ytdlp_update_interval' setting to control how often yt-dlp is updated. Set to -1 to disable or 0 to enable on every run. Current setting: {self.ytdlp_update_interval}")
from importlib.metadata import version as get_version
old_version = get_version("yt-dlp")
try:
# try and update with pip (this works inside poetry environment and in a normal virtualenv)
result = subprocess.run(["pip", "install", "--upgrade", "yt-dlp"], check=True, capture_output=True)
if "Successfully installed yt-dlp" in result.stdout.decode():
new_version = importlib.metadata.version("yt-dlp")
logger.info(f"yt-dlp successfully (from {old_version} to {new_version})")
importlib.reload(yt_dlp)
else:
logger.info("yt-dlp already up to date")
except Exception as e:
logger.error(f"Error updating yt-dlp: {e}")
def suitable_extractors(self, url: str) -> Generator[str, None, None]:
"""
Returns a list of valid extractors for the given URL"""
for info_extractor in yt_dlp.YoutubeDL()._ies.values():
@@ -86,7 +128,7 @@ class GenericExtractor(Extractor):
# keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
result.set_title(video_data.pop('title', video_data.pop('fulltitle', "")))
result.set_url(url)
if "description" in video_data: result.set_content(video_data["description"])
# extract comments if enabled
if self.comments:
result.set("comments", [{
@@ -116,7 +158,7 @@ class GenericExtractor(Extractor):
def get_metadata_for_post(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata:
"""
Calls into the ytdlp InfoExtract subclass to use the prive _extract_post method to get the post metadata.
Calls into the ytdlp InfoExtract subclass to use the private _extract_post method to get the post metadata.
"""
ie_instance = info_extractor(downloader=ydl)
@@ -266,6 +308,11 @@ class GenericExtractor(Extractor):
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
#TODO: this is a temporary hack until this issue is closed: https://github.com/yt-dlp/yt-dlp/issues/11025
if url.startswith("https://ya.ru"):
url = url.replace("https://ya.ru", "https://yandex.ru")
item.set("replaced_url", url)
ydl_options = {'outtmpl': os.path.join(self.tmp_dir, f'%(id)s.%(ext)s'),
'quiet': False, 'noplaylist': not self.allow_playlist ,
@@ -275,7 +322,8 @@ class GenericExtractor(Extractor):
# set up auth
auth = self.auth_for_site(url, extract_cookies=False)
# order of importance: username/pasword -> api_key -> cookie -> cookie_from_browser -> cookies_file
# order of importance: username/pasword -> api_key -> cookie -> cookies_from_browser -> cookies_file
if auth:
if 'username' in auth and 'password' in auth:
logger.debug(f'Using provided auth username and password for {url}')
@@ -284,12 +332,12 @@ class GenericExtractor(Extractor):
elif 'cookie' in auth:
logger.debug(f'Using provided auth cookie for {url}')
yt_dlp.utils.std_headers['cookie'] = auth['cookie']
elif 'cookie_from_browser' in auth:
logger.debug(f'Using extracted cookies from browser {self.cookies_from_browser} for {url}')
elif 'cookies_from_browser' in auth:
logger.debug(f'Using extracted cookies from browser {auth["cookies_from_browser"]} for {url}')
ydl_options['cookiesfrombrowser'] = auth['cookies_from_browser']
elif 'cookies_file' in auth:
logger.debug(f'Using cookies from file {self.cookie_file} for {url}')
ydl_options['cookiesfile'] = auth['cookies_file']
logger.debug(f'Using cookies from file {auth["cookies_file"]} for {url}')
ydl_options['cookiefile'] = auth['cookies_file']
ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"

View File

@@ -1 +0,0 @@
from .gsheet_db import GsheetsDb

View File

@@ -1,38 +0,0 @@
{
"name": "Google Sheets Database",
"type": ["database"],
"entry_point": "gsheet_db::GsheetsDb",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "slugify"],
},
"configs": {
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"type": "bool",
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},
"description": """
GsheetsDatabase:
Handles integration with Google Sheets for tracking archival tasks.
### Features
- Updates a Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Notes
- Currently works only with metadata provided by GsheetFeeder.
- Requires configuration of a linked Google Sheet and appropriate API credentials.
"""
}

View File

@@ -1,114 +0,0 @@
from typing import Union, Tuple
from urllib.parse import quote
from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.gsheet_feeder import GWorksheet
from auto_archiver.utils.misc import get_current_timestamp
class GsheetsDb(Database):
"""
NB: only works if GsheetFeeder is used.
could be updated in the future to support non-GsheetFeeder metadata
"""
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, "status", "Archive in progress")
def failed(self, item: Metadata, reason: str) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, f"Archive failed {reason}")
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
self._safe_status_update(item, "")
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check if the given item has been archived already"""
return False
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item.get_url()}")
gw, row = self._retrieve_gsheet(item)
# self._safe_status_update(item, 'done')
cell_updates = []
row_values = gw.get_row(row)
def batch_if_valid(col, val, final_value=None):
final_value = final_value or val
try:
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == "":
cell_updates.append((row, col, final_value))
except Exception as e:
logger.error(f"Unable to batch {col}={final_value} due to {e}")
status_message = item.status
if cached:
status_message = f"[cached] {status_message}"
cell_updates.append((row, "status", status_message))
media: Media = item.get_final_media()
if hasattr(media, "urls"):
batch_if_valid("archive", "\n".join(media.urls))
batch_if_valid("date", True, get_current_timestamp())
batch_if_valid("title", item.get_title())
batch_if_valid("text", item.get("content", ""))
batch_if_valid("timestamp", item.get_timestamp())
if media:
batch_if_valid("hash", media.get("hash", "not-calculated"))
# merge all pdq hashes into a single string, if present
pdq_hashes = []
all_media = item.get_all_media()
for m in all_media:
if pdq := m.get("pdq_hash"):
pdq_hashes.append(pdq)
if len(pdq_hashes):
batch_if_valid("pdq_hash", ",".join(pdq_hashes))
if (screenshot := item.get_media_by_id("screenshot")) and hasattr(
screenshot, "urls"
):
batch_if_valid("screenshot", "\n".join(screenshot.urls))
if thumbnail := item.get_first_image("thumbnail"):
if hasattr(thumbnail, "urls"):
batch_if_valid("thumbnail", f'=IMAGE("{thumbnail.urls[0]}")')
if browsertrix := item.get_media_by_id("browsertrix"):
batch_if_valid("wacz", "\n".join(browsertrix.urls))
batch_if_valid(
"replaywebpage",
"\n".join(
[
f"https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}"
for wacz in browsertrix.urls
]
),
)
gw.batch_set_cell(cell_updates)
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, "status", new_status)
except Exception as e:
logger.debug(f"Unable to update sheet: {e}")
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
if gsheet := item.get_context("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
logger.error(f"Unable to retrieve Gsheet for {item.get_url()}, GsheetDB must be used alongside GsheetFeeder.")
return gw, row

View File

@@ -1,2 +0,0 @@
from .gworksheet import GWorksheet
from .gsheet_feeder import GsheetsFeeder

View File

@@ -1,71 +0,0 @@
{
"name": "Google Sheets Feeder",
"type": ["feeder"],
"entry_point": "gsheet_feeder::GsheetsFeeder",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "slugify"],
},
"configs": {
"sheet": {"default": None, "help": "name of the sheet to archive"},
"sheet_id": {
"default": None,
"help": "(alternative to sheet name) the id of the sheet to archive",
},
"header": {"default": 1, "help": "index of the header row (starts at 1)", "type": "int"},
"service_account": {
"default": "secrets/service_account.json",
"help": "service account JSON file path",
},
"columns": {
"default": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"help": "names of columns in the google sheet (stringified JSON object)",
"type": "auto_archiver.utils.json_loader",
},
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
"type": "bool",
},
},
"description": """
GsheetsFeeder
A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Features
- Validates the sheet structure and filters rows based on input configurations.
- Processes only worksheets allowed by the `allow_worksheets` and `block_worksheets` configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included for archival.
- Supports organizing stored files into folder paths based on sheet and worksheet names.
### Notes
- Requires a Google Service Account JSON file for authentication. Suggested location is `secrets/gsheets_service_account.json`.
- Create the sheet using the template provided in the docs.
""",
}

View File

@@ -1,96 +0,0 @@
"""
GsheetsFeeder: A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Key properties
- validates the sheet's structure and filters rows based on input configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included.
"""
import os
import gspread
from loguru import logger
from slugify import slugify
from auto_archiver.core import Feeder
from auto_archiver.core import Metadata
from . import GWorksheet
class GsheetsFeeder(Feeder):
def setup(self) -> None:
self.gsheets_client = gspread.service_account(filename=self.service_account)
# TODO mv to validators
assert self.sheet or self.sheet_id, (
"You need to define either a 'sheet' name or a 'sheet_id' in your manifest."
)
def open_sheet(self):
if self.sheet:
return self.gsheets_client.open(self.sheet)
else: # self.sheet_id
return self.gsheets_client.open_by_key(self.sheet_id)
def __iter__(self) -> Metadata:
sh = self.open_sheet()
for ii, worksheet in enumerate(sh.worksheets()):
if not self.should_process_sheet(worksheet.title):
logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
continue
logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
continue
# process and yield metadata here:
yield from self._process_rows(gw)
logger.success(f'Finished worksheet {worksheet.title}')
def _process_rows(self, gw: GWorksheet):
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
# All checks done - archival process starts here
m = Metadata().set_url(url)
self._set_context(m, gw, row)
yield m
def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
# TODO: Check folder value not being recognised
m.set_context("gsheet", {"row": row, "worksheet": gw})
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder):
if self.use_sheet_names_in_stored_paths:
m.set_context("folder", os.path.join(folder, slugify(self.sheet), slugify(gw.wks.title)))
else:
m.set_context("folder", folder)
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:
# ALLOW rules exist AND sheet name not explicitly allowed
return False
if len(self.block_worksheets) and sheet_name in self.block_worksheets:
# BLOCK rules exist AND sheet name is blocked
return False
return True
def missing_required_columns(self, gw: GWorksheet) -> list:
missing = []
for required_col in ['url', 'status']:
if not gw.col_exists(required_col):
missing.append(required_col)
return missing

View File

@@ -0,0 +1,2 @@
from .gworksheet import GWorksheet
from .gsheet_feeder_db import GsheetsFeederDB

View File

@@ -0,0 +1,94 @@
{
"name": "Google Sheets Feeder Database",
"type": ["feeder", "database"],
"entry_point": "gsheet_feeder_db::GsheetsFeederDB",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "slugify"],
},
"configs": {
"sheet": {"default": None, "help": "name of the sheet to archive"},
"sheet_id": {
"default": None,
"help": "the id of the sheet to archive (alternative to 'sheet' config)",
},
"header": {"default": 1,
"type": "int",
"help": "index of the header row (starts at 1)", "type": "int"},
"service_account": {
"default": "secrets/service_account.json",
"help": "service account JSON file path. Learn how to create one: https://gspread.readthedocs.io/en/latest/oauth2.html",
"required": True,
},
"columns": {
"default": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"help": "Custom names for the columns in your Google sheet. If you don't want to use the default column names, change them with this setting",
"type": "json_loader",
},
"allow_worksheets": {
"default": set(),
"help": "A list of worksheet names that should be processed (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "A list of worksheet names for worksheets that should be explicitly blocked from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
"type": "bool",
},
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"type": "bool",
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},
"description": """
GsheetsFeederDatabase
A Google Sheets-based feeder and optional database for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Features
- Validates the sheet structure and filters rows based on input configurations.
- Processes only worksheets allowed by the `allow_worksheets` and `block_worksheets` configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included for archival.
- Supports organizing stored files into folder paths based on sheet and worksheet names.
- If the database is enabled, this updates the Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Setup
- Requires a Google Service Account JSON file for authentication, which should be stored in `secrets/gsheets_service_account.json`.
To set up a service account, follow the instructions [here](https://gspread.readthedocs.io/en/latest/oauth2.html).
- Define the `sheet` or `sheet_id` configuration to specify the sheet to archive.
- Customize the column names in your Google sheet using the `columns` configuration.
- The Google Sheet can be used soley as a feeder or as a feeder and database, but note you can't currently feed into the database from an alternate feeder.
""",
}

View File

@@ -0,0 +1,196 @@
"""
GsheetsFeeder: A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Key properties
- validates the sheet's structure and filters rows based on input configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included.
"""
import os
from typing import Tuple, Union
from urllib.parse import quote
import gspread
from loguru import logger
from slugify import slugify
from auto_archiver.core import Feeder, Database, Media
from auto_archiver.core import Metadata
from auto_archiver.modules.gsheet_feeder_db import GWorksheet
from auto_archiver.utils.misc import calculate_file_hash, get_current_timestamp
class GsheetsFeederDB(Feeder, Database):
def setup(self) -> None:
self.gsheets_client = gspread.service_account(filename=self.service_account)
# TODO mv to validators
if not self.sheet and not self.sheet_id:
raise ValueError("You need to define either a 'sheet' name or a 'sheet_id' in your manifest.")
def open_sheet(self):
if self.sheet:
return self.gsheets_client.open(self.sheet)
else: # self.sheet_id
return self.gsheets_client.open_by_key(self.sheet_id)
def __iter__(self) -> Metadata:
sh = self.open_sheet()
for ii, worksheet in enumerate(sh.worksheets()):
if not self.should_process_sheet(worksheet.title):
logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
continue
logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
continue
# process and yield metadata here:
yield from self._process_rows(gw)
logger.success(f'Finished worksheet {worksheet.title}')
def _process_rows(self, gw: GWorksheet):
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
# All checks done - archival process starts here
m = Metadata().set_url(url)
self._set_context(m, gw, row)
yield m
def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
# TODO: Check folder value not being recognised
m.set_context("gsheet", {"row": row, "worksheet": gw})
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder):
if self.use_sheet_names_in_stored_paths:
m.set_context("folder", os.path.join(folder, slugify(self.sheet), slugify(gw.wks.title)))
else:
m.set_context("folder", folder)
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:
# ALLOW rules exist AND sheet name not explicitly allowed
return False
if len(self.block_worksheets) and sheet_name in self.block_worksheets:
# BLOCK rules exist AND sheet name is blocked
return False
return True
def missing_required_columns(self, gw: GWorksheet) -> list:
missing = []
for required_col in ['url', 'status']:
if not gw.col_exists(required_col):
missing.append(required_col)
return missing
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, "status", "Archive in progress")
def failed(self, item: Metadata, reason: str) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, f"Archive failed {reason}")
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
self._safe_status_update(item, "")
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check if the given item has been archived already"""
return False
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item.get_url()}")
gw, row = self._retrieve_gsheet(item)
# self._safe_status_update(item, 'done')
cell_updates = []
row_values = gw.get_row(row)
def batch_if_valid(col, val, final_value=None):
final_value = final_value or val
try:
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == "":
cell_updates.append((row, col, final_value))
except Exception as e:
logger.error(f"Unable to batch {col}={final_value} due to {e}")
status_message = item.status
if cached:
status_message = f"[cached] {status_message}"
cell_updates.append((row, "status", status_message))
media: Media = item.get_final_media()
if hasattr(media, "urls"):
batch_if_valid("archive", "\n".join(media.urls))
batch_if_valid("date", True, get_current_timestamp())
batch_if_valid("title", item.get_title())
batch_if_valid("text", item.get("content", ""))
batch_if_valid("timestamp", item.get_timestamp())
if media:
batch_if_valid("hash", media.get("hash", "not-calculated"))
# merge all pdq hashes into a single string, if present
pdq_hashes = []
all_media = item.get_all_media()
for m in all_media:
if pdq := m.get("pdq_hash"):
pdq_hashes.append(pdq)
if len(pdq_hashes):
batch_if_valid("pdq_hash", ",".join(pdq_hashes))
if (screenshot := item.get_media_by_id("screenshot")) and hasattr(
screenshot, "urls"
):
batch_if_valid("screenshot", "\n".join(screenshot.urls))
if thumbnail := item.get_first_image("thumbnail"):
if hasattr(thumbnail, "urls"):
batch_if_valid("thumbnail", f'=IMAGE("{thumbnail.urls[0]}")')
if browsertrix := item.get_media_by_id("browsertrix"):
batch_if_valid("wacz", "\n".join(browsertrix.urls))
batch_if_valid(
"replaywebpage",
"\n".join(
[
f"https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}"
for wacz in browsertrix.urls
]
),
)
gw.batch_set_cell(cell_updates)
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, "status", new_status)
except Exception as e:
logger.debug(f"Unable to update sheet: {e}")
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
if gsheet := item.get_context("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
logger.error(f"Unable to retrieve Gsheet for {item.get_url()}, GsheetDB must be used alongside GsheetFeeder.")
return gw, row

View File

@@ -17,6 +17,7 @@ class GWorksheet:
'thumbnail': 'thumbnail',
'timestamp': 'upload timestamp',
'title': 'upload title',
'text': 'text content',
'screenshot': 'screenshot',
'hash': 'hash',
'pdq_hash': 'perceptual hashes',

View File

@@ -7,7 +7,9 @@
"bin": [""]
},
"configs": {
"detect_thumbnails": {"default": True, "help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'"}
"detect_thumbnails": {"default": True,
"help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'",
"type": "bool"},
},
"description": """ """,
}

View File

@@ -9,9 +9,7 @@ import base64
from auto_archiver.version import __version__
from auto_archiver.core import Metadata, Media
from auto_archiver.core import Formatter
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.utils.misc import random_str
from auto_archiver.core.module import get_module
class HtmlFormatter(Formatter):
environment: Environment = None
@@ -51,7 +49,7 @@ class HtmlFormatter(Formatter):
final_media = Media(filename=html_path, _mimetype="text/html")
# get the already instantiated hash_enricher module
he = get_module('hash_enricher', self.config)
he = self.module_factory.get_module('hash_enricher', self.config)
if len(hd := he.calculate_hash(final_media.filename)):
final_media.set("hash", f"{he.algorithm}:{hd}")

View File

@@ -200,7 +200,7 @@
el.innerHTML = decodeCertificate(certificate);
let cyberChefUrl =
`https://gchq.github.io/CyberChef/#recipe=Parse_X.509_certificate('PEM')&input=${btoa(certificate)}`;
`https://gchq.github.io/CyberChef/#recipe=Parse_X.509_certificate('PEM')&input=${btoa(certificate).replace(/=+$/, '')}`;
// create a new anchor with this url and append after the code
let a = document.createElement("a");
a.href = cyberChefUrl;

View File

@@ -10,25 +10,30 @@
"requires_setup": True,
"configs": {
"username": {"required": True,
"help": "a valid Instagram username"},
"help": "A valid Instagram username."},
"password": {
"required": True,
"help": "the corresponding Instagram account password",
"help": "The corresponding Instagram account password.",
},
"download_folder": {
"default": "instaloader",
"help": "name of a folder to temporarily download content to",
"help": "Name of a folder to temporarily download content to.",
},
"session_file": {
"default": "secrets/instaloader.session",
"help": "path to the instagram session which saves session credentials",
"help": "Path to the instagram session file which saves session credentials. If one doesn't exist this gives the path to store a new one.",
},
# TODO: fine-grain
# "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
},
"description": """
Uses the [Instaloader library](https://instaloader.github.io/as-module.html) to download content from Instagram. This class handles both individual posts
and user profiles, downloading as much information as possible, including images, videos, text, stories,
Uses the [Instaloader library](https://instaloader.github.io/as-module.html) to download content from Instagram.
> ⚠️ **Warning**
> This module is not actively maintained due to known issues with blocking.
> Prioritise usage of the [Instagram Tbot Extractor](./instagram_tbot_extractor.md) and [Instagram API Extractor](./instagram_api_extractor.md)
This class handles both individual posts and user profiles, downloading as much information as possible, including images, videos, text, stories,
highlights, and tagged posts.
Authentication is required via username/password or a session file.

View File

@@ -3,7 +3,7 @@
highlights, and tagged posts. Authentication is required via username/password or a session file.
"""
import re, os, shutil, traceback
import re, os, shutil
import instaloader
from loguru import logger
@@ -15,10 +15,9 @@ class InstagramExtractor(Extractor):
"""
Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...)
"""
# NB: post regex should be tested before profile
valid_url = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/")
# https://regex101.com/r/MGPquX/1
post_pattern = re.compile(r"{valid_url}(?:p|reel)\/(\w+)".format(valid_url=valid_url))
# https://regex101.com/r/6Wbsxa/1
@@ -28,19 +27,22 @@ class InstagramExtractor(Extractor):
def setup(self) -> None:
self.insta = instaloader.Instaloader(
download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"
download_geotags=True,
download_comments=True,
compress_json=False,
dirname_pattern=self.download_folder,
filename_pattern="{date_utc}_UTC_{target}__{typename}"
)
try:
self.insta.load_session_from_file(self.username, self.session_file)
except Exception as e:
logger.error(f"Unable to login from session file: {e}\n{traceback.format_exc()}")
try:
self.insta.login(self.username, config.instagram_self.password)
# TODO: wait for this issue to be fixed https://github.com/instaloader/instaloader/issues/1758
logger.debug(f"Session file failed", exc_info=True)
logger.info("No valid session file found - Attempting login with use and password.")
self.insta.login(self.username, self.password)
self.insta.save_session_to_file(self.session_file)
except Exception as e2:
logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
except Exception as e:
logger.error(f"Failed to setup Instagram Extractor with Instagrapi. {e}")
def download(self, item: Metadata) -> Metadata:

View File

@@ -77,13 +77,14 @@ class InstagramTbotExtractor(Extractor):
chat, since_id = self._send_url_to_bot(url)
message = self._process_messages(chat, since_id, tmp_dir, result)
# This may be outdated and replaced by the below message, but keeping until confirmed
if "You must enter a URL to a post" in message:
logger.debug(f"invalid link {url=} for {self.name}: {message}")
return False
# # TODO: It currently returns this as a success - is that intentional?
# if "Media not found or unavailable" in message:
# logger.debug(f"invalid link {url=} for {self.name}: {message}")
# return False
if "Media not found or unavailable" in message:
logger.debug(f"No media found for link {url=} for {self.name}: {message}")
return False
if message:
result.set_content(message).set_title(message[:128])
@@ -103,7 +104,7 @@ class InstagramTbotExtractor(Extractor):
message = ""
time.sleep(3)
# media is added before text by the bot so it can be used as a stop-logic mechanism
while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
while attempts < max(self.timeout - 3, 3) and (not message or not len(seen_media)):
attempts += 1
time.sleep(1)
for post in self.client.iter_messages(chat, min_id=since_id):

View File

@@ -17,7 +17,9 @@
"choices": ["random", "static"],
},
"save_to": {"default": "./local_archive", "help": "folder where to save archived content"},
"save_absolute": {"default": False, "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"},
"save_absolute": {"default": False,
"type": "bool",
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"},
},
"description": """
LocalStorage: A storage module for saving archived content locally on the filesystem.

View File

@@ -6,25 +6,42 @@ from loguru import logger
from auto_archiver.core import Media
from auto_archiver.core import Storage
from auto_archiver.core.consts import SetupError
class LocalStorage(Storage):
def setup(self) -> None:
if len(self.save_to) > 200:
raise SetupError(f"Your save_to path is too long, this will cause issues saving files on your computer. Please use a shorter path.")
def get_cdn_url(self, media: Media) -> str:
# TODO: is this viable with Storage.configs on path/filename?
dest = os.path.join(self.save_to, media.key)
dest = media.key
if self.save_absolute:
dest = os.path.abspath(dest)
return dest
def set_key(self, media, url, metadata):
# clarify we want to save the file to the save_to folder
old_folder = metadata.get('folder', '')
metadata.set_context('folder', os.path.join(self.save_to, metadata.get('folder', '')))
super().set_key(media, url, metadata)
# don't impact other storages that might want a different 'folder' set
metadata.set_context('folder', old_folder)
def upload(self, media: Media, **kwargs) -> bool:
# override parent so that we can use shutil.copy2 and keep metadata
dest = os.path.join(self.save_to, media.key)
dest = media.key
os.makedirs(os.path.dirname(dest), exist_ok=True)
logger.debug(f'[{self.__class__.__name__}] storing file {media.filename} with key {media.key} to {dest}')
res = shutil.copy2(media.filename, dest)
logger.info(res)
return True
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
pass

View File

@@ -42,7 +42,7 @@ class S3Storage(Storage):
logger.warning(f"Unable to get mimetype for {media.key=}, error: {e}")
self.s3.upload_fileobj(file, Bucket=self.bucket, Key=media.key, ExtraArgs=extra_args)
return True
def is_upload_needed(self, media: Media) -> bool:
if self.random_no_duplicate:
# checks if a folder with the hash already exists, if so it skips the upload
@@ -50,13 +50,13 @@ class S3Storage(Storage):
path = os.path.join(NO_DUPLICATES_FOLDER, hd[:24])
if existing_key:=self.file_in_folder(path):
media.key = existing_key
media._key = existing_key
media.set("previously archived", True)
logger.debug(f"skipping upload of {media.filename} because it already exists in {media.key}")
return False
_, ext = os.path.splitext(media.key)
media.key = os.path.join(path, f"{random_str(24)}{ext}")
media._key = os.path.join(path, f"{random_str(24)}{ext}")
return True
def file_in_folder(self, path:str) -> str:
@@ -66,5 +66,4 @@ class S3Storage(Storage):
resp = self.s3.list_objects(Bucket=self.bucket, Prefix=path, Delimiter='/', MaxKeys=1)
if 'Contents' in resp:
return resp['Contents'][0]['Key']
return False
return False

View File

@@ -4,16 +4,27 @@
"requires_setup": True,
"dependencies": {
"python": ["loguru", "selenium"],
"bin": ["chromedriver"]
},
"configs": {
"width": {"default": 1280, "help": "width of the screenshots"},
"height": {"default": 720, "help": "height of the screenshots"},
"timeout": {"default": 60, "help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
"width": {"default": 1280,
"type": "int",
"help": "width of the screenshots"},
"height": {"default": 1024,
"type": "int",
"help": "height of the screenshots"},
"timeout": {"default": 60,
"type": "int",
"help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4,
"type": "int",
"help": "seconds to wait for the pages to load before taking screenshot"},
"http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
"save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {}, "help": "options to pass to the pdf printer"}
"save_to_pdf": {"default": False,
"type": "bool",
"help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {},
"help": "options to pass to the pdf printer, in JSON format. See https://www.selenium.dev/documentation/webdriver/interactions/print_page/ for more information",
"type": "json_loader"},
},
"description": """
Captures screenshots and optionally saves web pages as PDFs using a WebDriver.

View File

@@ -11,6 +11,10 @@ from auto_archiver.core import Media, Metadata
class ScreenshotEnricher(Enricher):
def __init__(self, webdriver_factory=None):
super().__init__()
self.webdriver_factory = webdriver_factory or Webdriver
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
@@ -20,7 +24,8 @@ class ScreenshotEnricher(Enricher):
logger.debug(f"Enriching screenshot for {url=}")
auth = self.auth_for_site(url)
with Webdriver(self.width, self.height, self.timeout, facebook_accept_cookies='facebook.com' in url,
with self.webdriver_factory(
self.width, self.height, self.timeout, facebook_accept_cookies='facebook.com' in url,
http_proxy=self.http_proxy, print_options=self.print_options, auth=auth) as driver:
try:
driver.get(url)
@@ -38,3 +43,4 @@ class ScreenshotEnricher(Enricher):
logger.info("TimeoutException loading page for screenshot")
except Exception as e:
logger.error(f"Got error while loading webdriver for screenshot enricher: {e}")

View File

@@ -7,7 +7,9 @@
},
'entry_point': 'ssl_enricher::SSLEnricher',
"configs": {
"skip_when_nothing_archived": {"default": True, "help": "if true, will skip enriching when no media is archived"},
"skip_when_nothing_archived": {"default": True,
"type": 'bool',
"help": "if true, will skip enriching when no media is archived"},
},
"description": """
Retrieves SSL certificate information for a domain and stores it as a file.

View File

@@ -20,5 +20,6 @@
- Processes HTML content of messages to retrieve embedded media.
- Sets structured metadata, including timestamps, content, and media details.
- Does not require user authentication for Telegram.
""",
}

View File

@@ -1,5 +1,5 @@
{
"name": "telethon_extractor",
"name": "Telethon Extractor",
"type": ["extractor"],
"requires_setup": True,
"dependencies": {
@@ -14,11 +14,13 @@
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"bot_token": {"default": None, "help": "optional, but allows access to more content such as large videos, talk to @botfather"},
"session_file": {"default": "secrets/anon", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"join_channels": {"default": True, "help": "disables the initial setup with channel_invites config, useful if you have a lot and get stuck"},
"join_channels": {"default": True,
"type": "bool",
"help": "disables the initial setup with channel_invites config, useful if you have a lot and get stuck"},
"channel_invites": {
"default": {},
"help": "(JSON string) private channel invite links (format: t.me/joinchat/HASH OR t.me/+HASH) and (optional but important to avoid hanging for minutes on startup) channel id (format: CHANNEL_ID taken from a post url like https://t.me/c/CHANNEL_ID/1), the telegram account will join any new channels on setup",
"type": "auto_archiver.utils.json_loader",
"type": "json_loader",
}
},
"description": """
@@ -40,5 +42,9 @@ To use the `TelethonExtractor`, you must configure the following:
- **Bot Token**: Optional, allows access to additional content (e.g., large videos) but limits private channel archiving.
- **Channel Invites**: Optional, specify a JSON string of invite links to join channels during setup.
### First Time Login
The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root.
"""
}

View File

@@ -7,8 +7,12 @@
"bin": ["ffmpeg"]
},
"configs": {
"thumbnails_per_minute": {"default": 60, "help": "how many thumbnails to generate per minute of video, can be limited by max_thumbnails"},
"max_thumbnails": {"default": 16, "help": "limit the number of thumbnails to generate per video, 0 means no limit"},
"thumbnails_per_minute": {"default": 60,
"type": "int",
"help": "how many thumbnails to generate per minute of video, can be limited by max_thumbnails"},
"max_thumbnails": {"default": 16,
"type": "int",
"help": "limit the number of thumbnails to generate per video, 0 means no limit"},
},
"description": """
Generates thumbnails for video files to provide visual previews.

View File

@@ -42,7 +42,7 @@ class ThumbnailEnricher(Enricher):
logger.error(f"error getting duration of video {m.filename}: {e}")
return
num_thumbs = int(min(max(1, duration * self.thumbnails_per_minute), self.max_thumbnails))
num_thumbs = int(min(max(1, (duration / 60) * self.thumbnails_per_minute), self.max_thumbnails))
timestamps = [duration / (num_thumbs + 1) * i for i in range(1, num_thumbs + 1)]
thumbnails_media = []

View File

@@ -0,0 +1 @@
from .tiktok_tikwm_extractor import TiktokTikwmExtractor

View File

@@ -0,0 +1,23 @@
{
"name": "Tiktok Tikwm Extractor",
"type": ["extractor"],
"requires_setup": False,
"dependencies": {
"python": ["loguru", "requests"],
"bin": []
},
"description": """
Uses an unofficial TikTok video download platform's API to download videos: https://tikwm.com/
This extractor complements the generic_extractor which can already get TikTok videos, but this one can extract special videos like those marked as sensitive.
### Features
- Downloads the video and, if possible, also the video cover.
- Stores extra metadata about the post like author information, and more as returned by tikwm.com.
### Notes
- If tikwm.com is down, this extractor will not work.
- If tikwm.com changes their API, this extractor may break.
- If no video is found, this extractor will consider the extraction failed.
"""
}

View File

@@ -0,0 +1,75 @@
import re
import requests
from loguru import logger
from datetime import datetime, timezone
from yt_dlp.extractor.tiktok import TikTokIE
from auto_archiver.core import Extractor
from auto_archiver.core import Metadata, Media
class TiktokTikwmExtractor(Extractor):
"""
Extractor for TikTok that uses an unofficial API and can capture content that requires a login, like sensitive content.
"""
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if not re.match(TikTokIE._VALID_URL, url):
return False
endpoint = TiktokTikwmExtractor.TIKWM_ENDPOINT.format(url=url)
r = requests.get(endpoint)
if r.status_code != 200:
logger.error(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
return False
try:
json_response = r.json()
except ValueError:
logger.error(f"failed to parse JSON response from tikwm.com for {url=}")
return False
if not json_response.get('msg') == 'success' or not (api_data := json_response.get('data', {})):
logger.error(f"failed to get a valid response from tikwm.com for {url=}: {json_response}")
return False
# tries to get the non-watermarked version first
video_url = api_data.pop("play", api_data.pop("wmplay", None))
if not video_url:
logger.error(f"no valid video URL found in response from tikwm.com for {url=}")
return False
# prepare result, start by downloading video
result = Metadata()
# get the cover if possible
cover_url = api_data.pop("origin_cover", api_data.pop("cover", api_data.pop("ai_dynamic_cover", None)))
if cover_url and (cover_downloaded := self.download_from_url(cover_url)):
result.add_media(Media(cover_downloaded))
# get the video or fail
video_downloaded = self.download_from_url(video_url, f"vid_{api_data.get('id', '')}")
if not video_downloaded:
logger.error(f"failed to download video from {video_url}")
return False
video_media = Media(video_downloaded)
if duration := api_data.pop("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
# add remaining metadata
result.set_title(api_data.pop("title", ""))
if created_at := api_data.pop("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if (author := api_data.pop("author", None)):
result.set("author", author)
result.set("api_data", api_data)
return result.success("tikwm")

View File

@@ -1 +0,0 @@
from .wacz_enricher import WaczExtractorEnricher

View File

@@ -0,0 +1 @@
from .wacz_extractor_enricher import WaczExtractorEnricher

View File

@@ -1,7 +1,7 @@
{
"name": "WACZ Enricher",
"type": ["enricher", "archiver"],
"entry_point": "wacz_enricher::WaczExtractorEnricher",
"name": "WACZ Enricher (and Extractor)",
"type": ["enricher", "extractor"],
"entry_point": "wacz_extractor_enricher::WaczExtractorEnricher",
"requires_setup": True,
"dependencies": {
"python": [
@@ -17,11 +17,19 @@
"configs": {
"profile": {"default": None, "help": "browsertrix-profile (for profile generation see https://github.com/webrecorder/browsertrix-crawler#creating-and-using-browser-profiles)."},
"docker_commands": {"default": None, "help":"if a custom docker invocation is needed"},
"timeout": {"default": 120, "help": "timeout for WACZ generation in seconds"},
"extract_media": {"default": False, "help": "If enabled all the images/videos/audio present in the WACZ archive will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."},
"extract_screenshot": {"default": True, "help": "If enabled the screenshot captured by browsertrix will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."},
"timeout": {"default": 120,
"type": "int",
"help": "timeout for WACZ generation in seconds", "type": "int"},
"extract_media": {"default": False,
"type": 'bool',
"help": "If enabled all the images/videos/audio present in the WACZ archive will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."
},
"extract_screenshot": {"default": True,
"type": 'bool',
"help": "If enabled the screenshot captured by browsertrix will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."
},
"socks_proxy_host": {"default": None, "help": "SOCKS proxy host for browsertrix-crawler, use in combination with socks_proxy_port. eg: user:password@host"},
"socks_proxy_port": {"default": None, "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"},
"socks_proxy_port": {"default": None, "type":"int", "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"},
"proxy_server": {"default": None, "help": "SOCKS server proxy URL, in development"},
},
"description": """

View File

@@ -221,4 +221,4 @@ class WaczExtractorEnricher(Enricher, Extractor):
to_enrich.add_media(m, warc_fn)
counter += 1
seen_urls.add(record_url)
logger.info(f"WACZ extract_media/extract_screenshot finished, found {counter} relevant media file(s)")
logger.info(f"WACZ extract_media/extract_screenshot finished, found {counter} relevant media file(s)")

View File

@@ -1,6 +1,6 @@
{
"name": "Wayback Machine Enricher",
"type": ["enricher", "archiver"],
"name": "Wayback Machine Enricher (and Extractor)",
"type": ["enricher", "extractor"],
"entry_point": "wayback_extractor_enricher::WaybackExtractorEnricher",
"requires_setup": True,
"dependencies": {
@@ -9,6 +9,7 @@
"configs": {
"timeout": {
"default": 15,
"type": "int",
"help": "seconds to wait for successful archive confirmation from wayback, if more than this passes the result contains the job_id so the status can later be checked manually.",
},
"if_not_archived_within": {

View File

@@ -10,8 +10,12 @@
"help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},
"api_key": {"required": True,
"help": "WhisperApi api key for authentication"},
"include_srt": {"default": False, "help": "Whether to include a subtitle SRT (SubRip Subtitle file) for the video (can be used in video players)."},
"timeout": {"default": 90, "help": "How many seconds to wait at most for a successful job completion."},
"include_srt": {"default": False,
"type": "bool",
"help": "Whether to include a subtitle SRT (SubRip Subtitle file) for the video (can be used in video players)."},
"timeout": {"default": 90,
"type": "int",
"help": "How many seconds to wait at most for a successful job completion."},
"action": {"default": "translate",
"help": "which Whisper operation to execute",
"choices": ["transcribe", "translate", "language_detection"]},

View File

@@ -4,7 +4,6 @@ from loguru import logger
from auto_archiver.core import Enricher
from auto_archiver.core import Metadata, Media
from auto_archiver.core.module import get_module
class WhisperEnricher(Enricher):
"""
@@ -15,7 +14,7 @@ class WhisperEnricher(Enricher):
def setup(self) -> None:
self.stores = self.config['steps']['storages']
self.s3 = get_module("s3_storage", self.config)
self.s3 = self.module_factory.get_module("s3_storage", self.config)
if not "s3_storage" in self.stores:
logger.error("WhisperEnricher: To use the WhisperEnricher you need to use S3Storage so files are accessible publicly to the whisper service being called.")
return
@@ -29,8 +28,7 @@ class WhisperEnricher(Enricher):
job_results = {}
for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio():
# TODO: this used to pass all storage items to store now
# Now only passing S3, the rest will get added later in the usual order (?)
# Only storing S3, the rest will get added later in the usual order (?)
m.store(url=url, metadata=to_enrich, storages=[self.s3])
try:
job_id = self.submit_job(m)

View File

@@ -2,7 +2,6 @@
# we need to explicitly expose the available imports here
from .misc import *
from .webdriver import Webdriver
from .atlos import get_atlos_config_options
# handy utils from ytdlp
from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none, url_or_none)

View File

@@ -1,13 +0,0 @@
def get_atlos_config_options():
return {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"cli_set": lambda cli_val, _: cli_val
},
}

View File

@@ -1,9 +1,11 @@
import os
import hashlib
import json
import os
import uuid
from datetime import datetime, timezone
from dateutil.parser import parse as parse_dt
import requests
import hashlib
from loguru import logger
@@ -46,7 +48,7 @@ def dump_payload(p):
def update_nested_dict(dictionary, update_dict):
# takes 2 dicts and overwrites the first with the second only on the changed balues
# takes 2 dicts and overwrites the first with the second only on the changed values
for key, value in update_dict.items():
if key in dictionary and isinstance(value, dict) and isinstance(dictionary[key], dict):
update_nested_dict(dictionary[key], value)
@@ -59,10 +61,6 @@ def random_str(length: int = 32) -> str:
return str(uuid.uuid4()).replace("-", "")[:length]
def json_loader(cli_val):
return json.loads(cli_val)
def calculate_file_hash(filename: str, hash_algo = hashlib.sha256, chunksize: int = 16000000) -> str:
hash = hash_algo()
with open(filename, "rb") as f:
@@ -72,26 +70,34 @@ def calculate_file_hash(filename: str, hash_algo = hashlib.sha256, chunksize: in
hash.update(buf)
return hash.hexdigest()
def get_current_datetime_iso() -> str:
return datetime.now(timezone.utc).replace(tzinfo=timezone.utc).isoformat()
def get_datetime_from_str(dt_str: str, fmt: str | None = None, dayfirst=True) -> datetime | None:
""" parse a datetime string with option of passing a specific format
def get_datetime_from_str(dt_str: str, fmt: str | None = None) -> datetime | None:
# parse a datetime string with option of passing a specific format
Args:
dt_str: the datetime string to parse
fmt: the python date format of the datetime string, if None, dateutil.parser.parse is used
dayfirst: Use this to signify between date formats which put the day first, vs the month first:
e.g. DD/MM/YYYY vs MM/DD/YYYY
"""
try:
return datetime.strptime(dt_str, fmt) if fmt else datetime.fromisoformat(dt_str)
return datetime.strptime(dt_str, fmt) if fmt else parse_dt(dt_str, dayfirst=dayfirst)
except ValueError as e:
logger.error(f"Unable to parse datestring {dt_str}: {e}")
return None
def get_timestamp(ts, utc=True, iso=True) -> str | datetime | None:
# Consistent parsing of timestamps
# If utc=True, the timezone is set to UTC,
# if iso=True, the output is an iso string
def get_timestamp(ts, utc=True, iso=True, dayfirst=True) -> str | datetime | None:
""" Consistent parsing of timestamps.
Args:
If utc=True, the timezone is set to UTC,
if iso=True, the output is an iso string
Use dayfirst to signify between date formats which put the date vs month first:
e.g. DD/MM/YYYY vs MM/DD/YYYY
"""
if not ts: return
try:
if isinstance(ts, str): ts = datetime.fromisoformat(ts)
if isinstance(ts, str): ts = parse_dt(ts, dayfirst=dayfirst)
if isinstance(ts, (int, float)): ts = datetime.fromtimestamp(ts)
if utc: ts = ts.replace(tzinfo=timezone.utc)
if iso: return ts.isoformat()
@@ -100,5 +106,6 @@ def get_timestamp(ts, utc=True, iso=True) -> str | datetime | None:
logger.error(f"Unable to parse timestamp {ts}: {e}")
return None
def get_current_timestamp() -> str:
return get_timestamp(datetime.now())
return get_timestamp(datetime.now())

View File

@@ -1,5 +1,6 @@
import re
from urllib.parse import urlparse, urlunparse
from ipaddress import ip_address
AUTHWALL_URLS = [
@@ -7,6 +8,43 @@ AUTHWALL_URLS = [
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
]
def check_url_or_raise(url: str) -> bool | ValueError:
"""
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
"""
if not (url.startswith("http://") or url.startswith("https://")):
raise ValueError(f"Invalid URL scheme for url {url}")
parsed = urlparse(url)
if not parsed.hostname:
raise ValueError(f"Invalid URL hostname for url {url}")
if parsed.hostname == "localhost":
raise ValueError(f"Localhost URLs cannot be parsed for security reasons (for url {url})")
if parsed.scheme not in ["http", "https"]:
raise ValueError(f"Invalid URL scheme, only http and https supported (for url {url})")
try: # special rules for IP addresses
ip = ip_address(parsed.hostname)
except ValueError:
pass
else:
if not ip.is_global:
raise ValueError(f"IP address {ip} is not globally reachable")
if ip.is_reserved:
raise ValueError(f"Reserved IP address {ip} used")
if ip.is_link_local:
raise ValueError(f"Link-local IP address {ip} used")
if ip.is_private:
raise ValueError(f"Private IP address {ip} used")
return True
def domain_for_url(url: str) -> str:
"""
SECURITY: parse the domain using urllib to avoid any potential security issues

View File

@@ -1,18 +1,23 @@
""" This Webdriver class acts as a context manager for the selenium webdriver. """
from __future__ import annotations
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.common.print_page_options import PrintOptions
from loguru import logger
from selenium.webdriver.common.by import By
import os
import time
#import domain_for_url
from urllib.parse import urlparse, urlunparse
from http.cookiejar import MozillaCookieJar
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common import exceptions as selenium_exceptions
from selenium.webdriver.common.print_page_options import PrintOptions
from selenium.webdriver.common.by import By
from loguru import logger
class CookieSettingDriver(webdriver.Firefox):
facebook_accept_cookies: bool
@@ -20,6 +25,10 @@ class CookieSettingDriver(webdriver.Firefox):
cookiejar: MozillaCookieJar
def __init__(self, cookies, cookiejar, facebook_accept_cookies, *args, **kwargs):
if os.environ.get('RUNNING_IN_DOCKER'):
# Selenium doesn't support linux-aarch64 driver, we need to set this manually
kwargs['service'] = webdriver.FirefoxService(executable_path='/usr/local/bin/geckodriver')
super(CookieSettingDriver, self).__init__(*args, **kwargs)
self.cookies = cookies
self.cookiejar = cookiejar
@@ -64,14 +73,29 @@ class CookieSettingDriver(webdriver.Firefox):
time.sleep(2)
except Exception as e:
logger.warning(f'Failed on fb accept cookies.', e)
# now get the actual URL
super(CookieSettingDriver, self).get(url)
if self.facebook_accept_cookies:
# try and click the 'close' button on the 'login' window to close it
close_button = self.find_element(By.XPATH, "//div[@role='dialog']//div[@aria-label='Close']")
if close_button:
close_button.click()
try:
xpath = "//div[@role='dialog']//div[@aria-label='Close']"
WebDriverWait(self, 5).until(EC.element_to_be_clickable((By.XPATH, xpath))).click()
except selenium_exceptions.NoSuchElementException:
logger.warning("Unable to find the 'close' button on the facebook login window")
pass
else:
# for all other sites, try and use some common button text to reject/accept cookies
for text in ["Refuse non-essential cookies", "Decline optional cookies", "Reject additional cookies", "Reject all", "Accept all cookies"]:
try:
xpath = f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text.lower()}')]"
WebDriverWait(self, 5).until(EC.element_to_be_clickable((By.XPATH, xpath))).click()
break
except selenium_exceptions.WebDriverException:
pass
class Webdriver:
@@ -90,7 +114,6 @@ class Webdriver:
setattr(self.print_options, k, v)
def __enter__(self) -> webdriver:
options = webdriver.FirefoxOptions()
options.add_argument("--headless")
options.add_argument(f'--proxy-server={self.http_proxy}')
@@ -105,7 +128,7 @@ class Webdriver:
self.driver.set_window_size(self.width, self.height)
self.driver.set_page_load_timeout(self.timeout_seconds)
self.driver.print_options = self.print_options
except TimeoutException as e:
except selenium_exceptions.TimeoutException as e:
logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")
return self.driver