mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-12 21:28:29 +03:00
Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97d133ce79 | ||
|
|
432ee3dcfd | ||
|
|
794b4f6052 | ||
|
|
965d7d41dd | ||
|
|
e73faa70cc | ||
|
|
80beab9f23 | ||
|
|
200cea4e12 | ||
|
|
1256fde159 | ||
|
|
65e222e177 | ||
|
|
f2eb9ef784 | ||
|
|
2081c16555 | ||
|
|
d3efd7121c | ||
|
|
9d3cd5774b | ||
|
|
80d61e8b85 | ||
|
|
d36cdbfa87 | ||
|
|
c1506ee1cf | ||
|
|
3a34a49822 | ||
|
|
37c6d97275 | ||
|
|
7234eda85f | ||
|
|
a8c1ef3912 | ||
|
|
52ed8196a5 | ||
|
|
2051e8e491 | ||
|
|
21255db86a | ||
|
|
eae0da08b3 | ||
|
|
0d1447117c |
@@ -21,7 +21,7 @@ This allows you to run the auto-archiver without the `poetry run` prefix.
|
|||||||
### Optional Development Packages
|
### Optional Development Packages
|
||||||
|
|
||||||
Install development packages (used for unit tests etc.) using:
|
Install development packages (used for unit tests etc.) using:
|
||||||
`poetry install -with dev`
|
`poetry install --with dev`
|
||||||
|
|
||||||
|
|
||||||
```{toctree}
|
```{toctree}
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ Note not all warnings can be fixed automatically.
|
|||||||
|
|
||||||
Most fixes are safe, but some non-standard practices such as dynamic loading are not picked up by linters. Ensure you check any modifications by this before committing them.
|
Most fixes are safe, but some non-standard practices such as dynamic loading are not picked up by linters. Ensure you check any modifications by this before committing them.
|
||||||
```shell
|
```shell
|
||||||
make ruff-fix
|
make ruff-clean
|
||||||
```
|
```
|
||||||
|
|
||||||
**Changing Configurations ⚙️**
|
**Changing Configurations ⚙️**
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
|
|
||||||
## Running Tests
|
## Running Tests
|
||||||
|
|
||||||
1. Make sure you've installed the dev dependencies with `pytest install --with dev`
|
1. Make sure you've installed the dev dependencies with `poetry install --with dev`
|
||||||
2. Tests can be run as follows:
|
2. Tests can be run as follows:
|
||||||
```{code} bash
|
```{code} bash
|
||||||
#### Command prefix of 'poetry run' removed here for simplicity
|
#### Command prefix of 'poetry run' removed here for simplicity
|
||||||
@@ -26,7 +26,7 @@ pytest -ra -v tests/test_file.py
|
|||||||
pytest -ra -v tests/test_file.py::test_function_name
|
pytest -ra -v tests/test_file.py::test_function_name
|
||||||
```
|
```
|
||||||
|
|
||||||
3. Some tests require environment variables to be set. You can use the example `.env.test.example` file as a template. Copy it to `.env.test` and fill in the required values. This file will be loaded automatically by `pytest`.
|
3. Some tests require environment variables to be set. You can use the example `tests/.env.test.example` file as a template. Copy it to `tests/.env.test` and fill in the required values. This file will be loaded automatically by `pytest`.
|
||||||
```{code} bash
|
```{code} bash
|
||||||
cp .env.test.example .env.test
|
cp tests/.env.test.example tests/.env.test
|
||||||
```
|
```
|
||||||
@@ -4,8 +4,9 @@ Extractor modules are used to extract the content of a given URL. Typically, one
|
|||||||
|
|
||||||
Extractors that are able to extract content from a wide range of websites include:
|
Extractors that are able to extract content from a wide range of websites include:
|
||||||
1. Generic Extractor: parses videos and images on sites using the powerful yt-dlp library.
|
1. Generic Extractor: parses videos and images on sites using the powerful yt-dlp library.
|
||||||
2. Wayback Machine Extractor: sends pages to the Wayback machine for archiving, and stores the link.
|
2. Antibot Extractor: uses a headless browser to bypass bot detection and extract content.
|
||||||
3. WACZ Extractor: runs a web browser to 'browse' the URL and save a copy of the page in WACZ format.
|
3. WACZ Extractor: runs a web browser to 'browse' the URL and save a copy of the page in WACZ format.
|
||||||
|
4. Wayback Machine Extractor: sends pages to the Wayback machine for archiving, and stores the archived link.
|
||||||
|
|
||||||
```{include} autogen/extractor.md
|
```{include} autogen/extractor.md
|
||||||
```
|
```
|
||||||
|
|||||||
1998
poetry.lock
generated
1998
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "auto-archiver"
|
name = "auto-archiver"
|
||||||
version = "1.1.1"
|
version = "1.1.4"
|
||||||
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
||||||
|
|
||||||
requires-python = ">=3.10,<3.13"
|
requires-python = ">=3.10,<3.13"
|
||||||
@@ -58,6 +58,7 @@ dependencies = [
|
|||||||
"secretstorage (>=3.3.3,<4.0.0)",
|
"secretstorage (>=3.3.3,<4.0.0)",
|
||||||
"seleniumbase (>=4.36.4,<5.0.0)",
|
"seleniumbase (>=4.36.4,<5.0.0)",
|
||||||
"pyautogui (>=0.9.54,<0.10.0)",
|
"pyautogui (>=0.9.54,<0.10.0)",
|
||||||
|
"pyperclip (==1.8.2)",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
|
|||||||
@@ -81,6 +81,9 @@ class AntibotExtractorEnricher(Extractor, Enricher):
|
|||||||
os.makedirs(self.user_data_dir, exist_ok=True)
|
os.makedirs(self.user_data_dir, exist_ok=True)
|
||||||
|
|
||||||
def enrich(self, to_enrich: Metadata, custom_data_dir: bool = True) -> bool:
|
def enrich(self, to_enrich: Metadata, custom_data_dir: bool = True) -> bool:
|
||||||
|
if to_enrich.get_media_by_id("html_source_code"):
|
||||||
|
logger.info("Antibot has already been executed, skipping.")
|
||||||
|
return True
|
||||||
using_user_data_dir = self.user_data_dir if custom_data_dir else None
|
using_user_data_dir = self.user_data_dir if custom_data_dir else None
|
||||||
url = to_enrich.get_url()
|
url = to_enrich.get_url()
|
||||||
|
|
||||||
@@ -94,9 +97,12 @@ class AntibotExtractorEnricher(Extractor, Enricher):
|
|||||||
sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future
|
sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future
|
||||||
|
|
||||||
dropin = self._get_suitable_dropin(url, sb)
|
dropin = self._get_suitable_dropin(url, sb)
|
||||||
dropin.open_page(url)
|
if not dropin.open_page(url):
|
||||||
|
# TODO: could we detect deleted videos?
|
||||||
|
logger.warning("Failed to open drop-in page")
|
||||||
|
return False
|
||||||
|
|
||||||
if self.detect_auth_wall and self._hit_auth_wall(sb):
|
if self.detect_auth_wall and (dropin.hit_auth_wall() and self._hit_auth_wall(sb)):
|
||||||
logger.warning("Skipping since auth wall or CAPTCHA was detected")
|
logger.warning("Skipping since auth wall or CAPTCHA was detected")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -274,8 +280,14 @@ class AntibotExtractorEnricher(Extractor, Enricher):
|
|||||||
return
|
return
|
||||||
url = to_enrich.get_url()
|
url = to_enrich.get_url()
|
||||||
all_urls = set()
|
all_urls = set()
|
||||||
|
logger.debug(f"Extracting media for {js_css_selector=}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
sources = sb.execute_script(js_css_selector)
|
||||||
|
except selenium.common.exceptions.JavascriptException as e:
|
||||||
|
logger.error(f"Error executing JavaScript selector {js_css_selector}: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
sources = sb.execute_script(js_css_selector)
|
|
||||||
# js_for_css_selectors
|
# js_for_css_selectors
|
||||||
for src in sources:
|
for src in sources:
|
||||||
if len(all_urls) >= max_media:
|
if len(all_urls) >= max_media:
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Mapping
|
from typing import Mapping
|
||||||
@@ -74,8 +75,11 @@ class Dropin:
|
|||||||
|
|
||||||
You can overwrite this instead of `images_selector` for more control over scraped images.
|
You can overwrite this instead of `images_selector` for more control over scraped images.
|
||||||
"""
|
"""
|
||||||
|
if not self.images_selectors():
|
||||||
|
return "return [];"
|
||||||
|
safe_selector = json.dumps(self.images_selectors())
|
||||||
return f"""
|
return f"""
|
||||||
return Array.from(document.querySelectorAll("{self.images_selectors()}")).map(el => el.src || el.href).filter(Boolean);
|
return Array.from(document.querySelectorAll({safe_selector})).map(el => el.src || el.href).filter(Boolean);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def js_for_video_css_selectors(self) -> str:
|
def js_for_video_css_selectors(self) -> str:
|
||||||
@@ -84,8 +88,11 @@ class Dropin:
|
|||||||
|
|
||||||
You can overwrite this instead of `video_selector` for more control over scraped videos.
|
You can overwrite this instead of `video_selector` for more control over scraped videos.
|
||||||
"""
|
"""
|
||||||
|
if not self.video_selectors():
|
||||||
|
return "return [];"
|
||||||
|
safe_selector = json.dumps(self.video_selectors())
|
||||||
return f"""
|
return f"""
|
||||||
return Array.from(document.querySelectorAll("{self.video_selectors()}")).map(el => el.src || el.href).filter(Boolean);
|
return Array.from(document.querySelectorAll({safe_selector})).map(el => el.src || el.href).filter(Boolean);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def open_page(self, url) -> bool:
|
def open_page(self, url) -> bool:
|
||||||
@@ -103,6 +110,12 @@ class Dropin:
|
|||||||
"""
|
"""
|
||||||
return 0, 0
|
return 0, 0
|
||||||
|
|
||||||
|
def hit_auth_wall(self) -> bool:
|
||||||
|
"""
|
||||||
|
Custom check to see if the current page is behind an authentication wall, if True is returned the default global auth wall detector is used instead. If false, no auth wall is detected and the page is considered open.
|
||||||
|
"""
|
||||||
|
return True
|
||||||
|
|
||||||
def _get_username_password(self, site) -> tuple[str, str]:
|
def _get_username_password(self, site) -> tuple[str, str]:
|
||||||
"""
|
"""
|
||||||
Get the username and password for the site from the extractor's auth data.
|
Get the username and password for the site from the extractor's auth data.
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
from contextlib import suppress
|
||||||
|
from typing import Mapping
|
||||||
|
|
||||||
|
from auto_archiver.utils.custom_logger import logger
|
||||||
|
from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
|
||||||
|
|
||||||
|
|
||||||
|
class TikTokDropin(Dropin):
|
||||||
|
"""
|
||||||
|
A class to handle TikTok drop-in functionality for the antibot extractor enricher module.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def documentation() -> Mapping[str, str]:
|
||||||
|
return {
|
||||||
|
"name": "TikTok Dropin",
|
||||||
|
"description": "Handles TikTok posts and works without authentication.\nNOTE: This dropin is highly susceptible to TikTok's bot detection mechanisms and may not work reliably if you reuse the same IP. The GenericExtractor is recommended for TikTok posts, as it handles video/image download more reliable. In the future we plan to implement better anti captcha measures for this dropin.",
|
||||||
|
"site": "tiktok.com",
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def suitable(url: str) -> bool:
|
||||||
|
return "tiktok.com" in url
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def images_selectors() -> str:
|
||||||
|
return '[data-e2e="detail-photo"] img'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def video_selectors() -> str:
|
||||||
|
return None # TikTok videos should be handled by the generic extractor
|
||||||
|
|
||||||
|
def open_page(self, url) -> bool:
|
||||||
|
self.sb.wait_for_ready_state_complete()
|
||||||
|
self._close_cookies_banner()
|
||||||
|
# TODO: implement login logic
|
||||||
|
if url != self.sb.get_current_url():
|
||||||
|
return False
|
||||||
|
if self.sb.is_text_visible("Video currently unavailable"):
|
||||||
|
logger.debug("Video may have been removed or is private.")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def hit_auth_wall(self) -> bool:
|
||||||
|
return False # TikTok does not require authentication for public posts
|
||||||
|
|
||||||
|
def _close_cookies_banner(self):
|
||||||
|
with suppress(Exception): # selenium.common.exceptions.JavascriptException
|
||||||
|
self.sb.execute_script("""
|
||||||
|
document
|
||||||
|
.querySelector("tiktok-cookie-banner")
|
||||||
|
.shadowRoot.querySelector("faceplate-dialog")
|
||||||
|
.querySelector("button")
|
||||||
|
.click()
|
||||||
|
""")
|
||||||
|
self.sb.click_if_visible("Skip")
|
||||||
@@ -60,6 +60,10 @@ If you are having issues with the extractor, you can review the version of `yt-d
|
|||||||
"default": "",
|
"default": "",
|
||||||
"help": "http/https/socks proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port",
|
"help": "http/https/socks proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port",
|
||||||
},
|
},
|
||||||
|
"proxy_on_failure_only": {
|
||||||
|
"default": True,
|
||||||
|
"help": "Applies only if a proxy is set. In that case if this setting is True, the extractor will only use the proxy if the initial request fails; if it is False, the extractor will always use the proxy.",
|
||||||
|
},
|
||||||
"end_means_success": {
|
"end_means_success": {
|
||||||
"default": True,
|
"default": True,
|
||||||
"help": "if True, any archived content will mean a 'success', if False this extractor will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent extractors can retrieve.",
|
"help": "if True, any archived content will mean a 'success', if False this extractor will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent extractors can retrieve.",
|
||||||
|
|||||||
@@ -307,7 +307,7 @@ class GenericExtractor(Extractor):
|
|||||||
if "description" in video_data and not result.get("content"):
|
if "description" in video_data and not result.get("content"):
|
||||||
result.set_content(video_data.get("description"))
|
result.set_content(video_data.get("description"))
|
||||||
# extract comments if enabled
|
# extract comments if enabled
|
||||||
if self.comments and video_data.get("comments", []) is not None:
|
if self.comments and video_data.get("comments", None) is not None:
|
||||||
result.set(
|
result.set(
|
||||||
"comments",
|
"comments",
|
||||||
[
|
[
|
||||||
@@ -502,6 +502,9 @@ class GenericExtractor(Extractor):
|
|||||||
try:
|
try:
|
||||||
result = self.get_metadata_for_post(info_extractor, url, ydl)
|
result = self.get_metadata_for_post(info_extractor, url, ydl)
|
||||||
except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e:
|
except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e:
|
||||||
|
if "NSFW tweet requires authentication." in str(post_e):
|
||||||
|
logger.warning(str(post_e))
|
||||||
|
return False
|
||||||
logger.error("Error downloading metadata for post: {error}", error=str(post_e))
|
logger.error("Error downloading metadata for post: {error}", error=str(post_e))
|
||||||
return False
|
return False
|
||||||
except Exception as generic_e:
|
except Exception as generic_e:
|
||||||
@@ -513,7 +516,7 @@ class GenericExtractor(Extractor):
|
|||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if result:
|
if result and not result.is_success():
|
||||||
extractor_name = "yt-dlp"
|
extractor_name = "yt-dlp"
|
||||||
if info_extractor:
|
if info_extractor:
|
||||||
extractor_name += f"_{info_extractor.ie_key()}"
|
extractor_name += f"_{info_extractor.ie_key()}"
|
||||||
@@ -525,7 +528,7 @@ class GenericExtractor(Extractor):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def download(self, item: Metadata) -> Metadata:
|
def download(self, item: Metadata, skip_proxy: bool = False) -> Metadata:
|
||||||
url = item.get_url()
|
url = item.get_url()
|
||||||
|
|
||||||
# TODO: this is a temporary hack until this issue is closed: https://github.com/yt-dlp/yt-dlp/issues/11025
|
# TODO: this is a temporary hack until this issue is closed: https://github.com/yt-dlp/yt-dlp/issues/11025
|
||||||
@@ -533,6 +536,16 @@ class GenericExtractor(Extractor):
|
|||||||
url = url.replace("https://ya.ru", "https://yandex.ru")
|
url = url.replace("https://ya.ru", "https://yandex.ru")
|
||||||
item.set("replaced_url", url)
|
item.set("replaced_url", url)
|
||||||
|
|
||||||
|
# proxy_on_failure_only logic
|
||||||
|
if self.proxy and self.proxy_on_failure_only and not skip_proxy:
|
||||||
|
# when proxy_on_failure_only is True, we first try to download without a proxy and only continue with execution if that fails
|
||||||
|
try:
|
||||||
|
if without_proxy := self.download(item, skip_proxy=True):
|
||||||
|
logger.info("Downloaded successfully without proxy.")
|
||||||
|
return without_proxy
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Download without proxy failed, trying with proxy...")
|
||||||
|
|
||||||
ydl_options = [
|
ydl_options = [
|
||||||
"-o",
|
"-o",
|
||||||
os.path.join(self.tmp_dir, "%(id)s.%(ext)s"),
|
os.path.join(self.tmp_dir, "%(id)s.%(ext)s"),
|
||||||
@@ -546,7 +559,7 @@ class GenericExtractor(Extractor):
|
|||||||
]
|
]
|
||||||
|
|
||||||
# proxy handling
|
# proxy handling
|
||||||
if self.proxy:
|
if self.proxy and not skip_proxy:
|
||||||
ydl_options.extend(["--proxy", self.proxy])
|
ydl_options.extend(["--proxy", self.proxy])
|
||||||
|
|
||||||
# max_downloads handling
|
# max_downloads handling
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import re
|
||||||
import requests
|
import requests
|
||||||
from auto_archiver.utils.custom_logger import logger
|
from auto_archiver.utils.custom_logger import logger
|
||||||
|
|
||||||
@@ -14,12 +15,16 @@ class Tiktok(GenericDropin):
|
|||||||
It's useful for capturing content that requires a login, like sensitive content.
|
It's useful for capturing content that requires a login, like sensitive content.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Regex pattern to match TikTok photo post URLs
|
||||||
|
PHOTO_URL_REGEX = r"https?://(?:www\.)?tiktok\.com/@[\w\.-]+/photo/\d+"
|
||||||
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
|
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
|
||||||
|
|
||||||
def suitable(self, url, info_extractor) -> bool:
|
def suitable(self, url, info_extractor) -> bool:
|
||||||
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
|
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
|
||||||
Return the 'suitable' method from the TikTokIE class."""
|
Return the 'suitable' method from the TikTokIE class."""
|
||||||
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
|
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE)) or (
|
||||||
|
re.match(self.PHOTO_URL_REGEX, url) is not None
|
||||||
|
)
|
||||||
|
|
||||||
def extract_post(self, url: str, ie_instance):
|
def extract_post(self, url: str, ie_instance):
|
||||||
logger.debug("Using Tikwm API to attempt to download tiktok video")
|
logger.debug("Using Tikwm API to attempt to download tiktok video")
|
||||||
@@ -28,56 +33,91 @@ class Tiktok(GenericDropin):
|
|||||||
|
|
||||||
r = requests.get(endpoint)
|
r = requests.get(endpoint)
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
|
raise ValueError(f"Unexpected status code '{r.status_code}' from tikwm.com")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
json_response = r.json()
|
json_response = r.json()
|
||||||
except ValueError:
|
except ValueError:
|
||||||
raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}")
|
raise ValueError("Failed to parse JSON response from tikwm.com")
|
||||||
|
|
||||||
if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})):
|
if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})):
|
||||||
raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}")
|
raise ValueError(f"Unable to download with tikwm.com: {repr(json_response)}")
|
||||||
|
|
||||||
# tries to get the non-watermarked version first
|
# tries to get the non-watermarked version first
|
||||||
video_url = api_data.pop("play", api_data.pop("wmplay", None))
|
play_url = api_data.pop("play", api_data.pop("wmplay", None))
|
||||||
if not video_url:
|
if play_url and "mime_type=audio" in play_url:
|
||||||
raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}")
|
play_url = None
|
||||||
|
if play_url:
|
||||||
api_data["video_url"] = video_url
|
api_data["video_url"] = play_url
|
||||||
return api_data
|
return api_data
|
||||||
|
|
||||||
def keys_to_clean(self, video_data: dict, info_extractor):
|
def keys_to_clean(self, video_data: dict, info_extractor):
|
||||||
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
|
return [
|
||||||
|
"video_url",
|
||||||
|
"title",
|
||||||
|
"create_time",
|
||||||
|
"author",
|
||||||
|
"cover",
|
||||||
|
"origin_cover",
|
||||||
|
"ai_dynamic_cover",
|
||||||
|
"duration",
|
||||||
|
"size",
|
||||||
|
"wm_size",
|
||||||
|
"music",
|
||||||
|
"music_info",
|
||||||
|
"play_count",
|
||||||
|
"digg_count",
|
||||||
|
"comment_count",
|
||||||
|
"share_count",
|
||||||
|
"download_count",
|
||||||
|
"collect_count",
|
||||||
|
"anchors",
|
||||||
|
"anchors_extras",
|
||||||
|
"is_ad",
|
||||||
|
"commerce_info",
|
||||||
|
"commercial_video_info",
|
||||||
|
"item_comment_settings",
|
||||||
|
"mentioned_users",
|
||||||
|
] # all of these will be added via api_data in a single metadata field vs individual ones in the generic extractor
|
||||||
|
|
||||||
def create_metadata(self, post: dict, ie_instance, archiver, url):
|
def create_metadata(self, post: dict, ie_instance, archiver, url):
|
||||||
# prepare result, start by downloading video
|
# prepare result, start by downloading video
|
||||||
result = Metadata()
|
result = Metadata()
|
||||||
video_url = post.pop("video_url")
|
is_success = False
|
||||||
|
|
||||||
# get the cover if possible
|
# get the cover if possible
|
||||||
cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None)))
|
cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None)))
|
||||||
if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
|
if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
|
||||||
result.add_media(Media(cover_downloaded))
|
result.add_media(Media(cover_downloaded))
|
||||||
|
|
||||||
# get the video or fail
|
for image_url in post.pop("images", []):
|
||||||
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
|
if image_downloaded := archiver.download_from_url(image_url):
|
||||||
if not video_downloaded:
|
result.add_media(Media(image_downloaded))
|
||||||
logger.error("Failed to download video")
|
is_success = True # this is an images post and we got it/them
|
||||||
return False
|
|
||||||
video_media = Media(video_downloaded)
|
# get the video if present, could be an image post
|
||||||
if duration := post.get("duration", None):
|
if video_url := post.pop("video_url", None):
|
||||||
video_media.set("duration", duration)
|
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
|
||||||
result.add_media(video_media)
|
if not video_downloaded:
|
||||||
|
logger.error("Failed to download video")
|
||||||
|
return False
|
||||||
|
video_media = Media(video_downloaded)
|
||||||
|
if duration := post.pop("duration", None):
|
||||||
|
video_media.set("duration", duration)
|
||||||
|
result.add_media(video_media)
|
||||||
|
is_success = True # this is a video post and we got it
|
||||||
|
|
||||||
# add remaining metadata
|
# add remaining metadata
|
||||||
result.set_title(post.get("title", ""))
|
result.set_title(post.pop("title", ""))
|
||||||
|
|
||||||
if created_at := post.get("create_time", None):
|
if created_at := post.pop("create_time", None):
|
||||||
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
|
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
|
||||||
|
|
||||||
if author := post.get("author", None):
|
if author := post.pop("author", None):
|
||||||
result.set("author", author)
|
result.set("author", author)
|
||||||
|
|
||||||
result.set("api_data", post)
|
result.set("api_data", {k: v for k, v in post.items() if v})
|
||||||
|
if is_success:
|
||||||
|
result.success("yt-dlp_TikTok")
|
||||||
|
else:
|
||||||
|
raise ValueError("Unable to download any media from TikTok post, possibly deleted or private.")
|
||||||
return result
|
return result
|
||||||
|
|||||||
@@ -32,26 +32,37 @@ class GsheetsFeederDB(Feeder, Database):
|
|||||||
if not self.sheet and not self.sheet_id:
|
if not self.sheet and not self.sheet_id:
|
||||||
raise ValueError("You need to define either a 'sheet' name or a 'sheet_id' in your manifest.")
|
raise ValueError("You need to define either a 'sheet' name or a 'sheet_id' in your manifest.")
|
||||||
|
|
||||||
def open_sheet(self):
|
@retry(
|
||||||
|
wait_exponential_multiplier=1,
|
||||||
|
stop_max_attempt_number=6,
|
||||||
|
)
|
||||||
|
def open_sheet(self) -> gspread.Spreadsheet:
|
||||||
if self.sheet:
|
if self.sheet:
|
||||||
return self.gsheets_client.open(self.sheet)
|
return self.gsheets_client.open(self.sheet)
|
||||||
else:
|
else:
|
||||||
return self.gsheets_client.open_by_key(self.sheet_id)
|
return self.gsheets_client.open_by_key(self.sheet_id)
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
wait_exponential_multiplier=1,
|
||||||
|
stop_max_attempt_number=6,
|
||||||
|
)
|
||||||
|
def enumerate_sheets(self, sheet) -> Iterator[gspread.Worksheet]:
|
||||||
|
for worksheet in sheet.worksheets():
|
||||||
|
yield worksheet
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[Metadata]:
|
def __iter__(self) -> Iterator[Metadata]:
|
||||||
sh = self.open_sheet()
|
spreadsheet = self.open_sheet()
|
||||||
for ii, worksheet in enumerate(sh.worksheets()):
|
for worksheet in self.enumerate_sheets(spreadsheet):
|
||||||
if not self.should_process_sheet(worksheet.title):
|
with logger.contextualize(worksheet=f"{spreadsheet.title}:{worksheet.title}"):
|
||||||
logger.debug(f"Skipped worksheet '{worksheet.title}' due to allow/block rules")
|
if not self.should_process_sheet(worksheet.title):
|
||||||
continue
|
logger.debug("Skipped worksheet due to allow/block rules")
|
||||||
logger.info(f"Opening worksheet {ii=}: {worksheet.title=} header={self.header}")
|
continue
|
||||||
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
|
logger.info(f"Opening worksheet header={self.header}")
|
||||||
if len(missing_cols := self.missing_required_columns(gw)):
|
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
|
||||||
logger.debug(
|
if len(missing_cols := self.missing_required_columns(gw)):
|
||||||
f"Skipped worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}"
|
logger.debug(f"Skipped worksheet due to missing required column(s) for {missing_cols}")
|
||||||
)
|
continue
|
||||||
continue
|
|
||||||
with logger.contextualize(worksheet=f"{sh.title}:{worksheet.title}"):
|
|
||||||
# process and yield metadata here:
|
# process and yield metadata here:
|
||||||
yield from self._process_rows(gw)
|
yield from self._process_rows(gw)
|
||||||
logger.info(f"Finished worksheet {worksheet.title}")
|
logger.info(f"Finished worksheet {worksheet.title}")
|
||||||
|
|||||||
@@ -20,7 +20,7 @@
|
|||||||
# "http://tsa.sinpe.fi.cr/tsaHttp/", # self-signed
|
# "http://tsa.sinpe.fi.cr/tsaHttp/", # self-signed
|
||||||
# "http://tsa.cra.ge/signserver/tsa?workerName=qtsa", # self-signed
|
# "http://tsa.cra.ge/signserver/tsa?workerName=qtsa", # self-signed
|
||||||
"http://tss.cnbs.gob.hn/TSS/HttpTspServer",
|
"http://tss.cnbs.gob.hn/TSS/HttpTspServer",
|
||||||
"http://dss.nowina.lu/pki-factory/tsa/good-tsa",
|
# "http://dss.nowina.lu/pki-factory/tsa/good-tsa",
|
||||||
# "https://freetsa.org/tsr", # self-signed
|
# "https://freetsa.org/tsr", # self-signed
|
||||||
],
|
],
|
||||||
"help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.",
|
"help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.",
|
||||||
|
|||||||
@@ -4,12 +4,12 @@ from importlib.metadata import version
|
|||||||
import hashlib
|
import hashlib
|
||||||
|
|
||||||
from slugify import slugify
|
from slugify import slugify
|
||||||
|
from retrying import retry
|
||||||
import requests
|
import requests
|
||||||
from auto_archiver.utils.custom_logger import logger
|
from auto_archiver.utils.custom_logger import logger
|
||||||
|
|
||||||
from rfc3161_client import (decode_timestamp_response,TimestampRequestBuilder,TimeStampResponse, VerifierBuilder)
|
from rfc3161_client import (decode_timestamp_response, TimestampRequestBuilder, TimeStampResponse, VerifierBuilder)
|
||||||
from rfc3161_client import VerificationError as Rfc3161VerificationError
|
from rfc3161_client import VerificationError as Rfc3161VerificationError
|
||||||
from rfc3161_client.base import HashAlgorithm
|
|
||||||
from rfc3161_client.tsp import SignedData
|
from rfc3161_client.tsp import SignedData
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
from cryptography.hazmat.primitives import serialization
|
from cryptography.hazmat.primitives import serialization
|
||||||
@@ -60,7 +60,6 @@ class TimestampingEnricher(Enricher):
|
|||||||
logger.debug(f"No hashes found")
|
logger.debug(f"No hashes found")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
hashes_fn = os.path.join(self.tmp_dir, "hashes.txt")
|
hashes_fn = os.path.join(self.tmp_dir, "hashes.txt")
|
||||||
|
|
||||||
data_to_sign = "\n".join(hashes)
|
data_to_sign = "\n".join(hashes)
|
||||||
@@ -113,7 +112,7 @@ class TimestampingEnricher(Enricher):
|
|||||||
f.write(timestamp_token)
|
f.write(timestamp_token)
|
||||||
return tst_path
|
return tst_path
|
||||||
|
|
||||||
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
|
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
|
||||||
"""
|
"""
|
||||||
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
|
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
|
||||||
|
|
||||||
@@ -158,7 +157,6 @@ class TimestampingEnricher(Enricher):
|
|||||||
|
|
||||||
verifier = builder.build()
|
verifier = builder.build()
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
verifier.verify(timestamp_response, message_hash)
|
verifier.verify(timestamp_response, message_hash)
|
||||||
return certificate
|
return certificate
|
||||||
@@ -171,18 +169,33 @@ class TimestampingEnricher(Enricher):
|
|||||||
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
|
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
|
||||||
|
|
||||||
timestamp_request = (
|
timestamp_request = (
|
||||||
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
|
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
|
||||||
)
|
)
|
||||||
try:
|
|
||||||
|
@retry(
|
||||||
|
wait_exponential_multiplier=1,
|
||||||
|
stop_max_attempt_number=2,
|
||||||
|
)
|
||||||
|
def sign_with_retry():
|
||||||
response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10)
|
response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
return response
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = sign_with_retry()
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
logger.error(f"Error while sending request to {tsa_url=}: {e}")
|
logger.error(f"Error while sending request to {tsa_url=}: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
wait_exponential_multiplier=1,
|
||||||
|
stop_max_attempt_number=2,
|
||||||
|
)
|
||||||
|
def decode_with_retry(response):
|
||||||
|
return decode_timestamp_response(response.content)
|
||||||
# Check that we can parse the response but do not *verify* it
|
# Check that we can parse the response but do not *verify* it
|
||||||
try:
|
try:
|
||||||
timestamp_response = decode_timestamp_response(response.content)
|
timestamp_response = decode_with_retry(response)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
logger.error(f"Invalid timestamp response from server {tsa_url}: {e}")
|
logger.error(f"Invalid timestamp response from server {tsa_url}: {e}")
|
||||||
raise
|
raise
|
||||||
@@ -196,7 +209,7 @@ class TimestampingEnricher(Enricher):
|
|||||||
if len(certs) == 1:
|
if len(certs) == 1:
|
||||||
return certs
|
return certs
|
||||||
|
|
||||||
while(len(ordered_certs) < len(certs)):
|
while (len(ordered_certs) < len(certs)):
|
||||||
if len(ordered_certs) == 0:
|
if len(ordered_certs) == 0:
|
||||||
for cert in certs:
|
for cert in certs:
|
||||||
if not [c for c in certs if cert.subject == c.issuer]:
|
if not [c for c in certs if cert.subject == c.issuer]:
|
||||||
@@ -220,7 +233,7 @@ class TimestampingEnricher(Enricher):
|
|||||||
|
|
||||||
cert_chain = []
|
cert_chain = []
|
||||||
for i, cert in enumerate(certificates):
|
for i, cert in enumerate(certificates):
|
||||||
cert_fn = os.path.join(self.tmp_dir, f"{i+1} – {str(cert.serial_number)[:20]}.crt")
|
cert_fn = os.path.join(self.tmp_dir, f"{i + 1} – {str(cert.serial_number)[:20]}.crt")
|
||||||
with open(cert_fn, "wb") as f:
|
with open(cert_fn, "wb") as f:
|
||||||
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
|
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
|
||||||
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value))
|
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value))
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import json
|
|||||||
from auto_archiver.utils.custom_logger import logger
|
from auto_archiver.utils.custom_logger import logger
|
||||||
import time
|
import time
|
||||||
import requests
|
import requests
|
||||||
|
from urllib3.exceptions import MaxRetryError
|
||||||
from auto_archiver.core import Extractor, Enricher
|
from auto_archiver.core import Extractor, Enricher
|
||||||
from auto_archiver.utils import url as UrlUtil
|
from auto_archiver.utils import url as UrlUtil
|
||||||
from auto_archiver.core import Metadata
|
from auto_archiver.core import Metadata
|
||||||
@@ -45,7 +45,14 @@ class WaybackExtractorEnricher(Enricher, Extractor):
|
|||||||
if self.if_not_archived_within:
|
if self.if_not_archived_within:
|
||||||
post_data["if_not_archived_within"] = self.if_not_archived_within
|
post_data["if_not_archived_within"] = self.if_not_archived_within
|
||||||
# see https://docs.google.com/document/d/1Nsv52MvSjbLb2PCpHlat0gkzw0EvtSgpKHu4mk0MnrA for more options
|
# see https://docs.google.com/document/d/1Nsv52MvSjbLb2PCpHlat0gkzw0EvtSgpKHu4mk0MnrA for more options
|
||||||
r = requests.post("https://web.archive.org/save/", headers=ia_headers, data=post_data, proxies=proxies)
|
try:
|
||||||
|
r = requests.post("https://web.archive.org/save/", headers=ia_headers, data=post_data, proxies=proxies)
|
||||||
|
except MaxRetryError as e:
|
||||||
|
logger.warning(
|
||||||
|
f"MaxRetryError during Wayback POST call to /save, this may be do to a high number of calls leading to rate limiting: {e}"
|
||||||
|
)
|
||||||
|
to_enrich.set("wayback", "failed: possible rate limit")
|
||||||
|
return False
|
||||||
|
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
logger.error(em := f"Internet archive failed with status of {r.status_code}: {r.json()}")
|
logger.error(em := f"Internet archive failed with status of {r.status_code}: {r.json()}")
|
||||||
@@ -76,6 +83,9 @@ class WaybackExtractorEnricher(Enricher, Extractor):
|
|||||||
if r_status.status_code == 200 and r_json["status"] == "success":
|
if r_status.status_code == 200 and r_json["status"] == "success":
|
||||||
wayback_url = f"https://web.archive.org/web/{r_json['timestamp']}/{r_json['original_url']}"
|
wayback_url = f"https://web.archive.org/web/{r_json['timestamp']}/{r_json['original_url']}"
|
||||||
elif r_status.status_code != 200 or r_json["status"] != "pending":
|
elif r_status.status_code != 200 or r_json["status"] != "pending":
|
||||||
|
if r_json.get("status_ext") in ["error:blocked-url", "error:unauthorized"]:
|
||||||
|
logger.warning("Wayback cannot currently archive the URL, skipping.")
|
||||||
|
to_enrich.set("wayback", r_json.get("status_ext"))
|
||||||
logger.error(f"Wayback failed with {r_json}")
|
logger.error(f"Wayback failed with {r_json}")
|
||||||
return False
|
return False
|
||||||
except requests.exceptions.RequestException as e:
|
except requests.exceptions.RequestException as e:
|
||||||
|
|||||||
@@ -88,6 +88,13 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
|||||||
5,
|
5,
|
||||||
0,
|
0,
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"https://www.tiktok.com/@tracy_2424/photo/7418200173953830162",
|
||||||
|
"TikTok",
|
||||||
|
"Dito ko lang",
|
||||||
|
1,
|
||||||
|
0,
|
||||||
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_download_pages_with_media(self, setup_module, make_item, url, in_title, in_text, image_count, video_count):
|
def test_download_pages_with_media(self, setup_module, make_item, url, in_title, in_text, image_count, video_count):
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
|||||||
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
|
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
|
||||||
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
|
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
|
||||||
("https://vt.tiktok.com/ZSMTJeqRP/", True),
|
("https://vt.tiktok.com/ZSMTJeqRP/", True),
|
||||||
|
("https://tiktok.com/@user/photo/123?lang=en", True),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
|
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
|
||||||
@@ -68,10 +69,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
|||||||
mock_get.assert_called_once()
|
mock_get.assert_called_once()
|
||||||
mock_get.return_value.json.assert_called_once()
|
mock_get.return_value.json.assert_called_once()
|
||||||
# first message is just the 'Skipping using ytdlp to download files for TikTok' message
|
# first message is just the 'Skipping using ytdlp to download files for TikTok' message
|
||||||
assert (
|
assert "Failed to parse JSON response from tikwm.com" in caplog.text
|
||||||
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
|
|
||||||
in caplog.text
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_get.return_value.json.side_effect = Exception
|
mock_get.return_value.json.side_effect = Exception
|
||||||
with caplog.at_level("ERROR"):
|
with caplog.at_level("ERROR"):
|
||||||
@@ -79,10 +77,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
|||||||
mock_get.assert_called()
|
mock_get.assert_called()
|
||||||
assert mock_get.call_count == 2
|
assert mock_get.call_count == 2
|
||||||
assert mock_get.return_value.json.call_count == 2
|
assert mock_get.return_value.json.call_count == 2
|
||||||
assert (
|
assert "Failed to parse JSON response from tikwm.com" in caplog.text
|
||||||
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
|
|
||||||
in caplog.text
|
|
||||||
)
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"response",
|
"response",
|
||||||
@@ -98,27 +93,30 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
|||||||
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
|
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
|
||||||
mock_get.assert_called_once()
|
mock_get.assert_called_once()
|
||||||
mock_get.return_value.json.assert_called_once()
|
mock_get.return_value.json.assert_called_once()
|
||||||
assert "failed to get a valid response from tikwm.com" in caplog.text
|
assert "Unable to download with tikwm.com: " in caplog.text
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"response,has_vid",
|
"response,is_success",
|
||||||
[
|
[
|
||||||
({"data": {"id": 123}}, False),
|
({"data": {"id": 123, "images": []}}, False),
|
||||||
({"data": {"wmplay": "url"}}, True),
|
({"data": {"wmplay": "url", "images": ["img1.jpg"]}}, True),
|
||||||
({"data": {"play": "url"}}, True),
|
({"data": {"play": "url", "images": ["img1.jpg"]}}, True),
|
||||||
|
({"data": {"images": ["img1.jpg"]}}, True),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_correct_extraction(self, mock_get, make_item, response, has_vid, mocker):
|
def test_correct_extraction(self, mock_get, make_item, response, is_success, mocker):
|
||||||
|
data = {k: v for k, v in response.get("data", {}).items()}
|
||||||
mock_get.return_value.status_code = 200
|
mock_get.return_value.status_code = 200
|
||||||
mock_get.return_value.json.return_value = {"msg": "success", **response}
|
mock_get.return_value.json.return_value = {"msg": "success", **response}
|
||||||
result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
|
result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
|
||||||
if not has_vid:
|
total_media = len(data.get("images", [])) + (1 if data.get("wmplay", data.get("play")) else 0)
|
||||||
assert result is False
|
if is_success:
|
||||||
else:
|
|
||||||
assert result.is_success()
|
assert result.is_success()
|
||||||
assert len(result.media) == 1
|
assert len(result.media) == total_media
|
||||||
|
else:
|
||||||
|
assert result is False
|
||||||
mock_get.assert_called()
|
mock_get.assert_called()
|
||||||
assert mock_get.call_count == 1 + int(has_vid)
|
assert mock_get.call_count == 1 + total_media
|
||||||
mock_get.return_value.json.assert_called_once()
|
mock_get.return_value.json.assert_called_once()
|
||||||
|
|
||||||
def test_correct_data_extracted(self, mock_get, make_item):
|
def test_correct_data_extracted(self, mock_get, make_item):
|
||||||
@@ -142,7 +140,9 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
|||||||
assert len(result.media) == 2
|
assert len(result.media) == 2
|
||||||
assert result.get_title() == "Title"
|
assert result.get_title() == "Title"
|
||||||
assert result.get("author") == "Author"
|
assert result.get("author") == "Author"
|
||||||
assert result.get("api_data") == {"other": "data", "id": 123}
|
assert result.get("other") == "data"
|
||||||
|
assert result.get("comments") is None
|
||||||
|
assert result.get("api_data") == {"id": 123, "other": "data"}
|
||||||
assert result.media[1].get("duration") == 60
|
assert result.media[1].get("duration") == 60
|
||||||
assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc)
|
assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user