mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-12 21:28:29 +03:00
Set up screenshot enricher to use authentication/cookies
This commit is contained in:
@@ -2,7 +2,6 @@
|
||||
# we need to explicitly expose the available imports here
|
||||
from .misc import *
|
||||
from .webdriver import Webdriver
|
||||
from .url import UrlUtil
|
||||
from .atlos import get_atlos_config_options
|
||||
|
||||
# handy utils from ytdlp
|
||||
|
||||
@@ -1,83 +1,84 @@
|
||||
import re
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
class UrlUtil:
|
||||
|
||||
AUTHWALL_URLS = [
|
||||
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
|
||||
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
|
||||
]
|
||||
AUTHWALL_URLS = [
|
||||
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
|
||||
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def clean(url: str) -> str: return url
|
||||
def domain_for_url(url: str) -> str:
|
||||
"""
|
||||
SECURITY: parse the domain using urllib to avoid any potential security issues
|
||||
"""
|
||||
return urlparse(url).netloc
|
||||
|
||||
@staticmethod
|
||||
def is_auth_wall(url: str) -> bool:
|
||||
"""
|
||||
checks if URL is behind an authentication wall meaning steps like wayback, wacz, ... may not work
|
||||
"""
|
||||
for regex in UrlUtil.AUTHWALL_URLS:
|
||||
if regex.match(url):
|
||||
return True
|
||||
def clean(url: str) -> str:
|
||||
return url
|
||||
|
||||
return False
|
||||
def is_auth_wall(url: str) -> bool:
|
||||
"""
|
||||
checks if URL is behind an authentication wall meaning steps like wayback, wacz, ... may not work
|
||||
"""
|
||||
for regex in AUTHWALL_URLS:
|
||||
if regex.match(url):
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def remove_get_parameters(url: str) -> str:
|
||||
# http://example.com/file.mp4?t=1 -> http://example.com/file.mp4
|
||||
# useful for mimetypes to work
|
||||
parsed_url = urlparse(url)
|
||||
new_url = urlunparse(parsed_url._replace(query=''))
|
||||
return new_url
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def is_relevant_url(url: str) -> bool:
|
||||
"""
|
||||
Detect if a detected media URL is recurring and therefore irrelevant to a specific archive. Useful, for example, for the enumeration of the media files in WARC files which include profile pictures, favicons, etc.
|
||||
"""
|
||||
clean_url = UrlUtil.remove_get_parameters(url)
|
||||
def remove_get_parameters(url: str) -> str:
|
||||
# http://example.com/file.mp4?t=1 -> http://example.com/file.mp4
|
||||
# useful for mimetypes to work
|
||||
parsed_url = urlparse(url)
|
||||
new_url = urlunparse(parsed_url._replace(query=''))
|
||||
return new_url
|
||||
|
||||
# favicons
|
||||
if "favicon" in url: return False
|
||||
# ifnore icons
|
||||
if clean_url.endswith(".ico"): return False
|
||||
# ignore SVGs
|
||||
if UrlUtil.remove_get_parameters(url).endswith(".svg"): return False
|
||||
def is_relevant_url(url: str) -> bool:
|
||||
"""
|
||||
Detect if a detected media URL is recurring and therefore irrelevant to a specific archive. Useful, for example, for the enumeration of the media files in WARC files which include profile pictures, favicons, etc.
|
||||
"""
|
||||
clean_url = remove_get_parameters(url)
|
||||
|
||||
# twitter profile pictures
|
||||
if "twimg.com/profile_images" in url: return False
|
||||
if "twimg.com" in url and "/default_profile_images" in url: return False
|
||||
# favicons
|
||||
if "favicon" in url: return False
|
||||
# ifnore icons
|
||||
if clean_url.endswith(".ico"): return False
|
||||
# ignore SVGs
|
||||
if remove_get_parameters(url).endswith(".svg"): return False
|
||||
|
||||
# instagram profile pictures
|
||||
if "https://scontent.cdninstagram.com/" in url and "150x150" in url: return False
|
||||
# instagram recurring images
|
||||
if "https://static.cdninstagram.com/rsrc.php/" in url: return False
|
||||
# twitter profile pictures
|
||||
if "twimg.com/profile_images" in url: return False
|
||||
if "twimg.com" in url and "/default_profile_images" in url: return False
|
||||
|
||||
# telegram
|
||||
if "https://telegram.org/img/emoji/" in url: return False
|
||||
# instagram profile pictures
|
||||
if "https://scontent.cdninstagram.com/" in url and "150x150" in url: return False
|
||||
# instagram recurring images
|
||||
if "https://static.cdninstagram.com/rsrc.php/" in url: return False
|
||||
|
||||
# youtube
|
||||
if "https://www.youtube.com/s/gaming/emoji/" in url: return False
|
||||
if "https://yt3.ggpht.com" in url and "default-user=" in url: return False
|
||||
if "https://www.youtube.com/s/search/audio/" in url: return False
|
||||
# telegram
|
||||
if "https://telegram.org/img/emoji/" in url: return False
|
||||
|
||||
# ok
|
||||
if " https://ok.ru/res/i/" in url: return False
|
||||
# youtube
|
||||
if "https://www.youtube.com/s/gaming/emoji/" in url: return False
|
||||
if "https://yt3.ggpht.com" in url and "default-user=" in url: return False
|
||||
if "https://www.youtube.com/s/search/audio/" in url: return False
|
||||
|
||||
# vk
|
||||
if "https://vk.com/emoji/" in url: return False
|
||||
if "vk.com/images/" in url: return False
|
||||
if "vk.com/images/reaction/" in url: return False
|
||||
# ok
|
||||
if " https://ok.ru/res/i/" in url: return False
|
||||
|
||||
# wikipedia
|
||||
if "wikipedia.org/static" in url: return False
|
||||
# vk
|
||||
if "https://vk.com/emoji/" in url: return False
|
||||
if "vk.com/images/" in url: return False
|
||||
if "vk.com/images/reaction/" in url: return False
|
||||
|
||||
return True
|
||||
# wikipedia
|
||||
if "wikipedia.org/static" in url: return False
|
||||
|
||||
@staticmethod
|
||||
def twitter_best_quality_url(url: str) -> str:
|
||||
"""
|
||||
some twitter image URLs point to a less-than best quality
|
||||
this returns the URL pointing to the highest (original) quality
|
||||
"""
|
||||
return re.sub(r"name=(\w+)", "name=orig", url, 1)
|
||||
return True
|
||||
|
||||
def twitter_best_quality_url(url: str) -> str:
|
||||
"""
|
||||
some twitter image URLs point to a less-than best quality
|
||||
this returns the URL pointing to the highest (original) quality
|
||||
"""
|
||||
return re.sub(r"name=(\w+)", "name=orig", url, 1)
|
||||
|
||||
@@ -9,12 +9,72 @@ from loguru import logger
|
||||
from selenium.webdriver.common.by import By
|
||||
import time
|
||||
|
||||
#import domain_for_url
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
from http.cookiejar import MozillaCookieJar
|
||||
|
||||
class CookieSettingDriver(webdriver.Firefox):
|
||||
|
||||
facebook_accept_cookies: bool
|
||||
cookies: str
|
||||
cookiejar: MozillaCookieJar
|
||||
|
||||
def __init__(self, cookies, cookiejar, facebook_accept_cookies, *args, **kwargs):
|
||||
super(CookieSettingDriver, self).__init__(*args, **kwargs)
|
||||
self.cookies = cookies
|
||||
self.cookiejar = cookiejar
|
||||
self.facebook_accept_cookies = facebook_accept_cookies
|
||||
|
||||
def get(self, url: str):
|
||||
if self.cookies or self.cookiejar:
|
||||
# set up the driver to make it not 'cookie averse' (needs a context/URL)
|
||||
# get the 'robots.txt' file which should be quick and easy
|
||||
robots_url = urlunparse(urlparse(url)._replace(path='/robots.txt', query='', fragment=''))
|
||||
super(CookieSettingDriver, self).get(robots_url)
|
||||
|
||||
if self.cookies:
|
||||
# an explicit cookie is set for this site, use that first
|
||||
for cookie in self.cookies.split(";"):
|
||||
for name, value in cookie.split("="):
|
||||
self.driver.add_cookie({'name': name, 'value': value})
|
||||
elif self.cookiejar:
|
||||
domain = urlparse(url).netloc.lstrip("www.")
|
||||
for cookie in self.cookiejar:
|
||||
if domain in cookie.domain:
|
||||
try:
|
||||
self.add_cookie({
|
||||
'name': cookie.name,
|
||||
'value': cookie.value,
|
||||
'path': cookie.path,
|
||||
'domain': cookie.domain,
|
||||
'secure': bool(cookie.secure),
|
||||
'expiry': cookie.expires
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to add cookie to webdriver: {e}")
|
||||
|
||||
if self.facebook_accept_cookies:
|
||||
try:
|
||||
logger.debug(f'Trying fb click accept cookie popup.')
|
||||
super(CookieSettingDriver, self).get("http://www.facebook.com")
|
||||
essential_only = self.find_element(By.XPATH, "//span[contains(text(), 'Decline optional cookies')]")
|
||||
essential_only.click()
|
||||
logger.debug(f'fb click worked')
|
||||
# linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page
|
||||
time.sleep(2)
|
||||
except Exception as e:
|
||||
logger.warning(f'Failed on fb accept cookies.', e)
|
||||
# now get the actual URL
|
||||
super(CookieSettingDriver, self).get(url)
|
||||
|
||||
class Webdriver:
|
||||
def __init__(self, width: int, height: int, timeout_seconds: int, facebook_accept_cookies: bool = False, http_proxy: str = "", print_options: dict = {}) -> webdriver:
|
||||
def __init__(self, width: int, height: int, timeout_seconds: int,
|
||||
facebook_accept_cookies: bool = False, http_proxy: str = "",
|
||||
print_options: dict = {}, auth: dict = {}) -> webdriver:
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.timeout_seconds = timeout_seconds
|
||||
self.auth = auth
|
||||
self.facebook_accept_cookies = facebook_accept_cookies
|
||||
self.http_proxy = http_proxy
|
||||
# create and set print options
|
||||
@@ -23,32 +83,26 @@ class Webdriver:
|
||||
setattr(self.print_options, k, v)
|
||||
|
||||
def __enter__(self) -> webdriver:
|
||||
|
||||
options = webdriver.FirefoxOptions()
|
||||
options.add_argument("--headless")
|
||||
# options.add_argument("--headless")
|
||||
options.add_argument(f'--proxy-server={self.http_proxy}')
|
||||
options.set_preference('network.protocol-handler.external.tg', False)
|
||||
# if facebook cookie popup is present, force the browser to English since then it's easier to click the 'Decline optional cookies' option
|
||||
if self.facebook_accept_cookies:
|
||||
options.add_argument('--lang=en')
|
||||
|
||||
try:
|
||||
self.driver = webdriver.Firefox(options=options)
|
||||
self.driver = CookieSettingDriver(cookies=self.auth.get('cookies'), cookiejar=self.auth.get('cookies_jar'),
|
||||
facebook_accept_cookies=self.facebook_accept_cookies, options=options)
|
||||
self.driver.set_window_size(self.width, self.height)
|
||||
self.driver.set_page_load_timeout(self.timeout_seconds)
|
||||
self.driver.print_options = self.print_options
|
||||
except TimeoutException as e:
|
||||
logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")
|
||||
|
||||
if self.facebook_accept_cookies:
|
||||
try:
|
||||
logger.debug(f'Trying fb click accept cookie popup.')
|
||||
self.driver.get("http://www.facebook.com")
|
||||
foo = self.driver.find_element(By.XPATH, "//button[@data-cookiebanner='accept_only_essential_button']")
|
||||
foo.click()
|
||||
logger.debug(f'fb click worked')
|
||||
# linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page
|
||||
time.sleep(2)
|
||||
except:
|
||||
logger.warning(f'Failed on fb accept cookies.')
|
||||
|
||||
return self.driver
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.driver.close()
|
||||
self.driver.quit()
|
||||
|
||||
Reference in New Issue
Block a user