an improved approach for #295

This commit is contained in:
msramalho
2025-07-06 14:04:01 +01:00
parent 80d61e8b85
commit 9d3cd5774b
4 changed files with 96 additions and 48 deletions

View File

@@ -97,7 +97,10 @@ class AntibotExtractorEnricher(Extractor, Enricher):
sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future
dropin = self._get_suitable_dropin(url, sb) dropin = self._get_suitable_dropin(url, sb)
dropin.open_page(url) if not dropin.open_page(url):
# TODO: could we detect deleted videos?
logger.warning("Failed to open drop-in page")
return False
if self.detect_auth_wall and (dropin.hit_auth_wall() and self._hit_auth_wall(sb)): if self.detect_auth_wall and (dropin.hit_auth_wall() and self._hit_auth_wall(sb)):
logger.warning("Skipping since auth wall or CAPTCHA was detected") logger.warning("Skipping since auth wall or CAPTCHA was detected")

View File

@@ -1,17 +1,20 @@
from contextlib import suppress from contextlib import suppress
from typing import Mapping from typing import Mapping
from auto_archiver.utils.custom_logger import logger
from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
class TikTokDropin(Dropin): class TikTokDropin(Dropin):
""" """
A class to handle TikTok drop-in functionality for the antibot extractor enricher module. A class to handle TikTok drop-in functionality for the antibot extractor enricher module.
""" """
def documentation() -> Mapping[str, str]:
    """Describe this drop-in: its display name, a usage note, and the site it targets."""
    # The description is assembled from adjacent literals purely for readability;
    # the resulting string is a single value with one embedded newline.
    description = (
        "Handles TikTok posts and works without authentication.\n"
        "NOTE: This dropin is highly susceptible to TikTok's bot detection mechanisms "
        "and may not work reliably if you reuse the same IP. "
        "The GenericExtractor is recommended for TikTok posts, "
        "as it handles video/image download more reliable. "
        "In the future we plan to implement better anti captcha measures for this dropin."
    )
    info = {"name": "TikTok Dropin"}
    info["description"] = description
    info["site"] = "tiktok.com"
    return info
@@ -33,6 +36,9 @@ class TikTokDropin(Dropin):
# TODO: implement login logic # TODO: implement login logic
if url != self.sb.get_current_url(): if url != self.sb.get_current_url():
return False return False
if self.sb.is_text_visible("Video currently unavailable"):
logger.debug("Video may have been removed or is private.")
return False
return True return True
def hit_auth_wall(self) -> bool: def hit_auth_wall(self) -> bool:

View File

@@ -1,3 +1,4 @@
import re
import requests import requests
from auto_archiver.utils.custom_logger import logger from auto_archiver.utils.custom_logger import logger
@@ -14,12 +15,16 @@ class Tiktok(GenericDropin):
It's useful for capturing content that requires a login, like sensitive content. It's useful for capturing content that requires a login, like sensitive content.
""" """
# Regex pattern to match TikTok photo post URLs
PHOTO_URL_REGEX = r"https?://(?:www\.)?tiktok\.com/@[\w\.-]+/photo/\d+"
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}" TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def suitable(self, url, info_extractor) -> bool:
    """Report whether this Tikwm-backed dropin should handle ``url``.

    A URL is accepted when any of the TikTokIE, TikTokLiveIE, TikTokVMIE or
    TikTokUserIE extractor classes reports it as suitable, or, failing that,
    when it matches ``PHOTO_URL_REGEX`` (photo post URLs).

    ``info_extractor`` is not used by this check.
    """
    # Ask each extractor class in turn and stop at the first one that accepts the URL.
    for extractor_cls in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE):
        if extractor_cls().suitable(url):
            return True

    # None of the extractors claimed the URL: fall back to the photo-post pattern.
    photo_match = re.match(self.PHOTO_URL_REGEX, url)
    return photo_match is not None
