diff --git a/src/archivers/archiver.py b/src/archivers/archiver.py
index 538804e..37f5d4d 100644
--- a/src/archivers/archiver.py
+++ b/src/archivers/archiver.py
@@ -3,6 +3,7 @@ from abc import abstractmethod
 from dataclasses import dataclass
 from metadata import Metadata
 from steps.step import Step
+import mimetypes, requests
 
 
 @dataclass
@@ -12,9 +13,9 @@ class Archiverv2(Step):
     def __init__(self, config: dict) -> None:
         # without this STEP.__init__ is not called
         super().__init__(config)
-        # self.setup()
 
     # only for typing...
+
     def init(name: str, config: dict) -> Archiverv2:
         return Step.init(name, config, Archiverv2)
 
@@ -22,5 +23,24 @@ class Archiverv2(Step):
         # used when archivers need to login or do other one-time setup
         pass
 
+    def _guess_file_type(self, path: str) -> str:
+        """
+        Receives a URL or filename and returns global mimetype like 'image' or 'video'
+        see https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types
+        """
+        mime = mimetypes.guess_type(path)[0]
+        if mime is not None:
+            return mime.split("/")[0]
+        return ""
+
+    def download_from_url(self, url:str, to_filename:str) -> None:
+        """GETs `url` with a browser-like User-Agent and writes the response body to `to_filename`"""
+        headers = {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
+        }
+        d = requests.get(url, headers=headers)
+        with open(to_filename, 'wb') as f:
+            f.write(d.content)
+
     @abstractmethod
     def download(self, item: Metadata) -> Metadata: pass
diff --git a/src/archivers/telethon_archiverv2.py b/src/archivers/telethon_archiverv2.py
index a4273b0..267dc2d 100644
--- a/src/archivers/telethon_archiverv2.py
+++ b/src/archivers/telethon_archiverv2.py
@@ -7,8 +7,7 @@ from telethon.tl.functions.messages import ImportChatInviteRequest
 from telethon.errors.rpcerrorlist import UserAlreadyParticipantError, FloodWaitError, InviteRequestSentError, InviteHashExpiredError
 from loguru import logger
 from tqdm import tqdm
-import re, time, json
-
+import re, time, json, os
 
 
 class TelethonArchiver(Archiverv2):
@@ -38,6 +37,10 @@ class TelethonArchiver(Archiverv2):
         }
 
     def setup(self) -> None:
+        """
+        1. trigger login process for telegram or proceed if already saved in a session file
+        2. joins channel_invites where needed
+        """
         logger.info(f"SETUP {self.name} checking login...")
         with self.client.start():
             pass
@@ -56,11 +59,11 @@ class TelethonArchiver(Archiverv2):
                 channel_id = channel_invite.get("id", False)
                 invite = channel_invite["invite"]
                 if (match := self.invite_pattern.search(invite)):
-                    try: 
+                    try:
                         if channel_id:
-                            ent = self.client.get_entity(int(channel_id)) # fails if not a member 
+                            ent = self.client.get_entity(int(channel_id)) # fails if not a member
                         else:
-                            ent = self.client.get_entity(invite) # fails if not a member 
+                            ent = self.client.get_entity(invite) # fails if not a member
                         logger.warning(f"please add the property id='{ent.id}' to the 'channel_invites' configuration where {invite=}, not doing so can lead to a minutes-long setup time due to telegram's rate limiting.")
                     except ValueError as e:
                         logger.info(f"joining new channel {invite=}")
@@ -80,35 +83,85 @@ class TelethonArchiver(Archiverv2):
                             continue
                 else:
                     logger.warning(f"Invalid invite link {invite}")
-                i+=1
+                i += 1
                 pbar.update()
 
-
     def download(self, item: Metadata) -> Metadata:
-        url = self.get_url(item)
+        """
+        Archives the telegram post referenced by the item's URL: downloads its media
+        (and the media of posts in the same group) into the item's "tmp_dir"
+        Returns a new Metadata with media, post, title and timestamp, or False on failure
+        """
+        url = item.get_url()
+        print(f"downloading {url=}")
         # detect URLs that we definitely cannot handle
         match = self.link_pattern.search(url)
         if not match:
             return False
 
