From 96845305a3ff265fe2690c91ab577fef659a7992 Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Wed, 14 Dec 2022 19:01:20 +0000 Subject: [PATCH] media concept implemented --- src/archivers/telethon_archiverv2.py | 10 ++-- src/media.py | 17 +++++++ src/metadata.py | 63 +++++++++++--------------- src/orchestrator.py | 7 +-- src/storages/__init__.py | 4 +- src/storages/s3.py | 68 ++++++++++++++++++++++++++++ src/storages/storage.py | 21 +++++++++ 7 files changed, 145 insertions(+), 45 deletions(-) create mode 100644 src/media.py create mode 100644 src/storages/s3.py create mode 100644 src/storages/storage.py diff --git a/src/archivers/telethon_archiverv2.py b/src/archivers/telethon_archiverv2.py index 4fa3ce0..ea19c92 100644 --- a/src/archivers/telethon_archiverv2.py +++ b/src/archivers/telethon_archiverv2.py @@ -9,6 +9,8 @@ from loguru import logger from tqdm import tqdm import re, time, json, os +from media import Media + class TelethonArchiver(Archiverv2): name = "telethon" @@ -131,17 +133,17 @@ class TelethonArchiver(Archiverv2): other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image", "audio"]] if len(other_media_urls): logger.debug(f"Got {len(other_media_urls)} other medial urls from {mp.id=}: {other_media_urls}") - for om_url in other_media_urls: - filename = os.path.join(tmp_dir, f'{chat}_{group_id}_{self._get_key_from_url(om_url)}') + for i, om_url in enumerate(other_media_urls): + filename = os.path.join(tmp_dir, f'{chat}_{group_id}_{i}') self.download_from_url(om_url, filename) - result.add_media(filename) + result.add_media(Media(filename)) filename_dest = os.path.join(tmp_dir, f'{chat}_{group_id}', str(mp.id)) filename = self.client.download_media(mp.media, filename_dest) if not filename: logger.debug(f"Empty media found, skipping {str(mp)=}") continue - result.add_media(filename) + result.add_media(Media(filename)) result.set("post", str(post)).set_title(title).set_timestamp(post.date) return result diff --git a/src/media.py b/src/media.py new file mode 100644 index 0000000..ecee4f4 --- /dev/null +++ b/src/media.py @@ -0,0 +1,17 @@ + +from __future__ import annotations +from ast import List +from typing import Any, Union, Dict +from dataclasses import dataclass +from datetime import datetime +import json + + +@dataclass +class Media: + filename: str + id: str = None + hash: str = None + cdn_url: str = None + hash: str = None + diff --git a/src/metadata.py b/src/metadata.py index 193003f..8945e1a 100644 --- a/src/metadata.py +++ b/src/metadata.py @@ -2,49 +2,38 @@ from __future__ import annotations from ast import List from typing import Any, Union, Dict -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime -import json +# import json + +from media import Media @dataclass class Metadata: - # does not handle files, only primitives - # the only piece of logic to handle files is the archiver, enricher, and storage - status: str - # title: str - # url: str - # hash: str - metadata: Dict[str, Any] - - # TODO: remove and use default? - def __init__(self, status="") -> None: - self.status = status - self.metadata = {} + status: str = "" + metadata: Dict[str, Any] = field(default_factory=dict) + media: List[Media] = field(default_factory=list) def merge(self: Metadata, right: Metadata, overwrite_left=True) -> Metadata: """ - merges to Metadata instances, will overwrite according to overwrite_left flag + merges two Metadata instances, will overwrite according to overwrite_left flag """ - res = Metadata() if overwrite_left: - res.status = right.status - res.metadata = dict(self.metadata) # make a copy + self.status = right.status for k, v in right.metadata.items(): - print(type(v), type(self.get(k))) - # assert type(v) == type(self.get(k)) - if type(v) not in [dict, list, set] or k not in res.metadata: - res.set(k, v) + assert k not in self.metadata or type(v) == type(self.get(k)) + if type(v) not in [dict, list, set] or k not in self.metadata: + self.set(k, v) else: # key conflict - if type(v) in [dict, set]: res.set(k, self.get(k) | v) - elif type(v) == list: res.set(k, self.get(k) + v) + if type(v) in [dict, set]: self.set(k, self.get(k) | v) + elif type(v) == list: self.set(k, self.get(k) + v) + self.media.extend(right.media) else: # invert and do same logic return right.merge(self) - return res + return self - # TODO: setters? def set(self, key: str, val: Any) -> Metadata: - # goes through metadata and returns the Metadata available self.metadata[key] = val return self @@ -65,9 +54,6 @@ class Metadata: assert type(url) is str and len(url) > 0, "invalid URL" return url - def get_media(self) -> List: - return self.get("media", [], create_if_missing=True) - def set_content(self, content: str) -> Metadata: # the main textual content/information from a social media post, webpage, ... return self.set("content", content) @@ -75,14 +61,17 @@ class Metadata: def set_title(self, title: str) -> Metadata: return self.set("title", title) - def set_timestamp(self, title: datetime) -> Metadata: - return self.set("title", title) + def set_timestamp(self, timestamp: datetime) -> Metadata: + assert type(timestamp) == datetime, "set_timestamp expects a datetime instance" + return self.set("timestamp", timestamp) - def add_media(self, filename: str) -> Metadata: + def add_media(self, media: Media) -> Metadata: # print(f"adding {filename} to {self.metadata.get('media')}") # return self.set("media", self.get_media() + [filename]) - return self.get_media().append(filename) + # return self.get_media().append(media) + return self.media.append(media) - def as_json(self) -> str: - # converts all metadata and data into JSON - return json.dumps(self.metadata) + # def as_json(self) -> str: + # # converts all metadata and data into JSON + # return json.dumps(self.metadata) + # #TODO: datetime is not serializable diff --git a/src/orchestrator.py b/src/orchestrator.py index 9a523bf..2f33370 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -153,13 +153,14 @@ class ArchivingOrchestrator: # these rules are checked in config.py # assert len(archivers) > 1, "there needs to be at least one Archiver" - def feed(self) -> list(ArchiveResult): + def feed(self) -> list(Metadata): for url in self.feeder: print("ARCHIVING", url) with tempfile.TemporaryDirectory(dir="./") as tmp_dir: result = self.archive(url, tmp_dir) + print(type(result)) print(result) - print(result.as_json()) + # print(result.as_json()) print("holding on") time.sleep(300) # how does this handle the parameters like folder which can be different for each archiver? @@ -170,7 +171,7 @@ class ArchivingOrchestrator: def archive(self, url: str, tmp_dir: str) -> Union[Metadata, None]: # TODO: - # url = clear_url(url) + # url = clear_url(url) # should we save if they differ? # result = Metadata(url=url) result = Metadata() result.set_url(url) diff --git a/src/storages/__init__.py b/src/storages/__init__.py index 99f82b3..96baaba 100644 --- a/src/storages/__init__.py +++ b/src/storages/__init__.py @@ -2,4 +2,6 @@ from .base_storage import Storage from .local_storage import LocalStorage, LocalConfig from .s3_storage import S3Config, S3Storage -from .gd_storage import GDConfig, GDStorage \ No newline at end of file +from .gd_storage import GDConfig, GDStorage + +from .storage import StorageV2 \ No newline at end of file diff --git a/src/storages/s3.py b/src/storages/s3.py new file mode 100644 index 0000000..826d66d --- /dev/null +++ b/src/storages/s3.py @@ -0,0 +1,68 @@ + +from typing import IO +import boto3, uuid, os, mimetypes +from botocore.errorfactory import ClientError +from src.storages import StorageV2 +from loguru import logger +from slugify import slugify + + +class S3StorageV2(StorageV2): + name = "s3_storage" + + def __init__(self, config: dict) -> None: + super().__init__(config) + self.s3 = boto3.client( + 's3', + region_name=config.region, + endpoint_url=config.endpoint_url.format(region=config.region), + aws_access_key_id=config.key, + aws_secret_access_key=config.secret + ) + + @staticmethod + def configs() -> dict: + return { + "bucket": {"default": None, "help": "S3 bucket name"}, + "region": {"default": None, "help": "S3 region name"}, + "key": {"default": None, "help": "S3 API key"}, + "secret": {"default": None, "help": "S3 API secret"}, + # TODO: how to have sth like a custom folder? has to come from the feeders + "endpoint_url": { + "default": 'https://{region}.digitaloceanspaces.com', + "help": "S3 bucket endpoint, {region} are inserted at runtime" + }, + "cdn_url": { + "default": 'https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}', + "help": "S3 CDN url, {bucket}, {region} and {key} are inserted at runtime" + }, + "private": {"default": False, "help": "if true S3 files will not be readable online"}, + "key_path": {"default": "random", "help": "S3 file names are non-predictable strings, one of ['random', 'default']"}, + } + + def get_cdn_url(self, key: str) -> str: + return self.cdn_url.format(bucket=self.bucket, region=self.region, key=self._get_path(key)) + + def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> None: + extra_args = kwargs.get("extra_args", {}) + if not self.private and 'ACL' not in extra_args: + extra_args['ACL'] = 'public-read' + + if 'ContentType' not in extra_args: + try: + extra_args['ContentType'] = mimetypes.guess_type(key)[0] + except Exception as e: + logger.error(f"Unable to get mimetype for {key=}, error: {e}") + + self.s3.upload_fileobj(file, Bucket=self.bucket, Key=self._get_path(key), ExtraArgs=extra_args) + + def exists(self, key: str) -> bool: + """ + Tests if a given file with key=key exists in the bucket + """ + try: + self.s3.head_object(Bucket=self.bucket, Key=self._get_path(key)) + return True + except ClientError as e: + logger.warning(f"got a ClientError when checking if {key=} exists in bucket={self.bucket}: {e}") + return False diff --git a/src/storages/storage.py b/src/storages/storage.py new file mode 100644 index 0000000..4052d7e --- /dev/null +++ b/src/storages/storage.py @@ -0,0 +1,21 @@ +from __future__ import annotations +from abc import abstractmethod +from dataclasses import dataclass +from metadata import Metadata +from steps.step import Step + + +@dataclass +class StorageV2(Step): + name = "storage" + + def __init__(self, config: dict) -> None: + # without this STEP.__init__ is not called + super().__init__(config) + + # only for typing... + def init(name: str, config: dict) -> StorageV2: + return Step.init(name, config, StorageV2) + + @abstractmethod + def store(self, item: Metadata) -> Metadata: pass