def extract_post(self, url: str, ie_instance): def extract_post(self, url: str, ie_instance):
logger.debug("Using Tikwm API to attempt to download tiktok video") logger.debug("Using Tikwm API to attempt to download tiktok video")
@@ -28,56 +33,91 @@ class Tiktok(GenericDropin):
r = requests.get(endpoint) r = requests.get(endpoint)
if r.status_code != 200: if r.status_code != 200:
raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:") raise ValueError(f"Unexpected status code '{r.status_code}' from tikwm.com")
try: try:
json_response = r.json() json_response = r.json()
except ValueError: except ValueError:
raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}") raise ValueError("Failed to parse JSON response from tikwm.com")
if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})): if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})):
raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}") raise ValueError(f"Unable to download with tikwm.com: {repr(json_response)}")
# tries to get the non-watermarked version first # tries to get the non-watermarked version first
video_url = api_data.pop("play", api_data.pop("wmplay", None)) play_url = api_data.pop("play", api_data.pop("wmplay", None))
if not video_url: if play_url and "mime_type=audio" in play_url:
raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}") play_url = None
if play_url:
api_data["video_url"] = video_url api_data["video_url"] = play_url
return api_data return api_data
def keys_to_clean(self, video_data: dict, info_extractor):
    """Return the names of the post keys that should be cleaned.

    All of these will be added via api_data in a single metadata field,
    versus individual ones in the generic extractor. Neither ``video_data``
    nor ``info_extractor`` influences the result; a new list is built on
    every call.
    """
    # Grouped only for readability; the concatenation order below is the
    # order of the returned list.
    core_keys = ("video_url", "title", "create_time", "author")
    cover_keys = ("cover", "origin_cover", "ai_dynamic_cover")
    file_keys = ("duration", "size", "wm_size")
    music_keys = ("music", "music_info")
    counter_keys = (
        "play_count",
        "digg_count",
        "comment_count",
        "share_count",
        "download_count",
        "collect_count",
    )
    misc_keys = (
        "anchors",
        "anchors_extras",
        "is_ad",
        "commerce_info",
        "commercial_video_info",
        "item_comment_settings",
        "mentioned_users",
    )
    return [*core_keys, *cover_keys, *file_keys, *music_keys, *counter_keys, *misc_keys]
def create_metadata(self, post: dict, ie_instance, archiver, url):
    """Build a Metadata result for a post, downloading its cover, images and video.

    Keys consumed here are popped from ``post``; whatever is left afterwards
    is stored under ``"api_data"`` with falsy values dropped.

    Args:
        post: Post data dict; it is mutated (keys are popped) by this call.
        ie_instance: Not used by this method.
        archiver: Object whose ``download_from_url`` is called to fetch media.
        url: Not used by this method.

    Returns:
        The populated Metadata marked as successful, or ``False`` when a
        video URL was present but its download failed.

    Raises:
        ValueError: If neither an image nor a video could be downloaded.
    """
    result = Metadata()
    is_success = False

    # Cover is best-effort and does not count towards success. Pop all three
    # keys so none of them ends up in api_data, then use the first non-empty
    # value: dict.pop only falls back to its default when the key is missing,
    # so a present-but-empty "origin_cover" must not hide a usable "cover".
    cover_candidates = [post.pop(key, None) for key in ("origin_cover", "cover", "ai_dynamic_cover")]
    cover_url = next((candidate for candidate in cover_candidates if candidate), None)
    if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
        result.add_media(Media(cover_downloaded))

    # "images" can be present with a null value; `or []` keeps the loop from
    # raising TypeError in that case.
    for image_url in post.pop("images", None) or []:
        if image_downloaded := archiver.download_from_url(image_url):
            result.add_media(Media(image_downloaded))
            is_success = True  # this is an images post and we got it/them

    # get the video if present, could be an image post
    if video_url := post.pop("video_url", None):
        video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
        if not video_downloaded:
            logger.error("Failed to download video")
            return False
        video_media = Media(video_downloaded)
        if duration := post.pop("duration", None):
            video_media.set("duration", duration)
        result.add_media(video_media)
        is_success = True  # this is a video post and we got it

    # add remaining metadata
    result.set_title(post.pop("title", ""))
    if created_at := post.pop("create_time", None):
        result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
    if author := post.pop("author", None):
        result.set("author", author)
    result.set("api_data", {k: v for k, v in post.items() if v})

    if not is_success:
        raise ValueError("Unable to download any media from TikTok post, possibly deleted or private.")
    result.success("yt-dlp_TikTok")
    return result

