demo feeder logic working

This commit is contained in:
msramalho
2022-11-24 15:44:25 +00:00
parent 618e7ed0a3
commit 9dc709d3b9
12 changed files with 216 additions and 85 deletions

View File

@@ -10,9 +10,9 @@ steps:
- webarchive # this way it runs as a failsafe only
enrichers:
- screenshot
- wacz
- webarchive # this way it runs for every case, webarchive extends archiver and enrichment
- thumbnails
# - wacz
# - webarchive # this way it runs for every case, webarchive extends archiver and enrichment
# - thumbnails
formatters:
- HTMLFormater
- PdfFormater
@@ -29,10 +29,32 @@ configurations:
global:
- save_logs: False
gsheets_feeder:
- sheet: "Auto archiver"
- header: "" # defaults to 1 in GSheetsFeeder
- service_account: "secrets/service_account.json"
sheet: auto-archiver-test
header: 2 # defaults to 1 in GSheetsFeeder
service_account: "secrets/service_account.json"
allow_worksheets: "aa-refactor-tests"
block_worksheets: "blocked,test-cases-008"
columns:
'url': 'link'
'status': 'archive status'
'folder': 'destination folder'
'archive': 'archive location'
'date': 'archive date'
'thumbnail': 'thumbnail'
'thumbnail_index': 'thumbnail index'
'timestamp': 'upload timestamp'
'title': 'upload title'
'duration': 'duration'
'screenshot': 'screenshot'
'hash': 'hash'
'wacz': 'wacz'
'replaywebpage': 'replaywebpage'
tiktok:
api_keys:
- username: 1
password: 2
- username: 3
password: 4
username: "abc"
password: "123"
token: "here"

View File

@@ -17,7 +17,7 @@ class TelethonArchiver(Archiver):
super().__init__(storage, config)
if config.telegram_config:
c = config.telegram_config
self.client = TelegramClient("./anon", c.api_id, c.api_hash)
self.client = TelegramClient("./anon.session", c.api_id, c.api_hash)
self.bot_token = c.bot_token
def _get_media_posts_in_group(self, chat, original_post, max_amp=10):

View File

@@ -27,8 +27,10 @@ class ConfigV2:
def __init__(self) -> None:
self.defaults = {}
self.cli_ops = {}
self.config = {}
# TODO: make this work for nested props like gsheets_feeder.columns.url = "URL"
def parse(self):
# 1. parse CLI values
parser = argparse.ArgumentParser(
@@ -41,27 +43,15 @@ class ConfigV2:
for configurable in self.configurable_parents:
child: Step
# print(f"{configurable=}")
for child in configurable.__subclasses__():
# print(f"{child=} {child.configs()=}")
for config, details in child.configs().items():
print(config, details)
assert "." not in child.name, f"class prop name cannot contain dots('.'): {child.name}"
assert "." not in config, f"config property cannot contain dots('.'): {config}"
if (is_nested := type(details["default"]) == dict):
for subconfig, subdefault in details["default"].items():
assert "." not in subconfig, f"config subproperty cannot contain dots('.'): {subconfig}"
config_path = f"{child.name}.{config}.{subconfig}"
parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=details['help'] + f"({subconfig})")
self.defaults[config_path] = subdefault
config_path = f"{child.name}.{config}"
print(config_path)
parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=details['help'])
self.defaults[config_path] = details["default"]
if not is_nested:
# nested cannot be directly set on the CLI
parser.add_argument(f'--{config_path}', action='store', dest=config_path, help=details['help'])
if "cli_set" in details:
self.cli_ops[config_path] = details["cli_set"]
args = parser.parse_args()
@@ -73,31 +63,14 @@ class ConfigV2:
# 3. CONFIGS: decide value with priority: CLI >> config.yaml >> default
self.config = defaultdict(dict)
for config_path, default in self.defaults.items():
config_steps = config_path.split(".")
if len(config_steps) == 2: # not nested
child, config = tuple(config_steps)
val = getattr(args, config_path, None)
if val is None:
val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, default)
# self.config[child][config] = val
elif len(config_steps) == 3: # nested
child, config, subconfig = tuple(config_steps)
val = getattr(args, config_path)
if config not in self.config[child]:
self.config[child][config] = {}
if val is None:
val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, {}).get(subconfig, default)
print(child, config, subconfig, val)
self.config[child][config][subconfig] = val
# child, config = tuple(config_path.split("."))
# # print(config_path)
# val = getattr(args, config_path)
# # print(child, config, val)
# if val is None:
# val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, default)
# self.config[child][config] = val
child, config = tuple(config_path.split("."))
val = getattr(args, config_path)
if val is not None and config_path in self.cli_ops:
val = self.cli_ops[config_path](val, default)
if val is None:
val = self.yaml_config.get("configurations", {}).get(child, {}).get(config, default)
# print(child, config, val)
self.config[child][config] = val
self.config = dict(self.config)
# 4. STEPS: read steps and validate they exist
@@ -105,11 +78,12 @@ class ConfigV2:
assert "archivers" in steps, "your configuration steps are missing the archivers property"
assert "storages" in steps, "your configuration steps are missing the storages property"
print("config.py", self.config)
# print("config.py", self.config)
self.feeder = Feeder.init(steps.get("feeder", "cli_feeder"), self.config)
self.enrichers = [Enricher.init(e, self.config) for e in steps.get("enrichers", [])]
print("feeder", self.feeder)
print("enrichers", [e for e in self.enrichers])
def validate(self):

