Further tidyups + refactoring for new structure

* Add implementation tests for orchestrator + logging tests
* Standardise method/class vars for extractors to see if they are suitable
* Fix bugs with removing default loguru logger (allows further customisation)
* Fix bug loading required fields from file
*
This commit is contained in:
Patrick Robertson
2025-01-30 13:21:10 +01:00
parent cddae65a90
commit b7d9145f6c
22 changed files with 292 additions and 51 deletions

24
poetry.lock generated
View File

@@ -1025,7 +1025,7 @@ version = "0.7.3"
description = "Python logging made (stupidly) simple"
optional = false
python-versions = "<4.0,>=3.5"
groups = ["main"]
groups = ["main", "dev"]
files = [
{file = "loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c"},
{file = "loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6"},
@@ -1750,6 +1750,24 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
[[package]]
name = "pytest-loguru"
version = "0.4.0"
description = "Pytest Loguru"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
files = [
{file = "pytest_loguru-0.4.0-py3-none-any.whl", hash = "sha256:3cc7b9c6b22cb158209ccbabf0d678dacd3f3c7497d6f46f1c338c13bee1ac77"},
{file = "pytest_loguru-0.4.0.tar.gz", hash = "sha256:0d9e4e72ae9bfd92f774c666e7353766af11b0b78edd59c290e89be116050f03"},
]
[package.dependencies]
loguru = "*"
[package.extras]
test = ["pytest", "pytest-cov"]
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
@@ -3032,7 +3050,7 @@ version = "1.2.0"
description = "A small Python utility to set file creation time on Windows"
optional = false
python-versions = ">=3.5"
groups = ["main"]
groups = ["main", "dev"]
markers = "sys_platform == \"win32\""
files = [
{file = "win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390"},
@@ -3082,4 +3100,4 @@ test = ["pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "1556d53c5a94392c120ebaafc495d3b322daf64dac4a19f9726588c7f3d84bca"
content-hash = "5a54c84ba388db7b77d1c28973b710fc99aa3822a2860b30acaf5b02ba1927bd"

View File

@@ -63,6 +63,7 @@ dependencies = [
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.4"
autopep8 = "^2.3.1"
pytest-loguru = "^0.4.0"
[tool.poetry.group.docs.dependencies]
sphinx = "^8.1.3"

View File

@@ -1,8 +1,9 @@
""" Entry point for the auto_archiver package. """
from auto_archiver.core.orchestrator import ArchivingOrchestrator
import sys
def main():
    """Run the Auto Archiver command-line interface.

    Only the user-supplied arguments are handed to the orchestrator.
    ``sys.argv[0]`` is the program name; passing it along in an explicit
    argument list would make the argument parser treat it as a stray,
    unrecognised argument.
    """
    ArchivingOrchestrator().run(sys.argv[1:])
if __name__ == "__main__":
main()

View File

View File

@@ -48,6 +48,10 @@ class DefaultValidatingParser(argparse.ArgumentParser):
"""
for action in self._actions:
if not namespace or action.dest not in namespace:
# for actions that are required and already have a default value, remove the 'required' check
if action.required and action.default is not None:
action.required = False
if action.default is not None:
try:
self._check_value(action, action.default)

View File

@@ -11,9 +11,12 @@ from abc import abstractmethod
from dataclasses import dataclass
import mimetypes
import os
import mimetypes, requests
import mimetypes
import requests
from loguru import logger
from retrying import retry
import re
from ..core import Metadata, ArchivingContext, BaseModule
@@ -25,6 +28,8 @@ class Extractor(BaseModule):
Subclasses must implement the `download` method to define platform-specific behavior.
"""
valid_url: re.Pattern = None
def cleanup(self) -> None:
    """Release any resources held by the extractor.

    Called when extractors are done, or upon errors. The base
    implementation does nothing.
    """
    return None
@@ -32,13 +37,20 @@ class Extractor(BaseModule):
def sanitize_url(self, url: str) -> str:
    """Return the URL that should be archived.

    Hook used to clean unnecessary URL parameters or to unfurl redirect
    links; this implementation hands the URL back unchanged.
    """
    sanitized = url
    return sanitized
def match_link(self, url: str) -> "re.Match | None":
    """Match ``url`` against this extractor's ``valid_url`` pattern.

    ``Pattern.match`` is used, so the pattern has to match starting at the
    first character of ``url``.

    Returns:
        The match object, or ``None`` when the URL does not match or when
        no ``valid_url`` pattern is set on the extractor.
    """
    pattern = self.valid_url
    if pattern is None:
        # no pattern configured: report "no match" instead of raising
        # AttributeError on None
        return None
    return pattern.match(url)
def suitable(self, url: str) -> bool:
    """
    Report whether this extractor can handle the given URL.

    When no ``valid_url`` pattern is set every URL is accepted; otherwise the
    URL is accepted only if ``match_link`` returns a match.
    Should be overridden by subclasses.
    """
    if not self.valid_url:
        return True
    return self.match_link(url) is not None
def _guess_file_type(self, path: str) -> str:

View File

@@ -83,6 +83,11 @@ def setup_paths(paths: list[str]) -> None:
"""
for path in paths:
# check path exists, if it doesn't, log a warning
if not os.path.exists(path):
logger.warning(f"Path '{path}' does not exist. Skipping...")
continue
# see odoo/module/module.py -> initialize_sys_path
if path not in auto_archiver.modules.__path__:
auto_archiver.modules.__path__.append(path)

View File

@@ -43,6 +43,7 @@ class ArchivingOrchestrator:
def setup_basic_parser(self):
parser = argparse.ArgumentParser(
prog="auto-archiver",
add_help=False,
description="""
Auto Archiver is a CLI tool to archive media/metadata from online URLs;
@@ -51,15 +52,16 @@ class ArchivingOrchestrator:
epilog="Check the code at https://github.com/bellingcat/auto-archiver",
formatter_class=RichHelpFormatter,
)
parser.add_argument('--config', action='store', dest="config_file", help='the filename of the YAML configuration file (defaults to \'config.yaml\')', default=DEFAULT_CONFIG_FILE)
parser.add_argument('--help', '-h', action='store_true', dest='help', help='show this help message and exit')
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument('--config', action='store', dest="config_file", help='the filename of the YAML configuration file (defaults to \'config.yaml\')', default=DEFAULT_CONFIG_FILE)
parser.add_argument('--mode', action='store', dest='mode', type=str, choices=['simple', 'full'], help='the mode to run the archiver in', default='simple')
# override the default 'help' so we can inject all the configs and show those
parser.add_argument('-h', '--help', action='store_true', dest='help', help='show this help message and exit')
parser.add_argument('-s', '--store', dest='store', default=False, help='Store the created config in the config file', action=argparse.BooleanOptionalAction)
parser.add_argument('--module_paths', dest='module_paths', nargs='+', default=[], help='additional paths to search for modules', action=UniqueAppendAction)
self.basic_parser = parser
return parser
def setup_complete_parser(self, basic_config: dict, yaml_config: dict, unused_args: list[str]) -> None:
parser = DefaultValidatingParser(
@@ -78,15 +80,15 @@ class ArchivingOrchestrator:
# only load the modules enabled in config
# TODO: if some steps are empty (e.g. 'feeders' is empty), should we default to the 'simple' ones? Or only if they are ALL empty?
enabled_modules = []
for module_type in BaseModule.MODULE_TYPES:
enabled_modules.extend(yaml_config['steps'].get(f"{module_type}s", []))
# add in any extra modules that have been passed on the command line for 'feeders', 'enrichers', 'archivers', 'databases', 'storages', 'formatter'
for module_type in BaseModule.MODULE_TYPES:
if modules := getattr(basic_config, f"{module_type}s", []):
enabled_modules.extend(modules)
# first loads the modules from the config file, then from the command line
for config in [yaml_config['steps'], basic_config.__dict__]:
for module_type in BaseModule.MODULE_TYPES:
enabled_modules.extend(config.get(f"{module_type}s", []))
avail_modules = available_modules(with_manifest=True, limit_to_modules=list(dict.fromkeys(enabled_modules)), suppress_warnings=True)
# clear out duplicates, but keep the order
enabled_modules = list(dict.fromkeys(enabled_modules))
avail_modules = available_modules(with_manifest=True, limit_to_modules=enabled_modules, suppress_warnings=True)
self.add_module_args(avail_modules, parser)
elif basic_config.mode == 'simple':
simple_modules = [module for module in available_modules(with_manifest=True) if not module.requires_setup]
@@ -163,6 +165,10 @@ class ArchivingOrchestrator:
# make a nicer metavar, metavar is what's used in the help, e.g. --cli_feeder.urls [METAVAR]
kwargs['metavar'] = name.upper()
if kwargs.get('required', False):
# required args shouldn't have a 'default' value, remove it
kwargs.pop('default', None)
kwargs.pop('cli_set', None)
should_store = kwargs.pop('should_store', False)
kwargs['dest'] = f"{module.name}.{kwargs.pop('dest', name)}"
@@ -179,13 +185,12 @@ class ArchivingOrchestrator:
self.add_additional_args(self.basic_parser)
self.add_module_args(parser=self.basic_parser)
self.basic_parser.print_help()
exit()
self.basic_parser.exit()
def setup_logging(self):
# setup loguru logging
logger.remove() # remove the default logger
logger.remove(0) # remove the default logger
logging_config = self.config['logging']
logger.add(sys.stderr, level=logging_config['level'])
if log_file := logging_config['file']:
@@ -194,14 +199,18 @@ class ArchivingOrchestrator:
def install_modules(self):
"""
Swaps out the previous 'strings' in the config with the actual modules
Swaps out the previous 'strings' in the config with the actual modules and loads them
"""
invalid_modules = []
for module_type in BaseModule.MODULE_TYPES:
step_items = []
modules_to_load = self.config['steps'][f"{module_type}s"]
assert modules_to_load, f"No {module_type}s were configured. Make sure to set at least one {module_type} \
in your configuration file or on the command line (using --{module_type}s)"
def check_steps_ok():
if not len(step_items):
logger.error(f"NO {module_type.upper()}S LOADED. Please check your configuration and try again.")
@@ -239,30 +248,29 @@ class ArchivingOrchestrator:
assert len(step_items) > 0, f"No {module_type}s were loaded. Please check your configuration file and try again."
self.config['steps'][f"{module_type}s"] = step_items
def load_config(self, config_file: str) -> dict:
if not os.path.exists(config_file) and config_file != DEFAULT_CONFIG_FILE:
logger.error(f"The configuration file {config_file} was not found. Make sure the file exists and try again, or run without the --config file to use the default settings.")
exit()
def run(self) -> None:
return read_yaml(config_file)
def run(self, args: list) -> None:
self.setup_basic_parser()
# parse the known arguments for now (basically, we want the config file)
basic_config, unused_args = self.basic_parser.parse_known_args(args)
# load the config file to get the list of enabled items
basic_config, unused_args = self.basic_parser.parse_known_args()
# setup any custom module paths, so they'll show in the help and for arg parsing
setup_paths(basic_config.module_paths)
# if help flag was called, then show the help
if basic_config.help:
self.show_help(basic_config)
# load the config file
yaml_config = {}
if not os.path.exists(basic_config.config_file) and basic_config.config_file != DEFAULT_CONFIG_FILE:
logger.error(f"The configuration file {basic_config.config_file} was not found. Make sure the file exists and try again, or run without the --config file to use the default settings.")
exit()
yaml_config = read_yaml(basic_config.config_file)
yaml_config = self.load_config(basic_config.config_file)
self.setup_complete_parser(basic_config, yaml_config, unused_args)
logger.info(f"======== Welcome to the AUTO ARCHIVER ({__version__}) ==========")

View File

@@ -28,7 +28,7 @@ class InstagramAPIExtractor(Extractor):
# TODO: improvement collect aggregates of locations[0].location and mentions for all posts
"""
global_pattern = re.compile(
valid_url = re.compile(
r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com)\/(stories(?:\/highlights)?|p|reel)?\/?([^\/\?]*)\/?(\d+)?"
)
@@ -44,7 +44,7 @@ class InstagramAPIExtractor(Extractor):
url.replace("instagr.com", "instagram.com").replace(
"instagr.am", "instagram.com"
)
insta_matches = self.global_pattern.findall(url)
insta_matches = self.valid_url.findall(url)
logger.info(f"{insta_matches=}")
if not len(insta_matches) or len(insta_matches[0]) != 3:
return

View File

@@ -16,10 +16,13 @@ class InstagramExtractor(Extractor):
Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...)
"""
# NB: post regex should be tested before profile
valid_url = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/")

# The post/profile patterns extend the *source text* of `valid_url`, hence `.pattern`:
# formatting the compiled object itself would embed its repr ("re.compile('...')")
# in the new expression, which can never match a URL.
# https://regex101.com/r/MGPquX/1
post_pattern = re.compile(valid_url.pattern + r"(?:p|reel)\/(\w+)")
# https://regex101.com/r/6Wbsxa/1
profile_pattern = re.compile(valid_url.pattern + r"(\w+)")
# TODO: links to stories
def setup(self, config: dict) -> None:

View File

@@ -14,7 +14,7 @@ from auto_archiver.utils import random_str
class TelethonArchiver(Extractor):
link_pattern = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
valid_url = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)")
@@ -92,7 +92,7 @@ class TelethonArchiver(Extractor):
"""
url = item.get_url()
# detect URLs that we definitely cannot handle
match = self.link_pattern.search(url)
match = self.valid_url.search(url)
logger.debug(f"TELETHON: {match=}")
if not match: return False

View File

@@ -12,7 +12,7 @@ from auto_archiver.core import Extractor
from auto_archiver.core import Metadata,Media
class TwitterApiExtractor(Extractor):
link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
valid_url = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
def setup(self, config: dict) -> None:
super().setup(config)
@@ -54,7 +54,7 @@ class TwitterApiExtractor(Extractor):
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
matches = self.link_pattern.findall(url)
matches = self.valid_url.findall(url)
if not len(matches): return False, False
username, tweet_id = matches[0] # only one URL supported

View File

@@ -2,8 +2,11 @@ import re
from urllib.parse import urlparse, urlunparse
class UrlUtil:
telegram_private = re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)")
is_istagram = re.compile(r"https:\/\/www\.instagram\.com")
AUTHWALL_URLS = [
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
]
@staticmethod
def clean(url: str) -> str: return url
@@ -13,8 +16,9 @@ class UrlUtil:
"""
checks if URL is behind an authentication wall meaning steps like wayback, wacz, ... may not work
"""
if UrlUtil.telegram_private.match(url): return True
if UrlUtil.is_istagram.match(url): return True
for regex in UrlUtil.AUTHWALL_URLS:
if regex.match(url):
return True
return False

View File

@@ -1,4 +0,0 @@
from auto_archiver.core.extractor import Extractor
class ExampleModule(Extractor):
def download(self, item):
print("do something")

View File

@@ -1,10 +1,11 @@
{
"name": "Example Module",
"type": ["extractor"],
"type": ["extractor", "feeder", "formatter", "storage", "enricher", "database"],
"requires_setup": False,
"dependencies": {"python": ["loguru"]
},
"configs": {
"csv_file": {"default": "db.csv", "help": "CSV file name"}
"csv_file": {"default": "db.csv", "help": "CSV file name"},
"required_field": {"required": True, "help": "required field in the CSV file"},
},
}

View File

@@ -0,0 +1,28 @@
from auto_archiver.core import Extractor, Enricher, Feeder, Database, Storage, Formatter, Metadata
class ExampleModule(Extractor, Enricher, Feeder, Database, Storage, Formatter):
    """Example module deriving from every step base class; each hook is trivial."""

    def download(self, item):
        """Print ``download``."""
        print("download")

    def __iter__(self):
        """Yield the result of setting the example URL on a new ``Metadata``."""
        entry = Metadata().set_url("https://example.com")
        yield entry

    def done(self, result):
        """Print ``done``."""
        print("done")

    def enrich(self, to_enrich):
        """Print ``enrich``."""
        print("enrich")

    def get_cdn_url(self, media):
        """Return a fixed placeholder URL string."""
        cdn_url = "nice_url"
        return cdn_url

    def save(self, item):
        """Print ``save``."""
        print("save")

    def uploadf(self, file, key, **kwargs):
        """Print ``uploadf``."""
        print("uploadf")

    def format(self, item):
        """Print ``format``."""
        print("format")

View File

@@ -0,0 +1,16 @@
steps:
feeders:
- example_module
extractors:
- example_module
formatters:
- example_module
storages:
- example_module
databases:
- example_module
enrichers:
- example_module
# Global configuration

View File

@@ -9,7 +9,7 @@ class TestExtractorBase(object):
config: dict = None
@pytest.fixture(autouse=True)
def setup_archiver(self, setup_module):
def setup_extractor(self, setup_module):
assert self.extractor_module is not None, "self.extractor_module must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"

View File

@@ -0,0 +1,21 @@
import pytest
from auto_archiver.modules.instagram_extractor import InstagramExtractor
from .test_extractor_base import TestExtractorBase
class TestInstagramExtractor(TestExtractorBase):
    """URL-pattern checks for the Instagram extractor."""

    extractor_module: str = 'instagram_extractor'
    config: dict = {}

    @pytest.mark.parametrize(
        "url",
        [
            "https://www.instagram.com/p/",
            "https://www.instagram.com/p/1234567890/",
            "https://www.instagram.com/reel/1234567890/",
            "https://www.instagram.com/username/",
            "https://www.instagram.com/username/stories/",
            "https://www.instagram.com/username/highlights/",
        ],
    )
    def test_regex_matches(self, url):
        """Every listed Instagram URL is matched by ``InstagramExtractor.valid_url``."""
        matched = InstagramExtractor.valid_url.match(url)
        assert matched

View File

@@ -7,7 +7,7 @@ def example_module():
import auto_archiver
previous_path = auto_archiver.modules.__path__
auto_archiver.modules.__path__.append("tests/data/")
auto_archiver.modules.__path__.append("tests/data/test_modules/")
module = get_module_lazy("example_module")
yield module

123
tests/test_orchestrator.py Normal file
View File

@@ -0,0 +1,123 @@
import pytest
import sys
from argparse import ArgumentParser
from auto_archiver.core.orchestrator import ArchivingOrchestrator
from auto_archiver.version import __version__
from auto_archiver.core.config import read_yaml, store_yaml
TEST_ORCHESTRATION = "tests/data/test_orchestration.yaml"
TEST_MODULES = "tests/data/test_modules/"
@pytest.fixture
def test_args():
    """Baseline CLI arguments: test config, test module path and the required field."""
    config_args = ["--config", TEST_ORCHESTRATION]
    module_path_args = ["--module_paths", TEST_MODULES]
    # just set this for normal testing; it stays last so a test can drop it
    # again by slicing off the final two items
    required_field_args = ["--example_module.required_field", "some_value"]
    return config_args + module_path_args + required_field_args
@pytest.fixture
def orchestrator():
    """Yield a fresh ``ArchivingOrchestrator``, then restore loguru's handlers.

    HACK: the loguru logger starts with one handler, but a run of the
    orchestrator removes that default handler and adds its own. The teardown
    puts the default back and drops the custom one for the next test.
    """
    yield ArchivingOrchestrator()

    from loguru import logger

    core = logger._core
    if not core.handlers.get(0):
        # presumably resets loguru's id counter so the handler added below is
        # registered as id 0 again -- relies on loguru internals, verify on upgrade
        core.handlers_count = 0
        logger.add(sys.stderr)
    # and remove the custom logger
    if core.handlers.get(1):
        logger.remove(1)
@pytest.fixture
def basic_parser(orchestrator) -> ArgumentParser:
    """Return the parser produced by ``setup_basic_parser`` on the orchestrator fixture."""
    parser = orchestrator.setup_basic_parser()
    return parser
def test_setup_orchestrator(orchestrator):
    """The orchestrator fixture provides an object."""
    created = orchestrator
    assert created is not None
def test_parse_config():
    """Placeholder: no config-parsing assertions are made yet."""
def test_parse_basic(basic_parser):
    """``--config`` is stored on the parsed namespace as ``config_file``."""
    parsed = basic_parser.parse_args(["--config", TEST_ORCHESTRATION])
    assert parsed.config_file == TEST_ORCHESTRATION
@pytest.mark.parametrize("mode", ["simple", "full"])
def test_mode(basic_parser, mode):
    """Each supported ``--mode`` choice ends up unchanged in ``args.mode``."""
    parsed = basic_parser.parse_args(["--mode", mode])
    assert parsed.mode == mode
def test_mode_invalid(basic_parser, capsys):
    """An unknown ``--mode`` value exits with status 2 and reports an invalid choice."""
    with pytest.raises(SystemExit) as raised:
        basic_parser.parse_args(["--mode", "invalid"])

    assert raised.value.code == 2
    stderr_text = capsys.readouterr().err
    assert "invalid choice" in stderr_text
def test_version(basic_parser, capsys):
    """``--version`` prints the package version on stdout and exits with status 0."""
    with pytest.raises(SystemExit) as raised:
        basic_parser.parse_args(["--version"])

    assert raised.value.code == 0
    stdout_text = capsys.readouterr().out
    assert stdout_text == f"{__version__}\n"
def test_help(orchestrator, basic_parser, capsys):
    """``--help`` only sets a flag on the basic parser; ``show_help`` prints usage and exits 0."""
    args = basic_parser.parse_args(["--help"])
    # identity check: the flag must be the boolean True, not merely something equal to it
    assert args.help is True

    # test the show_help() on orchestrator
    with pytest.raises(SystemExit) as exit_error:
        orchestrator.show_help(args)

    assert exit_error.value.code == 0
    assert "Usage: auto-archiver [--help] [--version] [--config CONFIG_FILE]" in capsys.readouterr().out
def test_add_custom_modules_path(orchestrator, test_args):
    """Running with ``--module_paths`` adds that path to ``auto_archiver.modules.__path__``."""
    orchestrator.run(test_args)

    import auto_archiver

    module_search_paths = auto_archiver.modules.__path__
    assert "tests/data/test_modules/" in module_search_paths
def test_add_custom_modules_path_invalid(orchestrator, caplog, test_args):
    """A module path that does not exist is reported in the first captured log record."""
    # we still need to load the real path to get the example_module
    invalid_path_args = ["--module_paths", "tests/data/invalid_test_modules/"]
    orchestrator.run(test_args + invalid_path_args)

    first_message = caplog.records[0].message
    assert first_message == "Path 'tests/data/invalid_test_modules/' does not exist. Skipping..."
def test_check_required_values(orchestrator, caplog, test_args):
    """Omitting the required module field aborts the run and logs which argument is missing."""
    # drop the trailing "--example_module.required_field <value>" pair without
    # rebinding the fixture argument
    args_without_required = test_args[:-2]

    # only the exit itself matters here, so the exception info is not bound
    with pytest.raises(SystemExit):
        orchestrator.run(args_without_required)

    assert caplog.records[1].message == "the following arguments are required: --example_module.required_field"
def test_get_required_values_from_config(orchestrator, test_args, tmp_path):
    """A required field supplied through the YAML config lets the run go ahead."""
    # start from the default example yaml and add the required field
    config = read_yaml(TEST_ORCHESTRATION)
    config['example_module'] = {'required_field': 'some_value'}

    # write it to a temp file
    config_path = (tmp_path / "temp_config.yaml").as_posix()
    store_yaml(config, config_path)

    # run the orchestrator without the required field on the command line;
    # should run OK, since there are no missing required fields
    orchestrator.run(["--config", config_path, "--module_paths", TEST_MODULES])
# basic_args = basic_parser.parse_known_args(test_args)
# test_yaml = read_yaml(TEST_ORCHESTRATION)
# test_yaml['example_module'] = {'required_field': 'some_value'}
# # monkey patch the example_module to have a 'configs' setting of 'my_var' with required=True
# # load the module first
# m = get_module_lazy("example_module")
# orchestrator.setup_complete_parser(basic_args, test_yaml, unused_args=[])
# assert orchestrator.config is not None