Merge branch 'main' into tests/add_module_tests

This commit is contained in:
erinhmclark
2025-02-17 09:03:04 +00:00
3 changed files with 46 additions and 39 deletions

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "0.13.1"
version = "0.13.2"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"

View File

@@ -30,6 +30,7 @@ from loguru import logger
DEFAULT_CONFIG_FILE = "orchestration.yaml"
class JsonParseAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
try:
@@ -60,6 +61,8 @@ class AuthenticationJsonParseAction(JsonParseAction):
if not isinstance(site, str) or not isinstance(auth, dict):
raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods")
setattr(namespace, self.dest, auth_dict)
class UniqueAppendAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if not hasattr(namespace, self.dest):
@@ -68,6 +71,7 @@ class UniqueAppendAction(argparse.Action):
if value not in getattr(namespace, self.dest):
getattr(namespace, self.dest).append(value)
class ArchivingOrchestrator:
feeders: List[Type[Feeder]]
@@ -76,17 +80,17 @@ class ArchivingOrchestrator:
databases: List[Type[Database]]
storages: List[Type[Storage]]
formatters: List[Type[Formatter]]
def setup_basic_parser(self):
parser = argparse.ArgumentParser(
prog="auto-archiver",
add_help=False,
description="""
prog="auto-archiver",
add_help=False,
description="""
Auto Archiver is a CLI tool to archive media/metadata from online URLs;
it can read URLs from many sources (Google Sheets, Command Line, ...); and write results to many destinations too (CSV, Google Sheets, MongoDB, ...)!
""",
epilog="Check the code at https://github.com/bellingcat/auto-archiver",
formatter_class=RichHelpFormatter,
epilog="Check the code at https://github.com/bellingcat/auto-archiver",
formatter_class=RichHelpFormatter,
)
parser.add_argument('--help', '-h', action='store_true', dest='help', help='show a full help message and exit')
parser.add_argument('--version', action='version', version=__version__)
@@ -130,7 +134,7 @@ class ArchivingOrchestrator:
# for simple mode, we use the cli_feeder and any modules that don't require setup
yaml_config['steps']['feeders'] = ['cli_feeder']
# add them to the config
for module in simple_modules:
for module_type in module.type:
@@ -155,23 +159,22 @@ class ArchivingOrchestrator:
if unknown:
logger.warning(f"Ignoring unknown/unused arguments: {unknown}\nPerhaps you don't have this module enabled?")
if (self.config != yaml_config and basic_config.store) or not os.path.isfile(basic_config.config_file):
logger.info(f"Storing configuration file to {basic_config.config_file}")
store_yaml(self.config, basic_config.config_file)
return self.config
def add_additional_args(self, parser: argparse.ArgumentParser = None):
if not parser:
parser = self.parser
# allow passing URLs directly on the command line
parser.add_argument('urls', nargs='*', default=[], help='URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml')
parser.add_argument('--feeders', dest='steps.feeders', nargs='+', default=['cli_feeder'], help='the feeders to use', action=UniqueAppendAction)
parser.add_argument('--enrichers', dest='steps.enrichers', nargs='+', help='the enrichers to use', action=UniqueAppendAction)
parser.add_argument('--enrichers', dest='steps.enrichers', nargs='+', help='the enrichers to use', action=UniqueAppendAction)
parser.add_argument('--extractors', dest='steps.extractors', nargs='+', help='the extractors to use', action=UniqueAppendAction)
parser.add_argument('--databases', dest='steps.databases', nargs='+', help='the databases to use', action=UniqueAppendAction)
parser.add_argument('--storages', dest='steps.storages', nargs='+', help='the storages to use', action=UniqueAppendAction)
@@ -180,7 +183,7 @@ class ArchivingOrchestrator:
parser.add_argument('--authentication', dest='authentication', help='A dictionary of sites and their authentication methods \
(token, username etc.) that extractors can use to log into \
a website. If passing this on the command line, use a JSON string. \
You may also pass a path to a valid JSON/YAML file which will be parsed.',\
You may also pass a path to a valid JSON/YAML file which will be parsed.',
default={},
action=AuthenticationJsonParseAction)
# logging arguments
@@ -188,7 +191,6 @@ class ArchivingOrchestrator:
parser.add_argument('--logging.file', action='store', dest='logging.file', help='the logging file to write to', default=None)
parser.add_argument('--logging.rotation', action='store', dest='logging.rotation', help='the logging rotation to use', default=None)
def add_module_args(self, modules: list[LazyBaseModule] = None, parser: argparse.ArgumentParser = None) -> None:
if not modules:
@@ -225,16 +227,16 @@ class ArchivingOrchestrator:
def show_help(self, basic_config: dict):
# for the help message, we want to load *all* possible modules and show the help
# add configs as arg parser arguments
# add configs as arg parser arguments
self.add_additional_args(self.basic_parser)
self.add_module_args(parser=self.basic_parser)
self.basic_parser.print_help()
self.basic_parser.exit()
def setup_logging(self):
# setup loguru logging
logger.remove(0) # remove the default logger
logger.remove(0) # remove the default logger
logging_config = self.config['logging']
logger.add(sys.stderr, level=logging_config['level'])
if log_file := logging_config['file']:
@@ -246,7 +248,7 @@ class ArchivingOrchestrator:
orchestrator's attributes (self.feeders, self.extractors etc.). If no modules of a certain type
are loaded, the program will exit with an error message.
"""
invalid_modules = []
for module_type in BaseModule.MODULE_TYPES:
@@ -273,6 +275,7 @@ class ArchivingOrchestrator:
logger.error("No URLs provided. Please provide at least one URL via the command line, or set up an alternative feeder. Use --help for more information.")
exit()
# cli_feeder is a pseudo module, it just takes the command line args
def feed(self) -> Generator[Metadata]:
for url in urls:
logger.debug(f"Processing URL: '{url}'")
@@ -284,7 +287,6 @@ class ArchivingOrchestrator:
'__iter__': feed
})()
pseudo_module.__iter__ = feed
step_items.append(pseudo_module)
@@ -308,16 +310,18 @@ class ArchivingOrchestrator:
check_steps_ok()
setattr(self, f"{module_type}s", step_items)
def load_config(self, config_file: str) -> dict:
if not os.path.exists(config_file) and config_file != DEFAULT_CONFIG_FILE:
logger.error(f"The configuration file {config_file} was not found. Make sure the file exists and try again, or run without the --config file to use the default settings.")
exit()
return read_yaml(config_file)
def run(self, args: list) -> None:
def setup(self, args: list):
"""
Main entry point for the orchestrator, sets up the basic parser, loads the config file, and sets up the complete parser
"""
self.setup_basic_parser()
# parse the known arguments for now (basically, we want the config file)
@@ -340,16 +344,18 @@ class ArchivingOrchestrator:
for module_type in BaseModule.MODULE_TYPES:
logger.info(f"{module_type.upper()}S: " + ", ".join(m.display_name for m in getattr(self, f"{module_type}s")))
for _ in self.feed():
pass
def run(self, args: list) -> Generator[Metadata]:
def cleanup(self)->None:
self.setup(args)
return self.feed()
def cleanup(self) -> None:
logger.info("Cleaning up")
for e in self.extractors:
e.cleanup()
def feed(self) -> Generator[Metadata]:
url_count = 0
for feeder in self.feeders:
for item in feeder:
@@ -393,7 +399,6 @@ class ArchivingOrchestrator:
m.tmp_dir = None
tmp_dir.cleanup()
def archive(self, result: Metadata) -> Union[Metadata, None]:
"""
Runs the archiving process for a single URL
@@ -440,13 +445,13 @@ class ArchivingOrchestrator:
try:
result.merge(a.download(result))
if result.is_success(): break
except Exception as e:
except Exception as e:
logger.error(f"ERROR archiver {a.name}: {e}: {traceback.format_exc()}")
# 4 - call enrichers to work with archived content
for e in self.enrichers:
try: e.enrich(result)
except Exception as exc:
except Exception as exc:
logger.error(f"ERROR enricher {e.name}: {exc}: {traceback.format_exc()}")
# 5 - store all downloaded/generated media
@@ -474,13 +479,13 @@ class ArchivingOrchestrator:
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
"""
assert url.startswith("http://") or url.startswith("https://"), f"Invalid URL scheme"
parsed = urlparse(url)
assert parsed.scheme in ["http", "https"], f"Invalid URL scheme"
assert parsed.hostname, f"Invalid URL hostname"
assert parsed.hostname != "localhost", f"Invalid URL"
try: # special rules for IP addresses
try: # special rules for IP addresses
ip = ip_address(parsed.hostname)
except ValueError: pass
else:
@@ -489,9 +494,8 @@ class ArchivingOrchestrator:
assert not ip.is_link_local, f"Invalid IP used"
assert not ip.is_private, f"Invalid IP used"
# Helper Properties
@property
def all_modules(self) -> List[Type[BaseModule]]:
return self.feeders + self.extractors + self.enrichers + self.databases + self.storages + self.formatters
return self.feeders + self.extractors + self.enrichers + self.databases + self.storages + self.formatters

View File

@@ -23,7 +23,6 @@ class TestTwitterApiExtractor(TestExtractorBase):
}
@pytest.mark.parametrize("url, expected", [
("https://t.co/yl3oOJatFp", "https://www.bellingcat.com/category/resources/"), # t.co URL
("https://x.com/bellingcat/status/1874097816571961839", "https://x.com/bellingcat/status/1874097816571961839"), # x.com urls unchanged
("https://twitter.com/bellingcat/status/1874097816571961839", "https://twitter.com/bellingcat/status/1874097816571961839"), # twitter urls unchanged
("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # don't strip params from twitter urls (changed Jan 2025)
@@ -32,7 +31,11 @@ class TestTwitterApiExtractor(TestExtractorBase):
])
def test_sanitize_url(self, url, expected):
assert expected == self.extractor.sanitize_url(url)
@pytest.mark.download
def test_sanitize_url_download(self):
assert "https://t.co/yl3oOJatFp" == self.extractor.sanitize_url("https://www.bellingcat.com/category/resources/")
@pytest.mark.parametrize("url, exptected_username, exptected_tweetid", [
("https://twitter.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
("https://x.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),