View File

@@ -9,7 +9,8 @@ class Enricher(Step, ABC):
name = "enricher"
def __init__(self, config: dict) -> None:
Step.__init__(self)
# without this STEP.__init__ is not called
super().__init__(config)
# only for typing...

2
src/feeders/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from.feeder import Feeder
from .feeder_gsheet import GsheetsFeeder

23
src/feeders/feeder.py Normal file
View File

@@ -0,0 +1,23 @@
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod
# from metadata import Metadata
from step import Step
@dataclass
class Feeder(Step):
name = "feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def init(name: str, config: dict) -> Feeder:
# only for code typing
return Step.init(name, config, Feeder)
# def feed(self, item: Metadata) -> Metadata: pass
@abstractmethod
def __iter__(self) -> Feeder: return None

View File

@@ -0,0 +1,101 @@
import json, gspread
# from metadata import Metadata
from loguru import logger
# from . import Enricher
from feeders.feeder import Feeder
from utils import GWorksheet
class GsheetsFeeder(Feeder):
name = "gsheets_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.gsheets_client = gspread.service_account(filename=self.service_account)
assert type(self.header) == int, f"header ({self.header}) value must be an integer not {type(self.header)}"
@staticmethod
def configs() -> dict:
return {
"sheet": {"default": None, "help": "name of the sheet to archive"},
"header": {"default": 1, "help": "index of the header row (starts at 1)"},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path"},
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed, defaults to empty",
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
},
"columns": {
"default": {
'url': 'link',
'status': 'archive status',
'folder': 'destination folder',
'archive': 'archive location',
'date': 'archive date',
'thumbnail': 'thumbnail',
'thumbnail_index': 'thumbnail index',
'timestamp': 'upload timestamp',
'title': 'upload title',
'duration': 'duration',
'screenshot': 'screenshot',
'hash': 'hash',
'wacz': 'wacz',
'replaywebpage': 'replaywebpage',
},
"help": "names of columns in the google sheet",
"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
},
}
def __iter__(self) -> str:
sh = self.gsheets_client.open(self.sheet)
for ii, wks in enumerate(sh.worksheets()):
if not self.should_process_sheet(wks.title):
logger.debug(f"SKIPPED worksheet '{wks.title}' due to allow/block rules")
continue
logger.info(f'Opening worksheet {ii=}: {wks.title=} header={self.header}')
gw = GWorksheet(wks, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}")
continue
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
#TODO: gsheet_db should check later if this is supposed to be archived
# static_status = gw.get_cell(row, 'status')
# status = gw.get_cell(row, 'status', fresh=static_status in ['', None] and url != '')
# All checks done - archival process starts here
yield url
logger.success(f'Finished worksheet {wks.title}')
# GWorksheet(self.sheet)
print(self.sheet)
for u in ["url1", "url2"]:
yield u
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:
# ALLOW rules exist AND sheet name not explicitly allowed
return False
if len(self.block_worksheets) and sheet_name in self.block_worksheets:
# BLOCK rules exist AND sheet name is blocked
return False
return True
def missing_required_columns(self, gw: GWorksheet) -> list:
missing = []
for required_col in ['url', 'status']:
if not gw.col_exists(required_col):
missing.append(required_col)
return missing