View File

@@ -55,6 +55,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True), ("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
("https://www.tiktok.com/t/ZP8YQ8e5j/", True), ("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
("https://vt.tiktok.com/ZSMTJeqRP/", True), ("https://vt.tiktok.com/ZSMTJeqRP/", True),
("https://tiktok.com/@user/photo/123?lang=en", True),
], ],
) )
def test_is_suitable(self, url, is_suitable, tiktok_dropin): def test_is_suitable(self, url, is_suitable, tiktok_dropin):
@@ -68,10 +69,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called_once() mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once() mock_get.return_value.json.assert_called_once()
# first message is just the 'Skipping using ytdlp to download files for TikTok' message # first message is just the 'Skipping using ytdlp to download files for TikTok' message
assert ( assert "Failed to parse JSON response from tikwm.com" in caplog.text
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
mock_get.return_value.json.side_effect = Exception mock_get.return_value.json.side_effect = Exception
with caplog.at_level("ERROR"): with caplog.at_level("ERROR"):
@@ -79,10 +77,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called() mock_get.assert_called()
assert mock_get.call_count == 2 assert mock_get.call_count == 2
assert mock_get.return_value.json.call_count == 2 assert mock_get.return_value.json.call_count == 2
assert ( assert "Failed to parse JSON response from tikwm.com" in caplog.text
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"response", "response",
@@ -98,27 +93,30 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
mock_get.assert_called_once() mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once() mock_get.return_value.json.assert_called_once()
assert "failed to get a valid response from tikwm.com" in caplog.text assert "Unable to download with tikwm.com: " in caplog.text
@pytest.mark.parametrize(
    "response,is_success",
    [
        ({"data": {"id": 123, "images": []}}, False),
        ({"data": {"wmplay": "url", "images": ["img1.jpg"]}}, True),
        ({"data": {"play": "url", "images": ["img1.jpg"]}}, True),
        ({"data": {"images": ["img1.jpg"]}}, True),
    ],
)
def test_correct_extraction(self, mock_get, make_item, response, is_success, mocker):
    """Check the download outcome and request count for each mocked API payload.

    A payload is expected to succeed when it carries at least one image or a
    video URL ("wmplay" or "play"); otherwise ``download`` must return False.
    """
    # Take a shallow copy before the download runs so the expectations are
    # computed from the payload as supplied (NOTE(review): presumably the
    # extractor pops keys from the response data - confirm).
    payload = dict(response.get("data", {}))
    has_video = bool(payload.get("wmplay", payload.get("play")))
    expected_media = len(payload.get("images", [])) + int(has_video)

    api_response = mock_get.return_value
    api_response.status_code = 200
    api_response.json.return_value = {"msg": "success", **response}

    result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))

    if not is_success:
        assert result is False
    else:
        assert result.is_success()
        assert len(result.media) == expected_media

    # One request for the API itself plus one per media item fetched.
    mock_get.assert_called()
    assert mock_get.call_count == 1 + expected_media
    api_response.json.assert_called_once()
def test_correct_data_extracted(self, mock_get, make_item): def test_correct_data_extracted(self, mock_get, make_item):
@@ -142,7 +140,8 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert len(result.media) == 2 assert len(result.media) == 2
assert result.get_title() == "Title" assert result.get_title() == "Title"
assert result.get("author") == "Author" assert result.get("author") == "Author"
assert result.get("api_data") == {"other": "data", "id": 123} assert result.get("other") == "data"
assert result.get("api_data") == {"id": 123, "other": "data"}
assert result.media[1].get("duration") == 60 assert result.media[1].get("duration") == 60
assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc) assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc)