From c574b694ed0db50792b0719504486252848adfdd Mon Sep 17 00:00:00 2001 From: Patrick Robertson Date: Mon, 3 Feb 2025 17:25:59 +0100 Subject: [PATCH] Set up screenshot enricher to use authentication/cookies --- src/auto_archiver/core/base_module.py | 15 +- src/auto_archiver/core/orchestrator.py | 2 +- .../enrichers/screenshot_enricher.py | 2 +- .../generic_extractor/generic_extractor.py | 2 +- .../modules/generic_extractor/twitter.py | 2 +- .../screenshot_enricher.py | 6 +- .../modules/wacz_enricher/wacz_enricher.py | 2 +- .../wayback_extractor_enricher.py | 2 +- src/auto_archiver/utils/__init__.py | 1 - src/auto_archiver/utils/url.py | 129 +++++++++--------- src/auto_archiver/utils/webdriver.py | 86 +++++++++--- 11 files changed, 153 insertions(+), 96 deletions(-) diff --git a/src/auto_archiver/core/base_module.py b/src/auto_archiver/core/base_module.py index d23643c..fcfe9ea 100644 --- a/src/auto_archiver/core/base_module.py +++ b/src/auto_archiver/core/base_module.py @@ -4,6 +4,7 @@ from typing import Mapping, Any from abc import ABC from copy import deepcopy, copy from tempfile import TemporaryDirectory +from auto_archiver.utils import url as UrlUtil from loguru import logger @@ -78,7 +79,7 @@ class BaseModule(ABC): self.config = config for key, val in config.get(self.name, {}).items(): setattr(self, key, val) - + def auth_for_site(self, site: str, extract_cookies=True) -> Mapping[str, Any]: """ Returns the authentication information for a given site. This is used to authenticate @@ -98,8 +99,7 @@ class BaseModule(ABC): # TODO: think about if/how we can deal with sites that have multiple domains (main one is x.com/twitter.com) # for now the user must enter them both, like "x.com,twitter.com" in their config. Maybe we just hard-code? - # SECURITY: parse the domain using urllib - site = urlparse(site).netloc + site = UrlUtil.domain_for_url(site) # add the 'www' version of the site to the list of sites to check authdict = {} @@ -116,12 +116,11 @@ class BaseModule(ABC): logger.debug(f"Could not find exact authentication information for site '{site}'. \ did find information for '{key}' which is close, is this what you meant? \ If so, edit your authentication settings to make sure it exactly matches.") - def get_ytdlp_cookiejar(args): import yt_dlp from yt_dlp import parse_options - + logger.debug(f"Extracting cookies from settings: {args[1]}") # parse_options returns a named tuple as follows, we only need the ydl_options part # collections.namedtuple('ParsedOptions', ('parser', 'options', 'urls', 'ydl_opts')) ytdlp_opts = getattr(parse_options(args), 'ydl_opts') @@ -130,10 +129,12 @@ class BaseModule(ABC): # get the cookies jar, prefer the browser cookies than the file if 'cookies_from_browser' in self.authentication: authdict['cookies_from_browser'] = self.authentication['cookies_from_browser'] - authdict['cookies_jar'] = get_ytdlp_cookiejar(['--cookies-from-browser', self.authentication['cookies_from_browser']]) + if extract_cookies: + authdict['cookies_jar'] = get_ytdlp_cookiejar(['--cookies-from-browser', self.authentication['cookies_from_browser']]) elif 'cookies_file' in self.authentication: authdict['cookies_file'] = self.authentication['cookies_file'] - authdict['cookies_jar'] = get_ytdlp_cookiejar(['--cookies', self.authentication['cookies_file']]) + if extract_cookies: + authdict['cookies_jar'] = get_ytdlp_cookiejar(['--cookies', self.authentication['cookies_file']]) return authdict diff --git a/src/auto_archiver/core/orchestrator.py b/src/auto_archiver/core/orchestrator.py index 85b3d61..dbc8a33 100644 --- a/src/auto_archiver/core/orchestrator.py +++ b/src/auto_archiver/core/orchestrator.py @@ -174,7 +174,7 @@ class ArchivingOrchestrator: default={}, action=AuthenticationJsonParseAction) # logging arguments - parser.add_argument('--logging.level', action='store', dest='logging.level', choices=['INFO', 'DEBUG', 'ERROR', 'WARNING'], help='the logging level to use', default='INFO') + parser.add_argument('--logging.level', action='store', dest='logging.level', choices=['INFO', 'DEBUG', 'ERROR', 'WARNING'], help='the logging level to use', default='INFO', type=str.upper) parser.add_argument('--logging.file', action='store', dest='logging.file', help='the logging file to write to', default=None) parser.add_argument('--logging.rotation', action='store', dest='logging.rotation', help='the logging rotation to use', default=None) diff --git a/src/auto_archiver/enrichers/screenshot_enricher.py b/src/auto_archiver/enrichers/screenshot_enricher.py index 0d05d92..abb1e16 100644 --- a/src/auto_archiver/enrichers/screenshot_enricher.py +++ b/src/auto_archiver/enrichers/screenshot_enricher.py @@ -4,7 +4,7 @@ from selenium.common.exceptions import TimeoutException from auto_archiver.core import Enricher -from ..utils import Webdriver, UrlUtil, random_str +from ..utils import Webdriver, url as UrlUtil, random_str from ..core import Media, Metadata class ScreenshotEnricher(Enricher): diff --git a/src/auto_archiver/modules/generic_extractor/generic_extractor.py b/src/auto_archiver/modules/generic_extractor/generic_extractor.py index bc884a6..d1b1fb6 100644 --- a/src/auto_archiver/modules/generic_extractor/generic_extractor.py +++ b/src/auto_archiver/modules/generic_extractor/generic_extractor.py @@ -274,7 +274,7 @@ class GenericExtractor(Extractor): "max_downloads": self.max_downloads, "playlistend": self.max_downloads} # set up auth - auth = self.auth_for_site(url) + auth = self.auth_for_site(url, extract_cookies=False) # order of importance: username/pasword -> api_key -> cookie -> cookie_from_browser -> cookies_file if auth: if 'username' in auth and 'password' in auth: diff --git a/src/auto_archiver/modules/generic_extractor/twitter.py b/src/auto_archiver/modules/generic_extractor/twitter.py index 83c1f4f..3faed6b 100644 --- a/src/auto_archiver/modules/generic_extractor/twitter.py +++ b/src/auto_archiver/modules/generic_extractor/twitter.py @@ -5,7 +5,7 @@ from loguru import logger from slugify import slugify from auto_archiver.core.metadata import Metadata, Media -from auto_archiver.utils import UrlUtil +from auto_archiver.utils import url as UrlUtil from auto_archiver.core.extractor import Extractor from .dropin import GenericDropin, InfoExtractor diff --git a/src/auto_archiver/modules/screenshot_enricher/screenshot_enricher.py b/src/auto_archiver/modules/screenshot_enricher/screenshot_enricher.py index 8e7639a..e1da99d 100644 --- a/src/auto_archiver/modules/screenshot_enricher/screenshot_enricher.py +++ b/src/auto_archiver/modules/screenshot_enricher/screenshot_enricher.py @@ -6,7 +6,7 @@ from selenium.common.exceptions import TimeoutException from auto_archiver.core import Enricher -from auto_archiver.utils import Webdriver, UrlUtil, random_str +from auto_archiver.utils import Webdriver, url as UrlUtil, random_str from auto_archiver.core import Media, Metadata class ScreenshotEnricher(Enricher): @@ -19,7 +19,9 @@ class ScreenshotEnricher(Enricher): return logger.debug(f"Enriching screenshot for {url=}") - with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url, http_proxy=self.http_proxy, print_options=self.print_options) as driver: + auth = self.auth_for_site(url) + with Webdriver(self.width, self.height, self.timeout, facebook_accept_cookies='facebook.com' in url, + http_proxy=self.http_proxy, print_options=self.print_options, auth=auth) as driver: try: driver.get(url) time.sleep(int(self.sleep_before_screenshot)) diff --git a/src/auto_archiver/modules/wacz_enricher/wacz_enricher.py b/src/auto_archiver/modules/wacz_enricher/wacz_enricher.py index 3f67b7c..1586b75 100644 --- a/src/auto_archiver/modules/wacz_enricher/wacz_enricher.py +++ b/src/auto_archiver/modules/wacz_enricher/wacz_enricher.py @@ -7,7 +7,7 @@ from warcio.archiveiterator import ArchiveIterator from auto_archiver.core import Media, Metadata from auto_archiver.core import Extractor, Enricher -from auto_archiver.utils import UrlUtil, random_str +from auto_archiver.utils import url as UrlUtil, random_str class WaczExtractorEnricher(Enricher, Extractor): diff --git a/src/auto_archiver/modules/wayback_extractor_enricher/wayback_extractor_enricher.py b/src/auto_archiver/modules/wayback_extractor_enricher/wayback_extractor_enricher.py index 0e25440..1763b12 100644 --- a/src/auto_archiver/modules/wayback_extractor_enricher/wayback_extractor_enricher.py +++ b/src/auto_archiver/modules/wayback_extractor_enricher/wayback_extractor_enricher.py @@ -3,7 +3,7 @@ from loguru import logger import time, requests from auto_archiver.core import Extractor, Enricher -from auto_archiver.utils import UrlUtil +from auto_archiver.utils import url as UrlUtil from auto_archiver.core import Metadata class WaybackExtractorEnricher(Enricher, Extractor): diff --git a/src/auto_archiver/utils/__init__.py b/src/auto_archiver/utils/__init__.py index d2063d0..ed2d3bb 100644 --- a/src/auto_archiver/utils/__init__.py +++ b/src/auto_archiver/utils/__init__.py @@ -2,7 +2,6 @@ # we need to explicitly expose the available imports here from .misc import * from .webdriver import Webdriver -from .url import UrlUtil from .atlos import get_atlos_config_options # handy utils from ytdlp diff --git a/src/auto_archiver/utils/url.py b/src/auto_archiver/utils/url.py index 3b67514..40884da 100644 --- a/src/auto_archiver/utils/url.py +++ b/src/auto_archiver/utils/url.py @@ -1,83 +1,84 @@ import re from urllib.parse import urlparse, urlunparse -class UrlUtil: - AUTHWALL_URLS = [ - re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels - re.compile(r"https:\/\/www\.instagram\.com"), # instagram - ] +AUTHWALL_URLS = [ + re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels + re.compile(r"https:\/\/www\.instagram\.com"), # instagram +] - @staticmethod - def clean(url: str) -> str: return url +def domain_for_url(url: str) -> str: + """ + SECURITY: parse the domain using urllib to avoid any potential security issues + """ + return urlparse(url).netloc - @staticmethod - def is_auth_wall(url: str) -> bool: - """ - checks if URL is behind an authentication wall meaning steps like wayback, wacz, ... may not work - """ - for regex in UrlUtil.AUTHWALL_URLS: - if regex.match(url): - return True +def clean(url: str) -> str: + return url - return False +def is_auth_wall(url: str) -> bool: + """ + checks if URL is behind an authentication wall meaning steps like wayback, wacz, ... may not work + """ + for regex in AUTHWALL_URLS: + if regex.match(url): + return True - @staticmethod - def remove_get_parameters(url: str) -> str: - # http://example.com/file.mp4?t=1 -> http://example.com/file.mp4 - # useful for mimetypes to work - parsed_url = urlparse(url) - new_url = urlunparse(parsed_url._replace(query='')) - return new_url + return False - @staticmethod - def is_relevant_url(url: str) -> bool: - """ - Detect if a detected media URL is recurring and therefore irrelevant to a specific archive. Useful, for example, for the enumeration of the media files in WARC files which include profile pictures, favicons, etc. - """ - clean_url = UrlUtil.remove_get_parameters(url) +def remove_get_parameters(url: str) -> str: + # http://example.com/file.mp4?t=1 -> http://example.com/file.mp4 + # useful for mimetypes to work + parsed_url = urlparse(url) + new_url = urlunparse(parsed_url._replace(query='')) + return new_url - # favicons - if "favicon" in url: return False - # ifnore icons - if clean_url.endswith(".ico"): return False - # ignore SVGs - if UrlUtil.remove_get_parameters(url).endswith(".svg"): return False +def is_relevant_url(url: str) -> bool: + """ + Detect if a detected media URL is recurring and therefore irrelevant to a specific archive. Useful, for example, for the enumeration of the media files in WARC files which include profile pictures, favicons, etc. + """ + clean_url = remove_get_parameters(url) - # twitter profile pictures - if "twimg.com/profile_images" in url: return False - if "twimg.com" in url and "/default_profile_images" in url: return False + # favicons + if "favicon" in url: return False + # ifnore icons + if clean_url.endswith(".ico"): return False + # ignore SVGs + if remove_get_parameters(url).endswith(".svg"): return False - # instagram profile pictures - if "https://scontent.cdninstagram.com/" in url and "150x150" in url: return False - # instagram recurring images - if "https://static.cdninstagram.com/rsrc.php/" in url: return False + # twitter profile pictures + if "twimg.com/profile_images" in url: return False + if "twimg.com" in url and "/default_profile_images" in url: return False - # telegram - if "https://telegram.org/img/emoji/" in url: return False + # instagram profile pictures + if "https://scontent.cdninstagram.com/" in url and "150x150" in url: return False + # instagram recurring images + if "https://static.cdninstagram.com/rsrc.php/" in url: return False - # youtube - if "https://www.youtube.com/s/gaming/emoji/" in url: return False - if "https://yt3.ggpht.com" in url and "default-user=" in url: return False - if "https://www.youtube.com/s/search/audio/" in url: return False + # telegram + if "https://telegram.org/img/emoji/" in url: return False - # ok - if " https://ok.ru/res/i/" in url: return False + # youtube + if "https://www.youtube.com/s/gaming/emoji/" in url: return False + if "https://yt3.ggpht.com" in url and "default-user=" in url: return False + if "https://www.youtube.com/s/search/audio/" in url: return False - # vk - if "https://vk.com/emoji/" in url: return False - if "vk.com/images/" in url: return False - if "vk.com/images/reaction/" in url: return False + # ok + if " https://ok.ru/res/i/" in url: return False - # wikipedia - if "wikipedia.org/static" in url: return False + # vk + if "https://vk.com/emoji/" in url: return False + if "vk.com/images/" in url: return False + if "vk.com/images/reaction/" in url: return False - return True + # wikipedia + if "wikipedia.org/static" in url: return False - @staticmethod - def twitter_best_quality_url(url: str) -> str: - """ - some twitter image URLs point to a less-than best quality - this returns the URL pointing to the highest (original) quality - """ - return re.sub(r"name=(\w+)", "name=orig", url, 1) + return True + +def twitter_best_quality_url(url: str) -> str: + """ + some twitter image URLs point to a less-than best quality + this returns the URL pointing to the highest (original) quality + """ + return re.sub(r"name=(\w+)", "name=orig", url, 1) diff --git a/src/auto_archiver/utils/webdriver.py b/src/auto_archiver/utils/webdriver.py index cf84c35..efb1102 100644 --- a/src/auto_archiver/utils/webdriver.py +++ b/src/auto_archiver/utils/webdriver.py @@ -9,12 +9,72 @@ from loguru import logger from selenium.webdriver.common.by import By import time +#import domain_for_url +from urllib.parse import urlparse, urlunparse +from http.cookiejar import MozillaCookieJar +class CookieSettingDriver(webdriver.Firefox): + + facebook_accept_cookies: bool + cookies: str + cookiejar: MozillaCookieJar + + def __init__(self, cookies, cookiejar, facebook_accept_cookies, *args, **kwargs): + super(CookieSettingDriver, self).__init__(*args, **kwargs) + self.cookies = cookies + self.cookiejar = cookiejar + self.facebook_accept_cookies = facebook_accept_cookies + + def get(self, url: str): + if self.cookies or self.cookiejar: + # set up the driver to make it not 'cookie averse' (needs a context/URL) + # get the 'robots.txt' file which should be quick and easy + robots_url = urlunparse(urlparse(url)._replace(path='/robots.txt', query='', fragment='')) + super(CookieSettingDriver, self).get(robots_url) + + if self.cookies: + # an explicit cookie is set for this site, use that first + for cookie in self.cookies.split(";"): + for name, value in cookie.split("="): + self.driver.add_cookie({'name': name, 'value': value}) + elif self.cookiejar: + domain = urlparse(url).netloc.lstrip("www.") + for cookie in self.cookiejar: + if domain in cookie.domain: + try: + self.add_cookie({ + 'name': cookie.name, + 'value': cookie.value, + 'path': cookie.path, + 'domain': cookie.domain, + 'secure': bool(cookie.secure), + 'expiry': cookie.expires + }) + except Exception as e: + logger.warning(f"Failed to add cookie to webdriver: {e}") + + if self.facebook_accept_cookies: + try: + logger.debug(f'Trying fb click accept cookie popup.') + super(CookieSettingDriver, self).get("http://www.facebook.com") + essential_only = self.find_element(By.XPATH, "//span[contains(text(), 'Decline optional cookies')]") + essential_only.click() + logger.debug(f'fb click worked') + # linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page + time.sleep(2) + except Exception as e: + logger.warning(f'Failed on fb accept cookies.', e) + # now get the actual URL + super(CookieSettingDriver, self).get(url) + class Webdriver: - def __init__(self, width: int, height: int, timeout_seconds: int, facebook_accept_cookies: bool = False, http_proxy: str = "", print_options: dict = {}) -> webdriver: + def __init__(self, width: int, height: int, timeout_seconds: int, + facebook_accept_cookies: bool = False, http_proxy: str = "", + print_options: dict = {}, auth: dict = {}) -> webdriver: self.width = width self.height = height self.timeout_seconds = timeout_seconds + self.auth = auth self.facebook_accept_cookies = facebook_accept_cookies self.http_proxy = http_proxy # create and set print options @@ -23,32 +83,26 @@ class Webdriver: setattr(self.print_options, k, v) def __enter__(self) -> webdriver: + options = webdriver.FirefoxOptions() - options.add_argument("--headless") + # options.add_argument("--headless") options.add_argument(f'--proxy-server={self.http_proxy}') options.set_preference('network.protocol-handler.external.tg', False) + # if facebook cookie popup is present, force the browser to English since then it's easier to click the 'Decline optional cookies' option + if self.facebook_accept_cookies: + options.add_argument('--lang=en') + try: - self.driver = webdriver.Firefox(options=options) + self.driver = CookieSettingDriver(cookies=self.auth.get('cookies'), cookiejar=self.auth.get('cookies_jar'), + facebook_accept_cookies=self.facebook_accept_cookies, options=options) self.driver.set_window_size(self.width, self.height) self.driver.set_page_load_timeout(self.timeout_seconds) self.driver.print_options = self.print_options except TimeoutException as e: logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}") - if self.facebook_accept_cookies: - try: - logger.debug(f'Trying fb click accept cookie popup.') - self.driver.get("http://www.facebook.com") - foo = self.driver.find_element(By.XPATH, "//button[@data-cookiebanner='accept_only_essential_button']") - foo.click() - logger.debug(f'fb click worked') - # linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page - time.sleep(2) - except: - logger.warning(f'Failed on fb accept cookies.') - return self.driver - + def __exit__(self, exc_type, exc_val, exc_tb): self.driver.close() self.driver.quit()