View File

@@ -2,6 +2,8 @@ from __future__ import annotations
from typing import Union, Dict
from dataclasses import dataclass
from enrichers.enricher import Enricher
"""
how not to couple the different pieces of logic
due to the use of constants for the metadata keys?
@@ -110,49 +112,47 @@ class ArchivingOrchestrator:
def __init__(self, config) -> None:
# in config.py we should test that the archivers exist and log mismatches (blocking execution)
# identify each formatter, storage, database, etc
self.feeder = Feeder.init(config.feeder, config.get(config.feeder))
# self.feeder = Feeder.init(config.feeder, config.get(config.feeder))
# Is it possible to overwrite config.yaml values? it could be useful: share config file and modify gsheets_feeder.sheet via CLI
# where does that update/processing happen? in config.py
# reflection for Archiver to know wihch child classes it has? use Archiver.__subclasses__
self.archivers = [
Archiver.init(a, config)
for a in config.archivers
]
# self.archivers = [
# Archiver.init(a, config)
# for a in config.archivers
# ]
self.feeder = config.feeder
self.enrichers = config.enrichers
self.enrichers = [
Enricher.init(e, config)
for e in config.enrichers
]
# self.formatters = [
# Formatter.init(f, config)
# for f in config.formatters
# ]
self.formatters = [
Formatter.init(f, config)
for f in config.formatters
]
# self.storages = [
# Storage.init(s, config)
# for s in config.storages
# ]
self.storages = [
Storage.init(s, config)
for s in config.storages
]
self.databases = [
Database.init(f, config)
for f in config.formatters
]
# self.databases = [
# Database.init(f, config)
# for f in config.formatters
# ]
# these rules are checked in config.py
assert len(archivers) > 1, "there needs to be at least one Archiver"
# assert len(archivers) > 1, "there needs to be at least one Archiver"
def feed(self, feeder: Feeder) -> list(ArchiveResult):
for next in feeder:
self.archive(next)
def feed(self) -> list(ArchiveResult):
for url in self.feeder:
print("ARCHIVING", url)
# self.archive(url)
# how does this handle the parameters like folder which can be different for each archiver?
# the storage needs to know where to archive!!
# solution: feeders have context: extra metadata that they can read or ignore,
# all of it should have sensible defaults (eg: folder)
# default feeder is a list with 1 element
def archive(url) -> Union[ArchiveResult, None]:
def archive(self, url) -> Union[ArchiveResult, None]:
url = clear_url(url)
result = Metadata(url=url)

View File

@@ -1,16 +1,21 @@
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, field
from inspect import ClassFoundException
from typing import Type
from metadata import Metadata
from abc import ABC
# from collections.abc import Iterable
@dataclass
class Step(ABC):
name : str = None
name: str = None
def __init__(self, config: dict) -> None:
self.config = self.config[self.name]
# reads the configs into object properties
# self.config = config[self.name]
for k, v in config[self.name].items():
self.__setattr__(k, v)
@staticmethod
def configs() -> dict: {}
@@ -21,8 +26,9 @@ class Step(ABC):
"""
for sub in child.__subclasses__():
if sub.name == name:
return sub.__init__(config)
raise f"Unable to initialize class with {name=}"
print(sub.name, "CALLING NEW")
return sub(config)
raise ClassFoundException(f"Unable to initialize STEP with {name=}")
def get_url(self, item: Metadata) -> str:
url = item.get("url")

View File

@@ -1,4 +1,4 @@
# we need to explicitly expose the available imports here
from .gworksheet import *
from .gworksheet import GWorksheet
from .misc import *
from .util import Util

View File

@@ -11,8 +11,7 @@ class Util(Step, ABC):
def __init__(self, config: dict) -> None:
Step.__init__(self)
# only for typing...
# only for typing...
def init(name: str, config: dict) -> Util:
return super().init(name, config, Util)

View File

@@ -1,9 +1,12 @@
from abc import ABC
from configs.v2config import ConfigV2
from orchestrator import ArchivingOrchestrator
config = ConfigV2()
config.parse()
# orchestrator = ArchivingOrchestrator(config)
orchestrator = ArchivingOrchestrator(config)
orchestrator.feed()