-        # app will ask (stall for user input!) for phone number and auth code if anon.session not found
-        # TODO: not using bot_token since then private channels cannot be archived
-        # with self.client.start(bot_token=self.bot_token):
-        with self.client.start():
-            # self.client(ImportChatInviteRequest('4kAkN49IKJBhZDk6'))
-            is_private = match.group(1) == "/c"
-            print(f"{is_private=}")
-            chat = int(match.group(2)) if is_private else match.group(2)
-            post_id = int(match.group(3))
+        is_private = match.group(1) == "/c"
+        chat = int(match.group(2)) if is_private else match.group(2)
+        post_id = int(match.group(3))
+        result = Metadata()
+
+        # NB: not using bot_token since then private channels cannot be archived: self.client.start(bot_token=self.bot_token)
+        with self.client.start():
             try:
                 post = self.client.get_messages(chat, ids=post_id)
             except ValueError as e:
                 logger.error(f"Could not fetch telegram {url} possibly it's private: {e}")
                 return False
             except ChannelInvalidError as e:
-                logger.error(f"Could not fetch telegram {url}. This error can be fixed if you setup a bot_token in addition to api_id and api_hash: {e}")
+                logger.error(f"Could not fetch telegram {url}. This error may be fixed if you setup a bot_token in addition to api_id and api_hash (but then private channels will not be archived, we need to update this logic to handle both): {e}")
                 return False
 
             if post is None:
                 return False
-            print(post)
+            logger.info(f"fetched telegram {post.id=}")
+
+            media_posts = self._get_media_posts_in_group(chat, post)
+            logger.debug(f'got {len(media_posts)=} for {url=}')
+
+            tmp_dir = item.get("tmp_dir")
+
+            group_id = post.grouped_id if post.grouped_id is not None else post.id
+            title = post.message
+            for mp in media_posts:
+                if len(mp.message) > len(title): title = mp.message # save the longest text found (usually only 1)
+
+                # media can also be in entities
+                if mp.entities:
+                    other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image"]]
+                    logger.debug(f"Got {len(other_media_urls)} other medial urls from {mp.id=}: {other_media_urls}")
+                    for om_url in other_media_urls:
+                        filename = os.path.join(tmp_dir, f'{chat}_{group_id}_{self._get_key_from_url(om_url)}')
+                        self.download_from_url(om_url, filename)
+                        result.add_media(filename)
+
+                filename_dest = os.path.join(tmp_dir, f'{chat}_{group_id}', str(mp.id))
+                filename = self.client.download_media(mp.media, filename_dest)
+                if not filename:
+                    logger.debug(f"Empty media found, skipping {str(mp)=}")
+                    continue
+                result.add_media(filename)
+
+        result.set("post", post).set_title(title).set_timestamp(post.date)
+        return result
+
+    def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
+        """
+        Searches for Telegram posts that are part of the same group of uploads
+        The search is conducted around the id of the original post with an amplitude
+        of `max_amp` both ways
+        Returns a list of [post] where each post has media and is in the same grouped_id
+        """
+        if getattr(original_post, "grouped_id", None) is None:
+            return [original_post] if getattr(original_post, "media", False) else []
+
+        search_ids = list(range(original_post.id - max_amp, original_post.id + max_amp + 1))
+        posts = self.client.get_messages(chat, ids=search_ids)
+        media = []
+        for post in posts:
+            if post is not None and post.grouped_id == original_post.grouped_id and post.media is not None:
+                media.append(post)
+        return media
diff --git a/src/metadata.py b/src/metadata.py
index d56fcd9..e1e8d8b 100644
--- a/src/metadata.py
+++ b/src/metadata.py
@@ -1,7 +1,8 @@
 
 from __future__ import annotations
-from typing import Any, Union, Dict
+from typing import Any, Union, Dict, List
 from dataclasses import dataclass
+from datetime import datetime
 
 
 @dataclass
