From 77b2b099c63071fe42c9542039e0bd0d92ac0058 Mon Sep 17 00:00:00 2001 From: Patrick Robertson Date: Wed, 19 Feb 2025 15:32:45 +0000 Subject: [PATCH] Replace exit() with raise exceptions. Better for code implementations exit() is reserved solely for command line-called areas now also assert is only recommended for debugging --- src/auto_archiver/core/config.py | 58 +++++++++++++++ src/auto_archiver/core/orchestrator.py | 97 ++++++-------------------- 2 files changed, 80 insertions(+), 75 deletions(-) diff --git a/src/auto_archiver/core/config.py b/src/auto_archiver/core/config.py index 18b0a2a..79ce7e2 100644 --- a/src/auto_archiver/core/config.py +++ b/src/auto_archiver/core/config.py @@ -7,6 +7,7 @@ flexible setup in various environments. import argparse from ruamel.yaml import YAML, CommentedMap, add_representer +import json from loguru import logger @@ -17,6 +18,8 @@ from typing import Any, List, Type, Tuple _yaml: YAML = YAML() +DEFAULT_CONFIG_FILE = "orchestration.yaml" + EMPTY_CONFIG = _yaml.load(""" # Auto Archiver Configuration # Steps are the modules that will be run in the order they are defined @@ -52,6 +55,57 @@ logging: """) # note: 'logging' is explicitly added above in order to better format the config file + +# Arg Parse Actions/Classes +class AuthenticationJsonParseAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + + try: + auth_dict = json.loads(values) + setattr(namespace, self.dest, auth_dict) + except json.JSONDecodeError as e: + raise argparse.ArgumentTypeError(f"Invalid JSON input for argument '{self.dest}': {e}") + + def load_from_file(path): + try: + with open(path, 'r') as f: + try: + auth_dict = json.load(f) + except json.JSONDecodeError: + f.seek(0) + # maybe it's yaml, try that + auth_dict = _yaml.load(f) + if auth_dict.get('authentication'): + auth_dict = auth_dict['authentication'] + auth_dict['load_from_file'] = path + return auth_dict + except: + return None + + if isinstance(auth_dict, dict) and auth_dict.get('from_file'): + auth_dict = load_from_file(auth_dict['from_file']) + elif isinstance(auth_dict, str): + # if it's a string + auth_dict = load_from_file(auth_dict) + + if not isinstance(auth_dict, dict): + raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods") + global_options = ['cookies_from_browser', 'cookies_file', 'load_from_file'] + for key, auth in auth_dict.items(): + if key in global_options: + continue + if not isinstance(key, str) or not isinstance(auth, dict): + raise argparse.ArgumentTypeError(f"Authentication must be a dictionary of site names and their authentication methods. Valid global configs are {global_options}") + + setattr(namespace, self.dest, auth_dict) + + +class UniqueAppendAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + for value in values: + if value not in getattr(namespace, self.dest): + getattr(namespace, self.dest).append(value) + class DefaultValidatingParser(argparse.ArgumentParser): def error(self, message): @@ -82,6 +136,7 @@ class DefaultValidatingParser(argparse.ArgumentParser): return super().parse_known_args(args, namespace) +# Config Utils def to_dot_notation(yaml_conf: CommentedMap | dict) -> dict: dotdict = {} @@ -171,3 +226,6 @@ def store_yaml(config: CommentedMap, yaml_filename: str) -> None: config_to_save.pop('urls', None) with open(yaml_filename, "w", encoding="utf-8") as outf: _yaml.dump(config_to_save, outf) + +def is_valid_config(config: CommentedMap) -> bool: + return config and config != EMPTY_CONFIG \ No newline at end of file diff --git a/src/auto_archiver/core/orchestrator.py b/src/auto_archiver/core/orchestrator.py index 03e3a5e..54f5c32 100644 --- a/src/auto_archiver/core/orchestrator.py +++ b/src/auto_archiver/core/orchestrator.py @@ -11,16 +11,16 @@ from ipaddress import ip_address import argparse import os import sys -import json from tempfile import TemporaryDirectory import traceback from rich_argparse import RichHelpFormatter - +from loguru import logger from .metadata import Metadata, Media from auto_archiver.version import __version__ -from .config import _yaml, read_yaml, store_yaml, to_dot_notation, merge_dicts, EMPTY_CONFIG, DefaultValidatingParser +from .config import read_yaml, store_yaml, to_dot_notation, merge_dicts, is_valid_config, \ + DefaultValidatingParser, UniqueAppendAction, AuthenticationJsonParseAction, DEFAULT_CONFIG_FILE from .module import ModuleFactory, LazyBaseModule from . import validators, Feeder, Extractor, Database, Storage, Formatter, Enricher from .consts import MODULE_TYPES @@ -30,63 +30,8 @@ if TYPE_CHECKING: from .base_module import BaseModule from .module import LazyBaseModule -DEFAULT_CONFIG_FILE = "orchestration.yaml" - - -class JsonParseAction(argparse.Action): - def __call__(self, parser, namespace, values, option_string=None): - try: - setattr(namespace, self.dest, json.loads(values)) - except json.JSONDecodeError as e: - raise argparse.ArgumentTypeError(f"Invalid JSON input for argument '{self.dest}': {e}") - - -class AuthenticationJsonParseAction(JsonParseAction): - def __call__(self, parser, namespace, values, option_string=None): - super().__call__(parser, namespace, values, option_string) - auth_dict = getattr(namespace, self.dest) - - def load_from_file(path): - try: - with open(path, 'r') as f: - try: - auth_dict = json.load(f) - except json.JSONDecodeError: - f.seek(0) - # maybe it's yaml, try that - auth_dict = _yaml.load(f) - if auth_dict.get('authentication'): - auth_dict = auth_dict['authentication'] - auth_dict['load_from_file'] = path - return auth_dict - except: - return None - - if isinstance(auth_dict, dict) and auth_dict.get('from_file'): - auth_dict = load_from_file(auth_dict['from_file']) - elif isinstance(auth_dict, str): - # if it's a string - auth_dict = load_from_file(auth_dict) - - if not isinstance(auth_dict, dict): - raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods") - global_options = ['cookies_from_browser', 'cookies_file', 'load_from_file'] - for key, auth in auth_dict.items(): - if key in global_options: - continue - if not isinstance(key, str) or not isinstance(auth, dict): - raise argparse.ArgumentTypeError(f"Authentication must be a dictionary of site names and their authentication methods. Valid global configs are {global_options}") - - setattr(namespace, self.dest, auth_dict) - - -class UniqueAppendAction(argparse.Action): - def __call__(self, parser, namespace, values, option_string=None): - for value in values: - if value not in getattr(namespace, self.dest): - getattr(namespace, self.dest).append(value) - - +class SetupError(ValueError): + pass class ArchivingOrchestrator: # instance variables @@ -155,7 +100,7 @@ class ArchivingOrchestrator: # TODO: BUG** - basic_config won't have steps in it, since these args aren't added to 'basic_parser' # but should we add them? Or should we just add them to the 'complete' parser? - if yaml_config != EMPTY_CONFIG: + if is_valid_config(yaml_config): # only load the modules enabled in config # TODO: if some steps are empty (e.g. 'feeders' is empty), should we default to the 'simple' ones? Or only if they are ALL empty? enabled_modules = [] @@ -304,27 +249,25 @@ class ArchivingOrchestrator: step_items = [] modules_to_load = modules_by_type[f"{module_type}s"] - assert modules_to_load, f"No {module_type}s were configured. Make sure to set at least one {module_type} in your configuration file or on the command line (using --{module_type}s)" + if not modules_to_load: + raise SetupError(f"No {module_type}s were configured. Make sure to set at least one {module_type} in your configuration file or on the command line (using --{module_type}s)") def check_steps_ok(): if not len(step_items): - logger.error(f"NO {module_type.upper()}S LOADED. Please check your configuration and try again.") if len(modules_to_load): - logger.error(f"Tried to load the following modules, but none were available: {modules_to_load}") - exit() + logger.error(f"Unable to load any {module_type}s. Tried the following, but none were available: {modules_to_load}") + raise SetupError(f"NO {module_type.upper()}S LOADED. Please check your configuration and try again.") + if (module_type == 'feeder' or module_type == 'formatter') and len(step_items) > 1: - logger.error(f"Only one {module_type} is allowed, found {len(step_items)} {module_type}s. Please remove one of the following from your configuration file: {modules_to_load}") - exit() + raise SetupError(f"Only one {module_type} is allowed, found {len(step_items)} {module_type}s. Please remove one of the following from your configuration file: {modules_to_load}") for module in modules_to_load: if module == 'cli_feeder': - # pseudo module, don't load it + # cli_feeder is a pseudo module, it just takes the command line args for [URLS] urls = self.config['urls'] if not urls: - logger.error("No URLs provided. Please provide at least one URL via the command line, or set up an alternative feeder. Use --help for more information.") - exit() - # cli_feeder is a pseudo module, it just takes the command line args + raise SetupError("No URLs provided. Please provide at least one URL via the command line, or set up an alternative feeder. Use --help for more information.") def feed(self) -> Generator[Metadata]: for url in urls: @@ -350,7 +293,7 @@ class ArchivingOrchestrator: logger.error(f"Error during setup of modules: {e}\n{traceback.format_exc()}") if module_type == 'extractor' and loaded_module.name == module: loaded_module.cleanup() - exit() + raise e if not loaded_module: invalid_modules.append(module) @@ -364,7 +307,7 @@ class ArchivingOrchestrator: def load_config(self, config_file: str) -> dict: if not os.path.exists(config_file) and config_file != DEFAULT_CONFIG_FILE: logger.error(f"The configuration file {config_file} was not found. Make sure the file exists and try again, or run without the --config file to use the default settings.") - exit() + raise FileNotFoundError(f"Configuration file {config_file} not found") return read_yaml(config_file) @@ -429,8 +372,12 @@ class ArchivingOrchestrator: If you wish to make code invocations yourself, you should use the 'setup' and 'feed' methods separately. To test configurations, without loading any modules you can also first call 'setup_configs' """ - self.setup(args) - return self.feed() + try: + self.setup(args) + return self.feed() + except Exception as e: + logger.exception(e) + exit(1) def cleanup(self) -> None: logger.info("Cleaning up")