Files
auto-archiver/src/auto_archiver/modules/generic_extractor/tiktok.py
msramalho 55d9ffaacd typo
2025-06-17 18:51:21 +01:00

84 lines
3.3 KiB
Python

import requests
from loguru import logger
from yt_dlp.extractor.tiktok import TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE
from auto_archiver.core import Metadata, Media
from datetime import datetime, timezone
from .dropin import GenericDropin
class Tiktok(GenericDropin):
"""
TikTok dropin for the Generic Extractor that uses an unofficial API if/when ytdlp fails.
It's useful for capturing content that requires a login, like sensitive content.
"""
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def suitable(self, url, info_extractor) -> bool:
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
Return the 'suitable' method from the TikTokIE class."""
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
def extract_post(self, url: str, ie_instance):
logger.debug(f"Using Tikwm API to attempt to download tiktok video from {url=}")
endpoint = self.TIKWM_ENDPOINT.format(url=url)
r = requests.get(endpoint)
if r.status_code != 200:
raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
try:
json_response = r.json()
except ValueError:
raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}")
if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})):
raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}")
# tries to get the non-watermarked version first
video_url = api_data.pop("play", api_data.pop("wmplay", None))
if not video_url:
raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}")
api_data["video_url"] = video_url
return api_data
def keys_to_clean(self, video_data: dict, info_extractor):
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
def create_metadata(self, post: dict, ie_instance, archiver, url):
# prepare result, start by downloading video
result = Metadata()
video_url = post.pop("video_url")
# get the cover if possible
cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None)))
if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
result.add_media(Media(cover_downloaded))
# get the video or fail
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
if not video_downloaded:
logger.error(f"failed to download video from {video_url}")
return False
video_media = Media(video_downloaded)
if duration := post.get("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
# add remaining metadata
result.set_title(post.get("title", ""))
if created_at := post.get("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if author := post.get("author", None):
result.set("author", author)
result.set("api_data", post)
return result