From f8e846d59a8181d0d73c7fb81d9d562e06192d49 Mon Sep 17 00:00:00 2001 From: Patrick Robertson Date: Tue, 25 Feb 2025 11:44:35 +0000 Subject: [PATCH] Create facebook dropin - working for images + text. CAVEAT: only gets the first ~100 chars of the post at the moment --- .../modules/generic_extractor/dropin.py | 12 ++++- .../modules/generic_extractor/facebook.py | 44 +++++++++++++------ .../generic_extractor/generic_extractor.py | 31 ++++++++++--- .../modules/local_storage/__manifest__.py | 2 +- tests/extractors/test_generic_extractor.py | 44 ++++++++++++++++++- 5 files changed, 110 insertions(+), 23 deletions(-) diff --git a/src/auto_archiver/modules/generic_extractor/dropin.py b/src/auto_archiver/modules/generic_extractor/dropin.py index c5749ff..22f1792 100644 --- a/src/auto_archiver/modules/generic_extractor/dropin.py +++ b/src/auto_archiver/modules/generic_extractor/dropin.py @@ -1,3 +1,4 @@ +from typing import Type from yt_dlp.extractor.common import InfoExtractor from auto_archiver.core.metadata import Metadata from auto_archiver.core.extractor import Extractor @@ -23,6 +24,8 @@ class GenericDropin: """ + extractor: Type[Extractor] = None + def extract_post(self, url: str, ie_instance: InfoExtractor): """ This method should return the post data from the url. @@ -55,4 +58,11 @@ class GenericDropin: """ This method should download any additional media from the post. """ - return metadata \ No newline at end of file + return metadata + + def is_suitable(self, url, info_extractor: InfoExtractor): + """ + Used to override the InfoExtractor's 'is_suitable' method. 
Dropins should override this method to return True if the url is suitable for the extractor + (based on being able to parse other URLs) + """ + return False \ No newline at end of file diff --git a/src/auto_archiver/modules/generic_extractor/facebook.py b/src/auto_archiver/modules/generic_extractor/facebook.py index fed8e09..d41c484 100644 --- a/src/auto_archiver/modules/generic_extractor/facebook.py +++ b/src/auto_archiver/modules/generic_extractor/facebook.py @@ -1,18 +1,36 @@ +import re from .dropin import GenericDropin - +from auto_archiver.core.metadata import Metadata +from auto_archiver.core.media import Media class Facebook(GenericDropin): - def extract_post(self, url: str, ie_instance): - video_id = ie_instance._match_valid_url(url).group('id') - ie_instance._download_webpage( - url.replace('://m.facebook.com/', '://www.facebook.com/'), video_id) - webpage = ie_instance._download_webpage(url, ie_instance._match_valid_url(url).group('id')) - - # TODO: fix once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged - post_data = ie_instance._extract_metadata(webpage) - return post_data + def extract_post(self, url: str, ie_instance): + post_id_regex = r'(?P<id>pfbid[A-Za-z0-9]+|\d+|t\.(\d+\/\d+))' + post_id = re.search(post_id_regex, url).group('id') + webpage = ie_instance._download_webpage( + url.replace('://m.facebook.com/', '://www.facebook.com/'), post_id) + + # WARN: Will only work once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged + # TODO: For long posts, this _extract_metadata only seems to return the first 100 or so characters, followed by ... 
+ post_data = ie_instance._extract_metadata(webpage, post_id) + return post_data + def create_metadata(self, post: dict, ie_instance, archiver, url): - metadata = archiver.create_metadata(url) - metadata.set_title(post.get('title')).set_content(post.get('description')).set_post_data(post) - return metadata \ No newline at end of file + result = Metadata() + result.set_content(post.get('description', '')) + result.set_title(post.get('title', '')) + result.set('author', post.get('uploader', '')) + result.set_url(url) + return result + + def is_suitable(self, url, info_extractor): + regex = r'(?:https?://(?:[\w-]+\.)?(?:facebook\.com||facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd\.onion)/)' + return re.match(regex, url) + + def skip_ytdlp_download(self, url: str, ie_instance): + """ + Skip using the ytdlp download method for Facebook *photo* posts, they have a URL with an id of t.XXXXX/XXXXX + """ + if re.search(r'/t.\d+/\d+', url): + return True \ No newline at end of file diff --git a/src/auto_archiver/modules/generic_extractor/generic_extractor.py b/src/auto_archiver/modules/generic_extractor/generic_extractor.py index 72fe3e0..cc0cbea 100644 --- a/src/auto_archiver/modules/generic_extractor/generic_extractor.py +++ b/src/auto_archiver/modules/generic_extractor/generic_extractor.py @@ -8,6 +8,8 @@ from loguru import logger from auto_archiver.core.extractor import Extractor from auto_archiver.core import Metadata, Media +class Skip(Exception): + pass class GenericExtractor(Extractor): _dropins = {} @@ -15,8 +17,20 @@ class GenericExtractor(Extractor): """ Returns a list of valid extractors for the given URL""" for info_extractor in yt_dlp.YoutubeDL()._ies.values(): - if info_extractor.suitable(url) and info_extractor.working(): + if not info_extractor.working(): + continue + + # check if there's a dropin and see if that declares whether it's suitable + dropin = self.dropin_for_name(info_extractor.ie_key()) + if dropin and dropin.is_suitable(url, 
info_extractor): yield info_extractor + continue + + if info_extractor.suitable(url): + yield info_extractor + continue + + def suitable(self, url: str) -> bool: """ @@ -129,7 +143,8 @@ class GenericExtractor(Extractor): return False post_data = dropin.extract_post(url, ie_instance) - return dropin.create_metadata(post_data, ie_instance, self, url) + result = dropin.create_metadata(post_data, ie_instance, self, url) + return self.add_metadata(post_data, info_extractor, url, result) def get_metadata_for_video(self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: @@ -181,6 +196,7 @@ class GenericExtractor(Extractor): dropin_class_name = dropin_name.title() def _load_dropin(dropin): dropin_class = getattr(dropin, dropin_class_name)() + dropin.extractor = self return self._dropins.setdefault(dropin_name, dropin_class) try: @@ -225,8 +241,9 @@ class GenericExtractor(Extractor): dropin_submodule = self.dropin_for_name(info_extractor.ie_key()) try: - if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url): - raise Exception(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}") + if dropin_submodule and dropin_submodule.skip_ytdlp_download(url, info_extractor): + logger.debug(f"Skipping using ytdlp to download files for {info_extractor.ie_key()} (dropin override)") + raise Skip() # don't download since it can be a live stream data = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=False) @@ -240,15 +257,17 @@ class GenericExtractor(Extractor): if info_extractor.ie_key() == "generic": # don't clutter the logs with issues about the 'generic' extractor not having a dropin return False + + if not isinstance(e, Skip): + logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use dropin to get post data instead') - logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), 
attempting to use extractor to get post data instead') try: result = self.get_metadata_for_post(info_extractor, url, ydl) except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e: logger.error(f'Error downloading metadata for post: {post_e}') return False except Exception as generic_e: - logger.debug(f'Attempt to extract using ytdlp extractor "{info_extractor.IE_NAME}" failed: \n {repr(generic_e)}', exc_info=True) + logger.debug(f'Attempt to extract using ytdlp dropin for "{info_extractor.IE_NAME}" failed: \n {repr(generic_e)}', exc_info=True) return False if result: diff --git a/src/auto_archiver/modules/local_storage/__manifest__.py b/src/auto_archiver/modules/local_storage/__manifest__.py index 6d9cf53..ed978a7 100644 --- a/src/auto_archiver/modules/local_storage/__manifest__.py +++ b/src/auto_archiver/modules/local_storage/__manifest__.py @@ -17,7 +17,7 @@ "choices": ["random", "static"], }, "save_to": {"default": "./local_archive", "help": "folder where to save archived content"}, - "save_absolute": {"default": False, "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"}, + "save_absolute": {"default": True, "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (Warning: saving an absolute path will show your computer's file structure)"}, }, "description": """ LocalStorage: A storage module for saving archived content locally on the filesystem. 
diff --git a/tests/extractors/test_generic_extractor.py b/tests/extractors/test_generic_extractor.py index 54f4d9c..ac280f7 100644 --- a/tests/extractors/test_generic_extractor.py +++ b/tests/extractors/test_generic_extractor.py @@ -39,6 +39,17 @@ class TestGenericExtractor(TestExtractorBase): assert self.extractor.dropin_for_name("dropin", additional_paths=[path]) + @pytest.mark.parametrize("url, suitable_extractors", [ + ("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]), + ("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]), + ("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]), + ("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]), + ("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),]) + def test_suitable_extractors(self, url, suitable_extractors): + suitable_extractors = suitable_extractors + ['generic'] # the generic is valid for all + extractors = list(self.extractor.suitable_extractors(url)) + assert len(extractors) == len(suitable_extractors) + assert [e.ie_key().lower() for e in extractors] == suitable_extractors @pytest.mark.parametrize("url, is_suitable", [ ("https://www.youtube.com/watch?v=5qap5aO4i9A", True), @@ -48,7 +59,7 @@ class TestGenericExtractor(TestExtractorBase): ("https://www.twitch.tv/videos/1167226570", True), ("https://bellingcat.com/news/2021/10/08/ukrainian-soldiers-are-being-killed-by-landmines-in-the-donbas/", True), ("https://google.com", True)]) - def test_suitable_urls(self, make_item, url, is_suitable): + def test_suitable_urls(self, url, is_suitable): """ Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs This behaviour may be changed in the future (e.g. 
if we want the youtubedl archiver to just handle URLs it has extractors for, @@ -209,4 +220,33 @@ class TestGenericExtractor(TestExtractorBase): timestamp ) assert len(post.media) == 1 - assert post.media[0].hash == image_hash \ No newline at end of file + assert post.media[0].hash == image_hash + + @pytest.mark.download + def test_download_facebook_video(self, make_item): + + post = self.extractor.download(make_item("https://www.facebook.com/bellingcat/videos/588371253839133")) + assert len(post.media) == 2 + assert post.media[0].filename.endswith("588371253839133.mp4") + assert post.media[0].mimetype == "video/mp4" + + assert post.media[1].filename.endswith(".jpg") + assert post.media[1].mimetype == "image/jpeg" + + assert "Bellingchat Premium is with Kolina Koltai" in post.get_title() + + @pytest.mark.download + def test_download_facebook_image(self, make_item): + + post = self.extractor.download(make_item("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/")) + + assert len(post.media) == 1 + assert post.media[0].filename.endswith(".png") + assert "Byline Festival - BylineFest Partner" == post.get_title() + + @pytest.mark.download + def test_download_facebook_text_only(self, make_item): + url = "https://www.facebook.com/bellingcat/posts/pfbid02rzpwZxAZ8bLkAX8NvHv4DWAidFaqAUfJMbo9vWkpwxL7uMUWzWMiizXLWRSjwihVl" + post = self.extractor.download(make_item(url)) + assert "Bellingcat researcher Kolina Koltai delves deeper into Clothoff" in post.get('content') + assert post.get_title() == "Bellingcat"