From 9dc709d3b90603c205b081847362f19027d123f4 Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Thu, 24 Nov 2022 15:44:25 +0000 Subject: [PATCH] demo feeder logic working --- orchestrate.yaml | 34 ++++++++-- src/archivers/telethon_archiver.py | 2 +- src/configs/v2config.py | 56 +++++----------- src/enrichers/enricher.py | 3 +- src/feeders/__init__.py | 2 + src/feeders/feeder.py | 23 +++++++ src/feeders/feeder_gsheet.py | 101 +++++++++++++++++++++++++++++ src/orchestrator.py | 54 +++++++-------- src/step.py | 16 +++-- src/utils/__init__.py | 2 +- src/utils/util.py | 3 +- src/v2.py | 5 +- 12 files changed, 216 insertions(+), 85 deletions(-) create mode 100644 src/feeders/__init__.py create mode 100644 src/feeders/feeder.py create mode 100644 src/feeders/feeder_gsheet.py diff --git a/orchestrate.yaml b/orchestrate.yaml index 689765f..3a2bc27 100644 --- a/orchestrate.yaml +++ b/orchestrate.yaml @@ -10,9 +10,9 @@ steps: - webarchive # this way it runs as a failsafe only enrichers: - screenshot - - wacz - - webarchive # this way it runs for every case, webarchive extends archiver and enrichment - - thumbnails + # - wacz + # - webarchive # this way it runs for every case, webarchive extends archiver and enrichment + # - thumbnails formatters: - HTMLFormater - PdfFormater @@ -29,10 +29,32 @@ configurations: global: - save_logs: False gsheets_feeder: - - sheet: "Auto archiver" - - header: "" # defaults to 1 in GSheetsFeeder - - service_account: "secrets/service_account.json" + sheet: auto-archiver-test + header: 2 # defaults to 1 in GSheetsFeeder + service_account: "secrets/service_account.json" + allow_worksheets: "aa-refactor-tests" + block_worksheets: "blocked,test-cases-008" + columns: + 'url': 'link' + 'status': 'archive status' + 'folder': 'destination folder' + 'archive': 'archive location' + 'date': 'archive date' + 'thumbnail': 'thumbnail' + 'thumbnail_index': 'thumbnail index' + 'timestamp': 'upload timestamp' + 'title': 'upload title' + 'duration': 'duration' + 'screenshot': 'screenshot' + 'hash': 'hash' + 'wacz': 'wacz' + 'replaywebpage': 'replaywebpage' tiktok: + api_keys: + - username: 1 + password: 2 + - username: 3 + password: 4 username: "abc" password: "123" token: "here" diff --git a/src/archivers/telethon_archiver.py b/src/archivers/telethon_archiver.py index f0ff194..a2cbf0a 100644 --- a/src/archivers/telethon_archiver.py +++ b/src/archivers/telethon_archiver.py @@ -17,7 +17,7 @@ class TelethonArchiver(Archiver): super().__init__(storage, config) if config.telegram_config: c = config.telegram_config - self.client = TelegramClient("./anon", c.api_id, c.api_hash) + self.client = TelegramClient("./anon.session", c.api_id, c.api_hash) self.bot_token = c.bot_token def _get_media_posts_in_group(self, chat, original_post, max_amp=10): diff --git a/src/configs/v2config.py b/src/configs/v2config.py index bce5669..9eb35df 100644 --- a/src/configs/v2config.py +++ b/src/configs/v2config.py @@ -27,8 +27,10 @@ class ConfigV2: def __init__(self) -> None: self.defaults = {} + self.cli_ops = {} self.config = {} + # TODO: make this work for nested props like gsheets_feeder.columns.url = "URL" def parse(self): # 1. parse CLI values parser = argparse.ArgumentParser( @@ -41,27 +43,15 @@ class ConfigV2: for configurable in self.configurable_parents: child: Step - # print(f"{configurable=}") for child in configurable.__subclasses__(): - # print(f"{child=} {child.configs()=}") - for config, details in child.configs().items(): - print(config, details) assert "." not in child.name, f"class prop name cannot contain dots('.'): {child.name}" assert "." not in config, f"config property cannot contain dots('.'): {config}" - if (is_nested := type(details["default"]) == dict): - for subconfig, subdefault in details["default"].items(): - assert "." not in subconfig, f"config subproperty cannot contain dots('.'): {subconfig}" - config_path = f"{child.name}.{config}.{subconfig}" - parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=details['help'] + f"({subconfig})") - self.defaults[config_path] = subdefault - config_path = f"{child.name}.{config}" - print(config_path) + parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=details['help']) self.defaults[config_path] = details["default"] - if not is_nested: - # nested cannot be directly set on the CLI - parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=details['help']) + if "cli_set" in details: + self.cli_ops[config_path] = details["cli_set"] args = parser.parse_args() @@ -73,31 +63,14 @@ class ConfigV2: # 3. CONFIGS: decide value with priority: CLI >> config.yaml >> default self.config = defaultdict(dict) for config_path, default in self.defaults.items(): - config_steps = config_path.split(".") - if len(config_steps) == 2: # not nested - child, config = tuple(config_steps) - val = getattr(args, config_path, None) - if val is None: - val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, default) - # self.config[child][config] = val - - elif len(config_steps) == 3: # nested - child, config, subconfig = tuple(config_steps) - val = getattr(args, config_path) - if config not in self.config[child]: - self.config[child][config] = {} - if val is None: - val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, {}).get(subconfig, default) - print(child, config, subconfig, val) - self.config[child][config][subconfig] = val - - # child, config = tuple(config_path.split(".")) - # # print(config_path) - # val = getattr(args, config_path) - # # print(child, config, val) - # if val is None: - # val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, default) - # self.config[child][config] = val + child, config = tuple(config_path.split(".")) + val = getattr(args, config_path) + if val is not None and config_path in self.cli_ops: + val = self.cli_ops[config_path](val, default) + if val is None: + val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, default) + # print(child, config, val) + self.config[child][config] = val self.config = dict(self.config) # 4. STEPS: read steps and validate they exist @@ -105,11 +78,12 @@ class ConfigV2: assert "archivers" in steps, "your configuration steps are missing the archivers property" assert "storages" in steps, "your configuration steps are missing the storages property" - print("config.py", self.config) + # print("config.py", self.config) self.feeder = Feeder.init(steps.get("feeder", "cli_feeder"), self.config) self.enrichers = [Enricher.init(e, self.config) for e in steps.get("enrichers", [])] + print("feeder", self.feeder) print("enrichers", [e for e in self.enrichers]) def validate(self): diff --git a/src/enrichers/enricher.py b/src/enrichers/enricher.py index c767b8e..baa22e3 100644 --- a/src/enrichers/enricher.py +++ b/src/enrichers/enricher.py @@ -9,7 +9,8 @@ class Enricher(Step, ABC): name = "enricher" def __init__(self, config: dict) -> None: - Step.__init__(self) + # without this STEP.__init__ is not called + super().__init__(config) # only for typing... diff --git a/src/feeders/__init__.py b/src/feeders/__init__.py new file mode 100644 index 0000000..9fb5942 --- /dev/null +++ b/src/feeders/__init__.py @@ -0,0 +1,2 @@ +from.feeder import Feeder +from .feeder_gsheet import GsheetsFeeder \ No newline at end of file diff --git a/src/feeders/feeder.py b/src/feeders/feeder.py new file mode 100644 index 0000000..6b7ba10 --- /dev/null +++ b/src/feeders/feeder.py @@ -0,0 +1,23 @@ +from __future__ import annotations +from dataclasses import dataclass +from abc import abstractmethod +# from metadata import Metadata +from step import Step + + +@dataclass +class Feeder(Step): + name = "feeder" + + def __init__(self, config: dict) -> None: + # without this STEP.__init__ is not called + super().__init__(config) + + def init(name: str, config: dict) -> Feeder: + # only for code typing + return Step.init(name, config, Feeder) + + # def feed(self, item: Metadata) -> Metadata: pass + + @abstractmethod + def __iter__(self) -> Feeder: return None \ No newline at end of file diff --git a/src/feeders/feeder_gsheet.py b/src/feeders/feeder_gsheet.py new file mode 100644 index 0000000..7ebc640 --- /dev/null +++ b/src/feeders/feeder_gsheet.py @@ -0,0 +1,101 @@ +import json, gspread + +# from metadata import Metadata +from loguru import logger + +# from . import Enricher +from feeders.feeder import Feeder +from utils import GWorksheet + + +class GsheetsFeeder(Feeder): + name = "gsheets_feeder" + + def __init__(self, config: dict) -> None: + # without this STEP.__init__ is not called + super().__init__(config) + self.gsheets_client = gspread.service_account(filename=self.service_account) + assert type(self.header) == int, f"header ({self.header}) value must be an integer not {type(self.header)}" + + @staticmethod + def configs() -> dict: + return { + "sheet": {"default": None, "help": "name of the sheet to archive"}, + "header": {"default": 1, "help": "index of the header row (starts at 1)"}, + "service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path"}, + "allow_worksheets": { + "default": set(), + "help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed", + "cli_set": lambda cli_val, cur_val: set(cli_val.split(",")) + }, + "block_worksheets": { + "default": set(), + "help": "(CSV) explicitly block some worksheets from being processed, defaults to empty", + "cli_set": lambda cli_val, cur_val: set(cli_val.split(",")) + }, + "columns": { + "default": { + 'url': 'link', + 'status': 'archive status', + 'folder': 'destination folder', + 'archive': 'archive location', + 'date': 'archive date', + 'thumbnail': 'thumbnail', + 'thumbnail_index': 'thumbnail index', + 'timestamp': 'upload timestamp', + 'title': 'upload title', + 'duration': 'duration', + 'screenshot': 'screenshot', + 'hash': 'hash', + 'wacz': 'wacz', + 'replaywebpage': 'replaywebpage', + }, + "help": "names of columns in the google sheet", + "cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val)) + }, + } + def __iter__(self) -> str: + sh = self.gsheets_client.open(self.sheet) + for ii, wks in enumerate(sh.worksheets()): + if not self.should_process_sheet(wks.title): + logger.debug(f"SKIPPED worksheet '{wks.title}' due to allow/block rules") + continue + + logger.info(f'Opening worksheet {ii=}: {wks.title=} header={self.header}') + gw = GWorksheet(wks, header_row=self.header, columns=self.columns) + + if len(missing_cols := self.missing_required_columns(gw)): + logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}") + continue + + for row in range(1 + self.header, gw.count_rows() + 1): + url = gw.get_cell(row, 'url').strip() + if not len(url): continue + #TODO: gsheet_db should check later if this is supposed to be archived + # static_status = gw.get_cell(row, 'status') + # status = gw.get_cell(row, 'status', fresh=static_status in ['', None] and url != '') + # All checks done - archival process starts here + yield url + logger.success(f'Finished worksheet {wks.title}') + + # GWorksheet(self.sheet) + print(self.sheet) + for u in ["url1", "url2"]: + yield u + + + def should_process_sheet(self, sheet_name: str) -> bool: + if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets: + # ALLOW rules exist AND sheet name not explicitly allowed + return False + if len(self.block_worksheets) and sheet_name in self.block_worksheets: + # BLOCK rules exist AND sheet name is blocked + return False + return True + + def missing_required_columns(self, gw: GWorksheet) -> list: + missing = [] + for required_col in ['url', 'status']: + if not gw.col_exists(required_col): + missing.append(required_col) + return missing diff --git a/src/orchestrator.py b/src/orchestrator.py index 272919f..f32f4c9 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -2,6 +2,8 @@ from __future__ import annotations from typing import Union, Dict from dataclasses import dataclass +from enrichers.enricher import Enricher + """ how not to couple the different pieces of logic due to the use of constants for the metadata keys? @@ -110,49 +112,47 @@ class ArchivingOrchestrator: def __init__(self, config) -> None: # in config.py we should test that the archivers exist and log mismatches (blocking execution) # identify each formatter, storage, database, etc - self.feeder = Feeder.init(config.feeder, config.get(config.feeder)) + # self.feeder = Feeder.init(config.feeder, config.get(config.feeder)) # Is it possible to overwrite config.yaml values? it could be useful: share config file and modify gsheets_feeder.sheet via CLI # where does that update/processing happen? in config.py # reflection for Archiver to know wihch child classes it has? use Archiver.__subclasses__ - self.archivers = [ - Archiver.init(a, config) - for a in config.archivers - ] + # self.archivers = [ + # Archiver.init(a, config) + # for a in config.archivers + # ] + self.feeder = config.feeder + self.enrichers = config.enrichers - self.enrichers = [ - Enricher.init(e, config) - for e in config.enrichers - ] + # self.formatters = [ + # Formatter.init(f, config) + # for f in config.formatters + # ] - self.formatters = [ - Formatter.init(f, config) - for f in config.formatters - ] + # self.storages = [ + # Storage.init(s, config) + # for s in config.storages + # ] - self.storages = [ - Storage.init(s, config) - for s in config.storages - ] - - self.databases = [ - Database.init(f, config) - for f in config.formatters - ] + # self.databases = [ + # Database.init(f, config) + # for f in config.formatters + # ] # these rules are checked in config.py - assert len(archivers) > 1, "there needs to be at least one Archiver" + # assert len(archivers) > 1, "there needs to be at least one Archiver" - def feed(self, feeder: Feeder) -> list(ArchiveResult): - for next in feeder: - self.archive(next) + def feed(self) -> list(ArchiveResult): + for url in self.feeder: + print("ARCHIVING", url) + # self.archive(url) # how does this handle the parameters like folder which can be different for each archiver? # the storage needs to know where to archive!! # solution: feeders have context: extra metadata that they can read or ignore, # all of it should have sensible defaults (eg: folder) # default feeder is a list with 1 element - def archive(url) -> Union[ArchiveResult, None]: + def archive(self, url) -> Union[ArchiveResult, None]: url = clear_url(url) result = Metadata(url=url) diff --git a/src/step.py b/src/step.py index d717386..04d7a61 100644 --- a/src/step.py +++ b/src/step.py @@ -1,16 +1,21 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field +from inspect import ClassFoundException from typing import Type from metadata import Metadata from abc import ABC +# from collections.abc import Iterable @dataclass class Step(ABC): - name : str = None + name: str = None def __init__(self, config: dict) -> None: - self.config = self.config[self.name] + # reads the configs into object properties + # self.config = config[self.name] + for k, v in config[self.name].items(): + self.__setattr__(k, v) @staticmethod def configs() -> dict: {} @@ -21,8 +26,9 @@ class Step(ABC): """ for sub in child.__subclasses__(): if sub.name == name: - return sub.__init__(config) - raise f"Unable to initialize class with {name=}" + print(sub.name, "CALLING NEW") + return sub(config) + raise ClassFoundException(f"Unable to initialize STEP with {name=}") def get_url(self, item: Metadata) -> str: url = item.get("url") diff --git a/src/utils/__init__.py b/src/utils/__init__.py index baea5e9..ad56f36 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -1,4 +1,4 @@ # we need to explicitly expose the available imports here -from .gworksheet import * +from .gworksheet import GWorksheet from .misc import * from .util import Util \ No newline at end of file diff --git a/src/utils/util.py b/src/utils/util.py index 9ad5b53..51bb2e3 100644 --- a/src/utils/util.py +++ b/src/utils/util.py @@ -11,8 +11,7 @@ class Util(Step, ABC): def __init__(self, config: dict) -> None: Step.__init__(self) - - # only for typing... + # only for typing... def init(name: str, config: dict) -> Util: return super().init(name, config, Util) diff --git a/src/v2.py b/src/v2.py index 8fa544f..8ecb820 100644 --- a/src/v2.py +++ b/src/v2.py @@ -1,9 +1,12 @@ +from abc import ABC from configs.v2config import ConfigV2 from orchestrator import ArchivingOrchestrator config = ConfigV2() config.parse() -# orchestrator = ArchivingOrchestrator(config) \ No newline at end of file +orchestrator = ArchivingOrchestrator(config) + +orchestrator.feed()