mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-10 20:28:28 +03:00
* clean orchestrator code, add archiver cleanup logic * improves documentation for database.py * telethon archivers isolate sessions into copied files * closes #127 * closes #125 * closes #84 * meta enricher applies to all media * closes #61 adds subtitles and comments * minor update * minor fixes to yt-dlp subtitles and comments * closes #17 but logic is imperfect. * closes #85 ssl enhancer * minifies html, JS refactor for preview of certificates * closes #91 adds freetsa timestamp authority * version bump * simplify download_url method * skip ssl if nothing archived * html preview improvements * adds retrying lib * manual download archiver improvements * meta only runs when relevant data available * new metadata convenience method * html template improvements * removes debug message * does not close #91 yet, will need a bit more certificate chain logging * adds verbosity config * new instagram api archiver * adds proxy support * adds proxy/end support and bug fix for yt-dlp * proxy support for webdriver * adds socks proxy to wacz_enricher * refactor recursivity in inner media and display * infinite recursive display * foolproofing timestamping authorities * version to 0.9.0 * minor fixes from code-review
42 lines
1.2 KiB
Python
42 lines
1.2 KiB
Python
from __future__ import annotations
|
|
from dataclasses import dataclass
|
|
from abc import abstractmethod, ABC
|
|
from typing import Union
|
|
|
|
from ..core import Metadata, Step
|
|
|
|
|
|
@dataclass
class Database(Step, ABC):
    """Abstract base for "database" steps.

    Subclasses must implement :meth:`done`. The remaining hooks
    (:meth:`started`, :meth:`failed`, :meth:`aborted`, :meth:`fetch`) have
    do-nothing defaults and only need overriding when a backend cares about
    them.
    """

    # Step identifier for this family of steps.
    name = "database"

    def __init__(self, config: dict) -> None:
        """Forward ``config`` to ``Step.__init__``.

        Defined explicitly because, without it, Step.__init__ is not called
        (the ``@dataclass`` decorator keeps an ``__init__`` that is defined
        in the class body instead of generating its own).
        """
        super().__init__(config)

    @staticmethod
    def init(name: str, config: dict) -> Database:
        """Delegate to ``Step.init(name, config, Database)``.

        This wrapper exists only to narrow the return type for callers.
        It is a static method so that it behaves the same whether it is
        called on the class or on an instance.
        """
        return Step.init(name, config, Database)

    def started(self, item: Metadata) -> None:
        """Signal the DB that archival of ``item`` has started.

        Default implementation does nothing.
        """

    def failed(self, item: Metadata) -> None:
        """Update the DB accordingly for a failure on ``item``.

        Default implementation does nothing.
        """

    def aborted(self, item: Metadata) -> None:
        """Notify the DB that the user cancelled ``item`` after it started.

        Default implementation does nothing.
        """

    # NOTE: @abstractmethod is deliberately left disabled here, so subclasses
    # that do not override fetch() inherit the default below.
    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
        """Check whether ``item`` has been archived already and fetch it.

        Each database should handle its own caching and configuration
        mechanisms.

        Returns:
            This default implementation always returns ``False``.
        """
        return False

    @abstractmethod
    def done(self, item: Metadata, cached: bool = False) -> None:
        """Archival result is ready and should be saved to the DB.

        Args:
            item: the archival result to persist.
            cached: flag supplied by the caller, defaults to ``False``
                (presumably marks a result that came from ``fetch`` rather
                than a fresh archival; confirm against callers).
        """
|