diff --git a/src/auto_archiver/archivers/archiver.py b/src/auto_archiver/archivers/archiver.py index 24bb53c..6fed8b7 100644 --- a/src/auto_archiver/archivers/archiver.py +++ b/src/auto_archiver/archivers/archiver.py @@ -34,6 +34,14 @@ class Archiver(Step): def sanitize_url(self, url: str) -> str: # used to clean unnecessary URL parameters OR unfurl redirect links return url + + def suitable(self, url: str) -> bool: + """ + Returns True if this archiver can handle the given URL + + Should be overridden by subclasses + """ + return True def _guess_file_type(self, path: str) -> str: """ diff --git a/src/auto_archiver/archivers/youtubedl_archiver.py b/src/auto_archiver/archivers/youtubedl_archiver.py index b13cceb..97ad569 100644 --- a/src/auto_archiver/archivers/youtubedl_archiver.py +++ b/src/auto_archiver/archivers/youtubedl_archiver.py @@ -33,7 +33,115 @@ class YoutubeDLArchiver(Archiver): "cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"}, "cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"}, } + + def download_additional_media(self, ie: str, video_data: dict, metadata: Metadata) -> Metadata: + """ + Downloads additional media like images, comments, subtitles, etc. + Creates a 'media' object and attaches it to the metadata object. + """ + + # TODO: should we download all thumbnails, or just the chosen thumbnail? + + # Right now, just getting the single thumbnail + thumbnail_url = video_data.get('thumbnail') + if thumbnail_url: + try: + cover_image_path = self.download_from_url(thumbnail_url) + media = Media(cover_image_path) + metadata.add_media(media, id="cover") + except Exception as e: + logger.error(f"Error downloading cover image {thumbnail_url}: {e}") + + return metadata + + def keys_to_clean(self, ie: str, video_data: dict) -> dict: + """ + Clean up the video data to make it more readable and remove unnecessary keys that ytdlp adds + """ + + base_keys = ['formats', 'thumbnail', 'display_id', 'epoch', 'requested_downloads', + 'duration_string', 'thumbnails', 'http_headers', 'webpage_url_basename', 'webpage_url_domain', + 'extractor', 'extractor_key', 'playlist', 'playlist_index', 'duration_string', 'protocol', 'requested_subtitles', + 'format_id', 'acodec', 'vcodec', 'ext', 'epoch', '_has_drm', 'filesize', 'audio_ext', 'video_ext', 'vbr', 'abr', + 'resolution', 'dynamic_range', 'aspect_ratio', 'cookies', 'format', 'quality', 'preference', 'artists', + 'channel_id', 'subtitles', 'tbr', 'url', 'original_url', 'automatic_captions', 'playable_in_embed', 'live_status', + '_format_sort_fields', 'chapters', 'uploader_id', 'uploader_url', 'requested_formats', 'format_note', + 'audio_channels', 'asr', 'fps', 'was_live', 'is_live', 'heatmap', 'age_limit', 'stretched_ratio'] + if ie == 'TikTok': + return base_keys + [] + + return base_keys + + def add_metadata(self, ie: str, video_data: dict, url:str, result: Metadata) -> Metadata: + """ + Creates a Metadata object from the give video_data + """ + + # first add the media + result = self.download_additional_media(ie, video_data, result) + + # keep the full title, no need for the shortened title (?) + video_data['title'] = video_data.pop('fulltitle', video_data.get('title')) + result.set_title(video_data.pop('title', url)) + + # then add the platform specific additional metadata + for key, mapping in self.video_data_metadata_mapping(ie, video_data).items(): + if isinstance(mapping, str): + result.set(key, eval(f"video_data{mapping}")) + elif callable(mapping): + result.set(key, mapping(video_data)) + result.set_url(url) + + # extract comments if enabled + if self.comments: + result.set("comments", [{ + "text": c["text"], + "author": c["author"], + "timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc) + } for c in video_data.get("comments", [])]) + + # then add the common metadata + if (timestamp := video_data.pop("timestamp", None)): + timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat() + result.set_timestamp(timestamp) + if (upload_date := video_data.pop("upload_date", None)): + upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc) + result.set("upload_date", upload_date) + + # then clean away any keys we don't want + for clean_key in self.keys_to_clean(ie, video_data): + video_data.pop(clean_key, None) + + # then add the rest of the video data + for k, v in video_data.items(): + if v: + result.set(k, v) + + return result + + def video_data_metadata_mapping(self, ie: str, video_data: dict) -> dict: + """ + Returns a key->value mapping to map from the yt-dlp produced 'video_data' to the Metadata object. + Can be either a string for direct mapping, or a function, or a lambda. + """ + return {} + + def suitable(self, item: Metadata) -> bool: + """ + Checks for valid URLs out of all ytdlp extractors. + Returns False for the GenericIE, which as labelled by yt-dlp: 'Generic downloader that works on some sites' + """ + url = item.get_url() + for ie_key, ie in yt_dlp.YoutubeDL()._ies.items(): + # Note: this will return True for *all* URLs due to the 'generic' extractor from ytdlp (valid for all URLs). + # should we check for the 'GenericIE' extractor and return False? + # if ie.IE_NAME == 'generic'... - leaving it in for now, since we also want the ability to download from generic sites + # perhaps one solution is to return 'False' initially, and then if no other installed archivers work, we try again using the generic one + if ie.suitable(url) and ie.working(): + return True + return False + def download(self, item: Metadata) -> Metadata: url = item.get_url() @@ -70,7 +178,6 @@ class YoutubeDLArchiver(Archiver): ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments}) #TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded? info = ydl.extract_info(url, download=True) - if "entries" in info: entries = info.get("entries", []) if not len(entries): @@ -78,9 +185,9 @@ class YoutubeDLArchiver(Archiver): return False else: entries = [info] + ie = info['extractor_key'] result = Metadata() - result.set_title(info.get("title")) - if "description" in info: result.set_content(info["description"]) + for entry in entries: try: filename = ydl.prepare_filename(entry) @@ -104,22 +211,11 @@ class YoutubeDLArchiver(Archiver): except Exception as e: logger.error(f"Error processing entry {entry}: {e}") - # extract comments if enabled - if self.comments: - result.set("comments", [{ - "text": c["text"], - "author": c["author"], - "timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc) - } for c in info.get("comments", [])]) + result = self.add_metadata(ie, info, url, result) + extractor_name = "yt-dlp" + if ie: + extractor_name += f"--{ie}IE" - if (timestamp := info.get("timestamp")): - #TODO: fix deprecated timestamp, - timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat() - result.set_timestamp(timestamp) - if (upload_date := info.get("upload_date")): - upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc) - result.set("upload_date", upload_date) - - if self.end_means_success: result.success("yt-dlp") - else: result.status = "yt-dlp" + if self.end_means_success: result.success(extractor_name) + else: result.status = extractor_name return result diff --git a/tests/archivers/test_archiver_base.py b/tests/archivers/test_archiver_base.py index 3c9ffbd..ed77739 100644 --- a/tests/archivers/test_archiver_base.py +++ b/tests/archivers/test_archiver_base.py @@ -13,7 +13,7 @@ class TestArchiverBase(object): def setup_archiver(self): assert self.archiver_class is not None, "self.archiver_class must be set on the subclass" assert self.config is not None, "self.config must be a dict set on the subclass" - self.archiver = self.archiver_class(self.config) + self.archiver = self.archiver_class({self.archiver_class.name: self.config}) def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""): assert test_response is not False diff --git a/tests/archivers/test_youtubedl_archiver.py b/tests/archivers/test_youtubedl_archiver.py new file mode 100644 index 0000000..f9f1d76 --- /dev/null +++ b/tests/archivers/test_youtubedl_archiver.py @@ -0,0 +1,57 @@ +import pytest +from pathlib import Path + +from auto_archiver.archivers.youtubedl_archiver import YoutubeDLArchiver + +from .test_archiver_base import TestArchiverBase + +class TestYoutubeDLArchiver(TestArchiverBase): + """Tests YoutubeDL Archiver + """ + archiver_class = YoutubeDLArchiver + config = { + 'subtitles': False, + 'comments': False, + 'livestreams': False, + 'live_from_start': False, + 'end_means_success': True, + 'allow_playlist': False, + 'max_downloads': "inf", + 'proxy': None, + 'cookies_from_browser': False, + 'cookie_file': None, + } + + @pytest.mark.parametrize("url, is_suitable", [ + ("https://www.youtube.com/watch?v=5qap5aO4i9A", True), + ("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", True), + ("https://www.instagram.com/p/CU1J9JYJ9Zz/", True), + ("https://www.facebook.com/nytimes/videos/10160796550110716", True), + ("https://www.twitch.tv/videos/1167226570", True), + ("https://bellingcat.com/news/2021/10/08/ukrainian-soldiers-are-being-killed-by-landmines-in-the-donbas/", True), + ("https://google.com", True)]) + def test_suitable_urls(self, make_item, url, is_suitable): + """ + Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs + This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for, + and then if and only if all archivers fails, does it fall back to the generic archiver) + """ + assert self.archiver.suitable(make_item(url)) == is_suitable + + @pytest.mark.download + def test_download_tiktok(self, make_item): + item = make_item("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970") + result = self.archiver.download(item) + assert result.get_url() == "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970" + + @pytest.mark.download + def test_download_youtube(self, make_item): + # url https://www.youtube.com/watch?v=5qap5aO4i9A + item = make_item("https://www.youtube.com/watch?v=J---aiyznGQ") + result = self.archiver.download(item) + assert result.get_url() == "https://www.youtube.com/watch?v=J---aiyznGQ" + assert result.get_title() == "Keyboard Cat! - THE ORIGINAL!" + assert result.get('description') == "Buy NEW Keyboard Cat Merch! https://keyboardcat.creator-spring.com\n\nxo Keyboard Cat memes make your day better!\nhttp://www.keyboardcatstore.com/\nhttps://www.facebook.com/thekeyboardcat\nhttp://www.charlieschmidt.com/" + assert len(result.media) == 2 + assert Path(result.media[0].filename).name == "J---aiyznGQ.webm" + assert Path(result.media[1].filename).name == "hqdefault.jpg" \ No newline at end of file