@@ -15,8 +16,8 @@ class Metadata:
     metadata: Dict[str, Any]
 
     # TODO: remove and use default?
-    def __init__(self) -> None:
-        self.status = ""
+    def __init__(self, status="") -> None:
+        self.status = status
         self.metadata = {}
 
     # @staticmethod
@@ -27,14 +28,46 @@ class Metadata:
         pass
 
     # TODO: setters?
-    def set(self, key: str, val: Any) -> Union[Metadata, str]:
+    def set(self, key: str, val: Any) -> Metadata:
         # goes through metadata and returns the Metadata available
         self.metadata[key] = val
+        return self
 
     def get(self, key: str, default: Any = None) -> Union[Metadata, str]:
         # goes through metadata and returns the Metadata available
         return self.metadata.get(key, default)
 
+# custom getter/setters
+
+    def set_url(self, url: str) -> Metadata:
+        """Stores `url` under the "url" key; asserts it is a non-empty str."""
+        assert type(url) is str and len(url) > 0, "invalid URL"
+        return self.set("url", url)
+
+    def get_url(self) -> str:
+        """Returns the value stored under "url"; asserts it is a non-empty str."""
+        url = self.get("url")
+        assert type(url) is str and len(url) > 0, "invalid URL"
+        return url
+
+    def get_media(self) -> List:
+        """Returns the list stored under "media", or an empty list when none is stored."""
+        return self.get("media", [])
+
+    def set_title(self, title: str) -> Metadata:
+        """Stores `title` under the "title" key and returns self for chaining."""
+        return self.set("title", title)
+
+    def set_timestamp(self, timestamp: datetime) -> Metadata:
+        """Stores `timestamp` under its own "timestamp" key so the title is not overwritten."""
+        return self.set("timestamp", timestamp)
+
+    def add_media(self, filename: str) -> Metadata:
+        """Appends `filename` to the stored "media" list (created on first use) and returns self."""
+        # setdefault keeps the list in self.metadata; the default [] from get_media() would be discarded
+        self.metadata.setdefault("media", []).append(filename)
+        return self
+
     def as_json(self) -> str:
         # converts all metadata and data into JSON
         pass
diff --git a/src/orchestrator.py b/src/orchestrator.py
index 5889497..804948e 100644
--- a/src/orchestrator.py
+++ b/src/orchestrator.py
@@ -6,6 +6,7 @@ from archivers.archiver import Archiverv2
 
 from enrichers.enricher import Enricher
 from metadata import Metadata
+import tempfile, time
 
 """
 how not to couple the different pieces of logic
@@ -155,19 +156,24 @@ class ArchivingOrchestrator:
     def feed(self) -> list(ArchiveResult):
         for url in self.feeder:
             print("ARCHIVING", url)
-            self.archive(url)
+            with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
+                self.archive(url, tmp_dir)
+
+                print("holding on")
+                time.sleep(300)
 
     # how does this handle the parameters like folder which can be different for each archiver?
     # the storage needs to know where to archive!!
     # solution: feeders have context: extra metadata that they can read or ignore,
     # all of it should have sensible defaults (eg: folder)
     # default feeder is a list with 1 element
-    def archive(self, url) -> Union[ArchiveResult, None]:
+    def archive(self, url: str, tmp_dir: str) -> Union[Metadata, None]:
         # TODO:
         # url = clear_url(url)
         # result = Metadata(url=url)
         result = Metadata()
-        result.set("url", url)
+        result.set_url(url)
+        result.set("tmp_dir", tmp_dir)
         should_archive = True
         for d in self.databases:
             should_archive &= d.should_process(url)
diff --git a/src/steps/step.py b/src/steps/step.py
index 04d7a61..4d7e6c1 100644
--- a/src/steps/step.py
+++ b/src/steps/step.py
@@ -29,8 +29,3 @@ class Step(ABC):
                 print(sub.name, "CALLING NEW")
                 return sub(config)
         raise ClassFoundException(f"Unable to initialize STEP with {name=}")
-
-    def get_url(self, item: Metadata) -> str:
-        url = item.get("url")
-        assert type(url) is str and len(url) > 0
-        return url