Merge pull request #313 from bellingcat/feat/antibot-auth

Introduces more flexibility to the Antibot Extractor
This commit is contained in:
Miguel Sozinho Ramalho
2025-06-08 14:42:35 +01:00
committed by GitHub
15 changed files with 456 additions and 60 deletions

View File

@@ -28,6 +28,9 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install ffmpeg
run: sudo apt-get update && sudo apt-get install -y ffmpeg
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
@@ -35,7 +38,7 @@ jobs:
- name: Install latest Poetry
run: pipx install poetry
- name: Cache Poetry and pip artifacts
uses: actions/cache@v4
with:

View File

@@ -22,6 +22,9 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install ffmpeg
run: sudo apt-get update && sudo apt-get install -y ffmpeg
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:

3
.gitignore vendored
View File

@@ -36,4 +36,5 @@ docs/source/autoapi/
docs/source/modules/autogen/
scripts/settings_page.html
scripts/settings/src/schema.json
.vite
.vite
downloaded_files

View File

@@ -52,12 +52,12 @@ authentication:
username: myusername
password: 123
facebook.com:
cookie: single_cookie
facebook.com:
cookie: single_cookie
othersite.com:
api_key: 123
api_secret: 1234
othersite.com:
api_key: 123
api_secret: 1234
```

View File

@@ -98,12 +98,11 @@ class BaseModule(ABC):
"""
# TODO: think about if/how we can deal with sites that have multiple domains (main one is x.com/twitter.com)
# for now the user must enter them both, like "x.com,twitter.com" in their config. Maybe we just hard-code?
site = UrlUtil.domain_for_url(site).removeprefix("www.")
domain = UrlUtil.domain_for_url(site).removeprefix("www.")
# add the 'www' version of the site to the list of sites to check
authdict = {}
for to_try in [site, f"www.{site}"]:
for to_try in [site, domain, f"www.{domain}"]:
if to_try in self.authentication:
authdict.update(self.authentication[to_try])
break
@@ -111,9 +110,9 @@ class BaseModule(ABC):
# do a fuzzy string match just to print a warning - don't use it since it's insecure
if not authdict:
for key in self.authentication.keys():
if key in site or site in key:
if key in domain or domain in key:
logger.debug(
f"Could not find exact authentication information for site '{site}'. \
f"Could not find exact authentication information for '{domain}'. \
did find information for '{key}' which is close, is this what you meant? \
If so, edit your authentication settings to make sure it exactly matches."
)

View File

@@ -578,6 +578,7 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
result.set_url(url)
if original_url != url:
logger.debug(f"Sanitized URL from {original_url} to {url}")
result.set("original_url", original_url)
# 2 - notify start to DBs, propagate already archived if feature enabled in DBs

View File

@@ -2,9 +2,7 @@
"name": "Antibot Extractor/Enricher",
"type": ["extractor", "enricher"],
"requires_setup": False,
"dependencies": {
"python": ["loguru", "seleniumbase"],
},
"dependencies": {"python": ["loguru", "seleniumbase", "yt_dlp"], "bin": ["ffmpeg"]},
"configs": {
"save_to_pdf": {
"default": False,
@@ -23,6 +21,10 @@
"default": ".svg,.ico,.gif",
"help": "CSV of media (image/video) file extensions to exclude from download",
},
"user_data_dir": {
"default": "secrets/antibot_user_data",
"help": "Path to the user data directory for the webdriver. This is used to persist browser state, such as cookies and local storage. When using docker it's best to let docker create the folder otherwise there may be permission issues. The Extractor will try to work without it if that error occurs but login sessions will not be used or preserved on those runs.",
},
"proxy": {
"default": None,
"help": "proxy to use for the webdriver, Format: 'SERVER:PORT' or 'USER:PASS@SERVER:PORT'",

View File

@@ -5,11 +5,16 @@ import os
import sys
import traceback
from urllib.parse import urljoin
import glob
import stat
import importlib.util
from loguru import logger
import selenium
from seleniumbase import SB
from auto_archiver.core import Extractor, Enricher, Metadata, Media
from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
from auto_archiver.utils.misc import random_str
@@ -34,6 +39,37 @@ class AntibotExtractorEnricher(Extractor, Enricher):
else:
self.max_download_videos = int(self.max_download_videos)
self._prepare_and_warn_about_docker_and_user_data_dir()
self.dropins = self.load_dropins()
def load_dropins(self):
    """
    Discover and load the drop-in classes shipped in the ``dropins`` directory next to this file.

    Every ``*.py`` file whose name does not start with an underscore is executed as a module.
    Each class that is defined in that module (as opposed to merely imported by it) and that
    subclasses ``Dropin`` is collected.

    :return: a list of ``Dropin`` subclasses (classes, not instances), ordered by file name.
    """
    dropins = []
    # TODO: add user-configurable drop-ins via config like generic_extractor
    dropins_dir = os.path.join(os.path.dirname(__file__), "dropins")
    # glob returns paths in arbitrary order; sort them so that drop-in priority
    # (the first suitable drop-in wins) is the same on every run and platform.
    for file_path in sorted(glob.glob(os.path.join(dropins_dir, "*.py"))):
        file_name = os.path.basename(file_path)
        if file_name.startswith("_"):
            continue  # skip __init__.py or private modules
        module_name = f"auto_archiver.modules.antibot_extractor_enricher.dropins.{os.path.splitext(file_name)[0]}"
        spec = importlib.util.spec_from_file_location(module_name, file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for attr in dir(module):
            obj = getattr(module, attr)
            if getattr(obj, "__module__", None) != module.__name__:
                continue  # Skip imported modules/classes/functions
            if isinstance(obj, type) and issubclass(obj, Dropin):
                dropins.append(obj)
    logger.debug(f"ANTIBOT loaded drop-in classes: {', '.join([d.__name__ for d in dropins])}")
    return dropins
def sanitize_url(self, url: str) -> str:
    """
    Clean ``url`` using the first loaded drop-in that reports itself suitable for it.

    :param url: the URL to clean.
    :return: the sanitized URL from the matching drop-in, or ``url`` unchanged when none is suitable.
    """
    matching = next((candidate for candidate in self.dropins if candidate.suitable(url)), None)
    if matching is None:
        return url
    return matching.sanitize_url(url)
def download(self, item: Metadata) -> Metadata:
result = Metadata()
result.merge(item)
@@ -41,12 +77,27 @@ class AntibotExtractorEnricher(Extractor, Enricher):
result.status = "antibot"
return result
def enrich(self, to_enrich: Metadata) -> bool:
def _prepare_and_warn_about_docker_and_user_data_dir(self):
    """
    Create the configured ``user_data_dir`` and, when running in Docker, warn about risky ownership.

    Does nothing when ``user_data_dir`` is empty or unset. Otherwise the directory is created if
    missing. When the ``RUNNING_IN_DOCKER`` environment variable is set and the directory is not
    owned by uid 0 / gid 0, a warning is logged.
    """
    if not self.user_data_dir:
        # Nothing to prepare; calling os.makedirs with an empty/None path would raise.
        return

    os.makedirs(self.user_data_dir, exist_ok=True)

    if not os.environ.get("RUNNING_IN_DOCKER"):
        return

    st = os.stat(self.user_data_dir)
    perms = stat.filemode(st.st_mode)
    owner = st.st_uid
    group = st.st_gid
    if owner != 0 or group != 0:
        logger.warning(
            f"""ANTIBOT: Running in Docker with user_data_dir {self.user_data_dir} with permissions {perms} and non-root {owner=}. This may cause issues with Chrome, if you get 'session not created' errors make sure to remove the folder and let docker create it."""
        )
def enrich(self, to_enrich: Metadata, custom_data_dir: bool = True) -> bool:
using_user_data_dir = self.user_data_dir if custom_data_dir else None
url = to_enrich.get_url()
# TODO: implement cookies auth = self.auth_for_site(url) and combine with if UrlUtil.is_auth_wall(url) like in ScreenshotEnricher
url_sample = url[:75]
try:
with SB(uc=True, agent=self.agent, headed=None, proxy=self.proxy) as sb:
with SB(uc=True, agent=self.agent, headed=None, user_data_dir=using_user_data_dir, proxy=self.proxy) as sb:
logger.info(f"ANTIBOT selenium browser is up with agent {self.agent}, opening {url_sample}...")
sb.uc_open_with_reconnect(url, 4)
@@ -55,11 +106,17 @@ class AntibotExtractorEnricher(Extractor, Enricher):
# TODO: implement other Captcha handling
sb.uc_gui_handle_captcha() # handles Cloudflare Turnstile captcha if detected
# time.sleep(1) # wait for the page to load
suitable_dropin = self._get_suitable_dropin(url, sb)
if suitable_dropin:
suitable_dropin.open_page(url)
if self._hit_auth_wall(sb):
logger.warning(f"ANTIBOT SKIP since auth wall or CAPTCHA was detected for {url_sample}")
return False
logger.debug(f"ANTIBOT no auth wall detected for {url_sample}...")
sb.wait_for_ready_state_complete()
sb.sleep(1) # margin for the page to load completely
to_enrich.set_title(sb.get_title())
self._enrich_html_source_code(sb, to_enrich)
@@ -67,18 +124,42 @@ class AntibotExtractorEnricher(Extractor, Enricher):
if self.save_to_pdf:
self._enrich_full_page_pdf(sb, to_enrich)
self._enrich_download_media(sb, to_enrich, css_selector="img", max_media=self.max_download_images)
self._enrich_download_media(
sb, to_enrich, css_selector="video, source", max_media=self.max_download_videos
)
downloaded_images, downloaded_videos = 0, 0
if suitable_dropin:
downloaded_images, downloaded_videos = suitable_dropin.add_extra_media(to_enrich)
self._enrich_download_media(
sb, to_enrich, css_selector="img", max_media=self.max_download_images - downloaded_images
)
self._enrich_download_media(
sb, to_enrich, css_selector="video, source", max_media=self.max_download_videos - downloaded_videos
)
logger.success(f"ANTIBOT completed for {url_sample}")
return to_enrich
except selenium.common.exceptions.SessionNotCreatedException as e:
if custom_data_dir: # the retry logic only works once
logger.error(
f"ANTIBOT session not created error: {e}. Please remove the user_data_dir {self.user_data_dir} and try again, will retry without user data dir though."
)
return self.enrich(to_enrich, custom_data_dir=False)
raise e # re-raise
except Exception as e:
logger.error(f"ANTIBOT runtime error: {e}: {traceback.format_exc()}")
return False
def _get_suitable_dropin(self, url: str, sb: SB):
    """
    Return an instance of the first loaded drop-in that is suitable for ``url``.

    :param url: the URL about to be processed.
    :param sb: the SeleniumBase session handed to the drop-in constructor.
    :return: a drop-in instance built as ``dropin(sb, self)``, or None when no drop-in is suitable.
    """
    for dropin in self.dropins:
        if dropin.suitable(url):
            # `dropin` is the class itself, so its own __name__ is the drop-in name
            # (`dropin.__class__.__name__` would always log "type").
            logger.debug(f"ANTIBOT using drop-in {dropin.__name__} for {url}")
            return dropin(sb, self)
    return None
def _hit_auth_wall(self, sb: SB) -> bool:
"""
Tries to detect if the currently loaded page is an auth/login wall.
@@ -202,16 +283,20 @@ class AntibotExtractorEnricher(Extractor, Enricher):
)
url = to_enrich.get_url()
all_urls = set()
media_elements = sb.find_elements(css_selector)
for media in media_elements:
# media_elements = sb.find_elements(css_selector)
sources = sb.execute_script(f"""
return Array.from(document.querySelectorAll("{css_selector}"))
.map(el => el.src || el.href)
.filter(Boolean);
""")
for src in sources:
if len(all_urls) >= max_media:
logger.debug(f"Reached max download limit of {max_media} images/videos.")
break
if src := media.get_attribute("src"):
mimerype = mimetypes.guess_type(src)[0]
if mimerype in self.exclude_media_mimetypes:
continue
full_src = urljoin(url, src)
if full_src not in all_urls and (filename := self.download_from_url(full_src)):
all_urls.add(full_src)
to_enrich.add_media(Media(filename=filename, properties={"url": full_src}))
mimerype = mimetypes.guess_type(src)[0]
if mimerype in self.exclude_media_mimetypes:
continue
full_src = urljoin(url, src)
if full_src not in all_urls and (filename := self.download_from_url(full_src)):
all_urls.add(full_src)
to_enrich.add_media(Media(filename=filename, properties={"url": full_src}))

View File

@@ -0,0 +1,52 @@
from seleniumbase import SB
from auto_archiver.core.extractor import Extractor
from auto_archiver.core.metadata import Metadata
class Dropin:
    """
    Base class for site-specific drop-ins of the antibot extractor enricher module.

    A subclass targets one website: it declares which URLs it handles (`suitable`), may
    normalise them (`sanitize_url`), opens the page (`open_page`) and contributes additional
    media (`add_extra_media`).
    """

    def __init__(self, sb: SB, extractor: Extractor):
        """
        Store the collaborators this drop-in works with.

        :param sb: The SeleniumBase instance this drop-in will use.
        :param extractor: The Extractor instance this drop-in will use.
        """
        self.sb: SB = sb
        self.extractor: Extractor = extractor

    @staticmethod
    def suitable(url: str) -> bool:
        """
        Tell whether this drop-in should process the given URL.

        :param url: The URL to check.
        :return: True when the URL should be processed by this drop-in, False otherwise.
        :raises NotImplementedError: always in the base class; subclasses must override.
        """
        raise NotImplementedError("This method should be implemented in the subclass")

    @staticmethod
    def sanitize_url(url: str) -> str:
        """
        Clean a URL before it is processed. The base implementation returns it unchanged.

        :param url: The URL to clean.
        :return: The cleaned URL.
        """
        return url

    def open_page(self, url) -> bool:
        """
        Make sure the page is opened, even if it requires authentication, captcha solving, etc.

        :param url: The URL to open.
        :return: True on success, False otherwise.
        :raises NotImplementedError: always in the base class; subclasses must override.
        """
        raise NotImplementedError("This method should be implemented in the subclass")

    def add_extra_media(self, to_enrich: Metadata) -> tuple[int, int]:
        """
        Extract image and/or video data from the post currently open in SeleniumBase and add it
        to the `to_enrich` Metadata object.

        :param to_enrich: The Metadata object that receives the media.
        :return: A tuple (number of images added, number of videos added).
        :raises NotImplementedError: always in the base class; subclasses must override.
        """
        raise NotImplementedError("This method should be implemented in the subclass")

View File

@@ -0,0 +1,126 @@
import os
import re
from auto_archiver.core.media import Media
from auto_archiver.core.metadata import Metadata
from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
from auto_archiver.utils.misc import ydl_entry_to_filename
import yt_dlp
from loguru import logger
class VkDropin(Dropin):
    """
    Drop-in that handles VK (vk.com) pages for the antibot extractor enricher module.

    It rewrites modal URLs to their canonical form, logs in when VK shows its sign-in page,
    and downloads videos linked from the open post with yt-dlp.
    """

    # Each pattern captures a "<kind><owner>_<id>" token; `.{0,1}` tolerates the optional
    # single character (typically "-") between the kind and the owner id.
    WALL_PATTERN = re.compile(r"(wall.{0,1}\d+_\d+)")
    VIDEO_PATTERN = re.compile(r"(video.{0,1}\d+_\d+(?:_\w+)?)")
    CLIP_PATTERN = re.compile(r"(clip.{0,1}\d+_\d+)")
    PHOTO_PATTERN = re.compile(r"(photo.{0,1}\d+_\d+)")

    @staticmethod
    def suitable(url: str) -> bool:
        """
        Tell whether ``url`` points at vk.com or one of its subdomains.

        The host name is compared instead of searching the whole URL, so unrelated hosts such as
        ``notvk.com`` or URLs that only mention ``vk.com`` in their path/query are not matched.

        :param url: The URL to check; a missing scheme is tolerated.
        :return: True if the host is ``vk.com`` or ends with ``.vk.com``.
        """
        from urllib.parse import urlsplit

        # Without a scheme urlsplit treats everything as a path, so force a network location.
        has_scheme = re.match(r"[a-zA-Z][a-zA-Z0-9+.\-]*://", url) is not None
        candidate = url if has_scheme else "//" + url.lstrip("/")
        try:
            host = (urlsplit(candidate).hostname or "").lower().rstrip(".")
        except ValueError:  # malformed netloc, e.g. a broken IPv6 literal
            return False
        return host == "vk.com" or host.endswith(".vk.com")

    @staticmethod
    def sanitize_url(url: str) -> str:
        """
        Transforms modal URLs like 'https://vk.com/page_name?w=wall-123456_7890' to 'https://vk.com/wall-123456_7890'

        Patterns are tried in priority order wall, video, clip, photo; the first one found
        anywhere in the URL wins. URLs matching none of them are returned unchanged.
        """
        for pattern in (VkDropin.WALL_PATTERN, VkDropin.VIDEO_PATTERN, VkDropin.CLIP_PATTERN, VkDropin.PHOTO_PATTERN):
            match = pattern.search(url)
            if match:
                return f"https://vk.com/{match.group(1)}"
        return url

    def open_page(self, url) -> bool:
        """
        Log in and re-open ``url`` when the current page shows VK's sign-in prompt.

        :param url: The URL to open again after logging in.
        :return: Always True; a login that does not reach the feed is only logged.
        :raises ValueError: propagated from `_login` when no username/password is configured.
        """
        if self.sb.is_text_visible("Sign in to VK"):
            if not self._login():
                logger.warning("VK login did not end on the feed page, continuing anyway.")
            self.sb.open(url)
        return True

    def _login(self) -> bool:
        """
        Log in to VK with the username/password configured for ``vk.com``.

        :return: True if the browser is on a ``/feed`` URL (already logged in, or after login).
        :raises ValueError: if the configured authentication lacks a username or a password.
        """
        # TODO: test method
        self.sb.open("https://vk.com")
        self.sb.wait_for_ready_state_complete()
        if "/feed" in self.sb.get_current_url():
            logger.debug("Already logged in to VK.")
            return True

        # need to login
        logger.debug("Logging in to VK...")
        auth = self.extractor.auth_for_site("vk.com")
        username = auth.get("username", "")
        password = auth.get("password", "")
        if not username or not password:
            raise ValueError("VK authentication requires a username and password.")
        logger.debug("Using username: {}", username)

        self.sb.click('[data-testid="enter-another-way"]', timeout=10)
        self.sb.clear('input[name="login"][type="tel"]', by="css selector", timeout=10)
        self.sb.type('input[name="login"][type="tel"]', username, by="css selector", timeout=10)
        self.sb.click('button[type="submit"]')

        # TODO: handle captcha if it appears (the popup image is "img.vkc__CaptchaPopup__image"):
        # read its "src", solve it, type the solution into
        # "input#captcha-text, input[name='captcha']" and click "button[type='submit']".

        self.sb.type('input[name="password"]', password, timeout=15)
        self.sb.click('button[type="submit"]')
        self.sb.wait_for_ready_state_complete(timeout=10)
        self.sb.wait_for_element("body", timeout=10)
        return "/feed" in self.sb.get_current_url()

    # default=(0, 0): when loguru swallows an unexpected error the caller still gets a tuple it
    # can unpack, instead of None.
    @logger.catch(default=(0, 0))
    def add_extra_media(self, to_enrich: Metadata) -> tuple[int, int]:
        """
        Extract video data from the currently open post with SeleniumBase.

        Links whose href contains "/video-" are collected (duplicates removed, page order kept),
        capped at the extractor's ``max_download_videos`` when that is an integer, and downloaded
        with yt-dlp into the extractor's ``tmp_dir``. Each downloaded file is added to
        ``to_enrich`` as a Media item. A failure on one URL is logged and does not stop the others.

        :param to_enrich: The Metadata object that receives the downloaded videos.
        :return: A tuple (number of Images added, number of Videos added); images is always 0.
        """
        hrefs = (v.get_attribute("href") for v in self.sb.find_elements('a[href*="/video-"]'))
        # The same video is often linked more than once in a post; download each URL only once.
        video_urls = list(dict.fromkeys(href for href in hrefs if href))

        max_videos = self.extractor.max_download_videos
        if isinstance(max_videos, int):  # otherwise it is not an int limit (e.g. unlimited)
            video_urls = video_urls[:max_videos]

        if not video_urls:
            return 0, 0

        logger.debug(f"Found {len(video_urls)} video URLs in the post, using ytdlp for download.")
        ydl_options = [
            "-o",
            os.path.join(self.extractor.tmp_dir, "%(id)s.%(ext)s"),
            "--quiet",
            "--no-playlist",
            "--no-write-subs",
            "--no-write-auto-subs",
            "--postprocessor-args",
            "ffmpeg:-bitexact",
            "--max-filesize",
            "1000M",  # Limit to 1GB per video
        ]
        *_, validated_options = yt_dlp.parse_options(ydl_options)
        downloaded = 0
        with yt_dlp.YoutubeDL(validated_options) as ydl:
            for url in video_urls:
                try:
                    logger.debug(f"Downloading video from URL: {url}")
                    info = ydl.extract_info(url, download=True)
                    filename = ydl_entry_to_filename(ydl, info)
                    if not filename:  # Failed to download video.
                        continue
                    media = Media(filename)
                    for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]:
                        if x in info:
                            media.set(x, info[x])
                    to_enrich.add_media(media)
                    downloaded += 1
                except Exception as e:
                    logger.error(f"Error downloading {url}: {e}")
        return 0, downloaded

View File

@@ -4,9 +4,7 @@
"author": "Bellingcat",
"type": ["extractor"],
"requires_setup": False,
"dependencies": {
"python": ["yt_dlp", "requests", "loguru", "slugify"],
},
"dependencies": {"python": ["yt_dlp", "requests", "loguru", "slugify"], "bin": ["ffmpeg"]},
"description": """
This is the generic extractor used by auto-archiver, which uses `yt-dlp` under the hood.

View File

@@ -1,4 +1,3 @@
import mimetypes
import shutil
import sys
import datetime
@@ -20,6 +19,7 @@ from loguru import logger
from auto_archiver.core.extractor import Extractor
from auto_archiver.core import Metadata, Media
from auto_archiver.utils import get_datetime_from_str
from auto_archiver.utils.misc import ydl_entry_to_filename
from .dropin import GenericDropin
@@ -371,7 +371,6 @@ class GenericExtractor(Extractor):
data = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=True)
except MaxDownloadsReached: # proceed as normal once MaxDownloadsReached is raised
pass
logger.success(data)
if "entries" in data:
entries = data.get("entries", [])
@@ -382,27 +381,11 @@ class GenericExtractor(Extractor):
entries = [data]
result = Metadata()
def _helper_get_filename(entry: dict) -> str:
entry_url = entry.get("url")
filename = ydl.prepare_filename(entry)
base_filename, _ = os.path.splitext(filename) # '/get/path/to/file' ignore '.ext'
directory = os.path.dirname(base_filename) # '/get/path/to'
basename = os.path.basename(base_filename) # 'file'
for f in os.listdir(directory):
if (
f.startswith(basename)
or (entry_url and os.path.splitext(f)[0] in entry_url)
and "video/" in (mimetypes.guess_type(f)[0] or "")
):
return os.path.join(directory, f)
return False
for entry in entries:
try:
filename = _helper_get_filename(entry)
filename = ydl_entry_to_filename(ydl, entry)
if not filename or not os.path.exists(filename):
if not filename:
# file was not downloaded or could not be retrieved, example: sensitive videos on YT without using cookies.
continue

View File

@@ -1,5 +1,6 @@
import hashlib
import json
import mimetypes
import os
import uuid
from datetime import datetime, timezone
@@ -116,3 +117,26 @@ def get_timestamp(ts, utc=True, iso=True, dayfirst=True) -> str | datetime | Non
def get_current_timestamp() -> str:
return get_timestamp(datetime.now())
def ydl_entry_to_filename(ydl, entry: dict) -> str:
import yt_dlp
ydl: yt_dlp.YoutubeDL
entry_url = entry.get("url")
filename = ydl.prepare_filename(entry)
if os.path.exists(filename):
return filename
base_filename, _ = os.path.splitext(filename) # '/get/path/to/file' ignore '.ext'
directory = os.path.dirname(base_filename) # '/get/path/to'
basename = os.path.basename(base_filename) # 'file'
for f in os.listdir(directory):
if (
f.startswith(basename)
or (entry_url and os.path.splitext(f)[0] in entry_url)
and "video/" in (mimetypes.guess_type(f)[0] or "")
):
return os.path.join(directory, f)
return False

View File

@@ -0,0 +1,101 @@
import pytest
from auto_archiver.modules.antibot_extractor_enricher.dropins.vk import VkDropin
@pytest.mark.parametrize(
    "input_url,expected",
    [
        # Unrelated URL, should return unchanged
        ("https://vk.com/id123456", "https://vk.com/id123456"),
        ("https://example.com/", "https://example.com/"),
        # Wall post modal URL
        ("https://vk.com/somepage?w=wall-123456_7890", "https://vk.com/wall-123456_7890"),
        # Wall post modal URL with no dash
        ("https://vk.com/somepage?w=wall123456_7890", "https://vk.com/wall123456_7890"),
        # Photo modal URL
        ("https://vk.com/somepage?w=photo-654321_9876", "https://vk.com/photo-654321_9876"),
        # Photo modal URL with no dash
        ("https://vk.com/somepage?w=photo654321_9876", "https://vk.com/photo654321_9876"),
        # Video modal URL
        ("https://vk.com/somepage?w=video-111222_3334", "https://vk.com/video-111222_3334"),
        # Video modal URL with extra part
        ("https://vk.com/somepage?w=video-111222_3334_ABC", "https://vk.com/video-111222_3334_ABC"),
        # Video modal URL with no dash
        ("https://vk.com/somepage?w=video111222_3334", "https://vk.com/video111222_3334"),
        # No modal, should return unchanged
        ("https://vk.com/wall-123456_7890", "https://vk.com/wall-123456_7890"),
        ("https://vk.com/photo-654321_9876", "https://vk.com/photo-654321_9876"),
        ("https://vk.com/video-111222_3334", "https://vk.com/video-111222_3334"),
        # Clip modal URL
        ("https://vk.com/somepage?w=clip-555666_7778", "https://vk.com/clip-555666_7778"),
        # Clip modal URL with no dash
        ("https://vk.com/somepage?w=clip555666_7778", "https://vk.com/clip555666_7778"),
        # Clip modal URL with extra part (the extra part is dropped for clips)
        ("https://vk.com/somepage?w=clip-555666_7778_ABC", "https://vk.com/clip-555666_7778"),
        # No modal, should return unchanged (clip)
        ("https://vk.com/clip-555666_7778", "https://vk.com/clip-555666_7778"),
        # Modal with multiple params, should still work with right priority
        ("https://vk.com/somepage?z=photo-654321_9876&w=wall-123456_7890", "https://vk.com/wall-123456_7890"),
        ("https://vk.com/somepage?z=photo-654321_9876&w=video-111222_3334", "https://vk.com/video-111222_3334"),
        ("https://vk.com/somepage?z=video-111222_3334&w=wall-654321_9876", "https://vk.com/wall-654321_9876"),
    ],
)
def test_sanitize_url(input_url, expected):
    """VkDropin.sanitize_url maps each input URL to its expected canonical form."""
    sanitized = VkDropin.sanitize_url(input_url)
    assert sanitized == expected

View File

@@ -40,35 +40,46 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
@pytest.mark.download
@pytest.mark.parametrize(
"url,in_title,image_count,video_count",
"url,in_title,in_text,image_count,video_count",
[
(
"https://en.wikipedia.org/wiki/Western_barn_owl",
"western barn owl",
"Tyto alba",
5,
0,
),
(
"https://www.bellingcat.com/news/2025/04/29/open-sources-show-myanmar-junta-airstrike-damages-despite-post-earthquake-ceasefire/",
"open sources show myanmar",
"Bellingcat has geolocated",
5,
0,
),
(
"https://www.bellingcat.com/news/2025/03/27/gaza-israel-palestine-shot-killed-injured-destroyed-dangerous-drone-journalists-in-gaza/",
"shot from above",
"continued the work of Gazan journalists",
5,
1,
),
(
"https://www.bellingcat.com/about/general-information",
"general information",
"Stichting Bellingcat",
0, # SVGs are ignored
0,
),
(
"https://vk.com/wikipedia?from=search&w=wall-36156673_20451",
"Hounds of Love",
"16 сентября 1985 года лейблом EMI Records.",
5,
0,
),
],
)
def test_download_pages_with_media(self, setup_module, make_item, url, in_title, image_count, video_count):
def test_download_pages_with_media(self, setup_module, make_item, url, in_title, in_text, image_count, video_count):
"""
Test downloading pages with media.
"""
@@ -81,7 +92,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"max_download_videos": "inf",
},
)
url = self.extractor.sanitize_url(url)
item = make_item(url)
result = self.extractor.download(item)
@@ -89,7 +100,14 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
# Check title contains all required words (case-insensitive)
page_title = result.get_title() or ""
assert in_title in page_title.lower(), f"Expected title to contain '{in_title}', got '{page_title}'"
assert in_title.lower() in page_title.lower(), f"Expected title to contain '{in_title}', got '{page_title}'"
# Check text contains all required words (case-insensitive)
with open(result.get_media_by_id("html_source_code").filename, "r", encoding="utf-8") as f:
html_content = f.read()
assert in_text.lower() in html_content.lower(), (
f"Expected HTML to contain '{in_text}', got '{html_content}'"
)
image_media = [m for m in result.media if m.is_image() and not m.get("id") == "screenshot"]
assert len(image_media) == image_count, f"Expected {image_count} image items, got {len(image_media)}"