From 9d3cd5774bbdc91e3e756eb9b9a3ad047a755e05 Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Sun, 6 Jul 2025 14:04:01 +0100 Subject: [PATCH] an improved approach for #295 --- .../antibot_extractor_enricher.py | 5 +- .../dropins/tiktok.py | 8 +- .../modules/generic_extractor/tiktok.py | 92 +++++++++++++------ .../extractors/test_tiktok_tikwm_extractor.py | 39 ++++---- 4 files changed, 96 insertions(+), 48 deletions(-) diff --git a/src/auto_archiver/modules/antibot_extractor_enricher/antibot_extractor_enricher.py b/src/auto_archiver/modules/antibot_extractor_enricher/antibot_extractor_enricher.py index d1a4ee5..ba1dfda 100644 --- a/src/auto_archiver/modules/antibot_extractor_enricher/antibot_extractor_enricher.py +++ b/src/auto_archiver/modules/antibot_extractor_enricher/antibot_extractor_enricher.py @@ -97,7 +97,10 @@ class AntibotExtractorEnricher(Extractor, Enricher): sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future dropin = self._get_suitable_dropin(url, sb) - dropin.open_page(url) + if not dropin.open_page(url): + # TODO: could we detect deleted videos? + logger.warning("Failed to open drop-in page") + return False if self.detect_auth_wall and (dropin.hit_auth_wall() and self._hit_auth_wall(sb)): logger.warning("Skipping since auth wall or CAPTCHA was detected") diff --git a/src/auto_archiver/modules/antibot_extractor_enricher/dropins/tiktok.py b/src/auto_archiver/modules/antibot_extractor_enricher/dropins/tiktok.py index 82b4f21..5c95c82 100644 --- a/src/auto_archiver/modules/antibot_extractor_enricher/dropins/tiktok.py +++ b/src/auto_archiver/modules/antibot_extractor_enricher/dropins/tiktok.py @@ -1,17 +1,20 @@ from contextlib import suppress from typing import Mapping + +from auto_archiver.utils.custom_logger import logger from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin class TikTokDropin(Dropin): """ A class to handle TikTok drop-in functionality for the antibot extractor enricher module. + """ def documentation() -> Mapping[str, str]: return { "name": "TikTok Dropin", - "description": "Handles TikTok posts and works without authentication.", + "description": "Handles TikTok posts and works without authentication.\nNOTE: This dropin is highly susceptible to TikTok's bot detection mechanisms and may not work reliably if you reuse the same IP. The GenericExtractor is recommended for TikTok posts, as it handles video/image download more reliable. In the future we plan to implement better anti captcha measures for this dropin.", "site": "tiktok.com", } @@ -33,6 +36,9 @@ class TikTokDropin(Dropin): # TODO: implement login logic if url != self.sb.get_current_url(): return False + if self.sb.is_text_visible("Video currently unavailable"): + logger.debug("Video may have been removed or is private.") + return False return True def hit_auth_wall(self) -> bool: diff --git a/src/auto_archiver/modules/generic_extractor/tiktok.py b/src/auto_archiver/modules/generic_extractor/tiktok.py index 36e8f74..77da850 100644 --- a/src/auto_archiver/modules/generic_extractor/tiktok.py +++ b/src/auto_archiver/modules/generic_extractor/tiktok.py @@ -1,3 +1,4 @@ +import re import requests from auto_archiver.utils.custom_logger import logger @@ -14,12 +15,16 @@ class Tiktok(GenericDropin): It's useful for capturing content that requires a login, like sensitive content. """ + # Regex pattern to match TikTok photo post URLs + PHOTO_URL_REGEX = r"https?://(?:www\.)?tiktok\.com/@[\w\.-]+/photo/\d+" TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}" def suitable(self, url, info_extractor) -> bool: """This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users. Return the 'suitable' method from the TikTokIE class.""" - return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE)) + return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE)) or ( + re.match(self.PHOTO_URL_REGEX, url) is not None + ) def extract_post(self, url: str, ie_instance): logger.debug("Using Tikwm API to attempt to download tiktok video") @@ -28,56 +33,91 @@ class Tiktok(GenericDropin): r = requests.get(endpoint) if r.status_code != 200: - raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:") + raise ValueError(f"Unexpected status code '{r.status_code}' from tikwm.com") try: json_response = r.json() except ValueError: - raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}") + raise ValueError("Failed to parse JSON response from tikwm.com") if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})): - raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}") + raise ValueError(f"Unable to download with tikwm.com: {repr(json_response)}") # tries to get the non-watermarked version first - video_url = api_data.pop("play", api_data.pop("wmplay", None)) - if not video_url: - raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}") - - api_data["video_url"] = video_url + play_url = api_data.pop("play", api_data.pop("wmplay", None)) + if play_url and "mime_type=audio" in play_url: + play_url = None + if play_url: + api_data["video_url"] = play_url return api_data def keys_to_clean(self, video_data: dict, info_extractor): - return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"] + return [ + "video_url", + "title", + "create_time", + "author", + "cover", + "origin_cover", + "ai_dynamic_cover", + "duration", + "size", + "wm_size", + "music", + "music_info", + "play_count", + "digg_count", + "comment_count", + "share_count", + "download_count", + "collect_count", + "anchors", + "anchors_extras", + "is_ad", + "commerce_info", + "commercial_video_info", + "item_comment_settings", + "mentioned_users", + ] # all of these will be added via api_data in a single metadata field vs individual ones in the generic extractor def create_metadata(self, post: dict, ie_instance, archiver, url): # prepare result, start by downloading video result = Metadata() - video_url = post.pop("video_url") - + is_success = False # get the cover if possible cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None))) if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)): result.add_media(Media(cover_downloaded)) - # get the video or fail - video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}") - if not video_downloaded: - logger.error("Failed to download video") - return False - video_media = Media(video_downloaded) - if duration := post.get("duration", None): - video_media.set("duration", duration) - result.add_media(video_media) + for image_url in post.pop("images", []): + if image_downloaded := archiver.download_from_url(image_url): + result.add_media(Media(image_downloaded)) + is_success = True # this is an images post and we got it/them + + # get the video if present, could be an image post + if video_url := post.pop("video_url", None): + video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}") + if not video_downloaded: + logger.error("Failed to download video") + return False + video_media = Media(video_downloaded) + if duration := post.pop("duration", None): + video_media.set("duration", duration) + result.add_media(video_media) + is_success = True # this is a video post and we got it # add remaining metadata - result.set_title(post.get("title", "")) + result.set_title(post.pop("title", "")) - if created_at := post.get("create_time", None): + if created_at := post.pop("create_time", None): result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc)) - if author := post.get("author", None): + if author := post.pop("author", None): result.set("author", author) - result.set("api_data", post) - + result.set("api_data", {k: v for k, v in post.items() if v}) + if is_success: + result.success("yt-dlp_TikTok") + else: + raise ValueError("Unable to download any media from TikTok post, possibly deleted or private.") return result diff --git a/tests/extractors/test_tiktok_tikwm_extractor.py b/tests/extractors/test_tiktok_tikwm_extractor.py index 81f29a5..681e591 100644 --- a/tests/extractors/test_tiktok_tikwm_extractor.py +++ b/tests/extractors/test_tiktok_tikwm_extractor.py @@ -55,6 +55,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase): ("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True), ("https://www.tiktok.com/t/ZP8YQ8e5j/", True), ("https://vt.tiktok.com/ZSMTJeqRP/", True), + ("https://tiktok.com/@user/photo/123?lang=en", True), ], ) def test_is_suitable(self, url, is_suitable, tiktok_dropin): @@ -68,10 +69,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase): mock_get.assert_called_once() mock_get.return_value.json.assert_called_once() # first message is just the 'Skipping using ytdlp to download files for TikTok' message - assert ( - "failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'" - in caplog.text - ) + assert "Failed to parse JSON response from tikwm.com" in caplog.text mock_get.return_value.json.side_effect = Exception with caplog.at_level("ERROR"): @@ -79,10 +77,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase): mock_get.assert_called() assert mock_get.call_count == 2 assert mock_get.return_value.json.call_count == 2 - assert ( - "failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'" - in caplog.text - ) + assert "Failed to parse JSON response from tikwm.com" in caplog.text @pytest.mark.parametrize( "response", @@ -98,27 +93,30 @@ class TestTiktokTikwmExtractor(TestExtractorBase): assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False mock_get.assert_called_once() mock_get.return_value.json.assert_called_once() - assert "failed to get a valid response from tikwm.com" in caplog.text + assert "Unable to download with tikwm.com: " in caplog.text @pytest.mark.parametrize( - "response,has_vid", + "response,is_success", [ - ({"data": {"id": 123}}, False), - ({"data": {"wmplay": "url"}}, True), - ({"data": {"play": "url"}}, True), + ({"data": {"id": 123, "images": []}}, False), + ({"data": {"wmplay": "url", "images": ["img1.jpg"]}}, True), + ({"data": {"play": "url", "images": ["img1.jpg"]}}, True), + ({"data": {"images": ["img1.jpg"]}}, True), ], ) - def test_correct_extraction(self, mock_get, make_item, response, has_vid, mocker): + def test_correct_extraction(self, mock_get, make_item, response, is_success, mocker): + data = {k: v for k, v in response.get("data", {}).items()} mock_get.return_value.status_code = 200 mock_get.return_value.json.return_value = {"msg": "success", **response} result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) - if not has_vid: - assert result is False - else: + total_media = len(data.get("images", [])) + (1 if data.get("wmplay", data.get("play")) else 0) + if is_success: assert result.is_success() - assert len(result.media) == 1 + assert len(result.media) == total_media + else: + assert result is False mock_get.assert_called() - assert mock_get.call_count == 1 + int(has_vid) + assert mock_get.call_count == 1 + total_media mock_get.return_value.json.assert_called_once() def test_correct_data_extracted(self, mock_get, make_item): @@ -142,7 +140,8 @@ class TestTiktokTikwmExtractor(TestExtractorBase): assert len(result.media) == 2 assert result.get_title() == "Title" assert result.get("author") == "Author" - assert result.get("api_data") == {"other": "data", "id": 123} + assert result.get("other") == "data" + assert result.get("api_data") == {"id": 123, "other": "data"} assert result.media[1].get("duration") == 60 assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc)