mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-12 05:08:28 +03:00
Merge branch 'main' into webdriver-cookies
This commit is contained in:
@@ -1,7 +1,8 @@
|
||||
""" Auto Archiver Utilities. """
|
||||
"""Auto Archiver Utilities."""
|
||||
|
||||
# we need to explicitly expose the available imports here
|
||||
from .misc import *
|
||||
from .webdriver import Webdriver
|
||||
|
||||
# handy utils from ytdlp
|
||||
from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none, url_or_none)
|
||||
from yt_dlp.utils import clean_html, traverse_obj, strip_or_none, url_or_none
|
||||
|
||||
@@ -16,22 +16,23 @@ def mkdir_if_not_exists(folder):
|
||||
|
||||
def expand_url(url, timeout=30):
    """Expand a shortened ``https://t.co/`` link to the URL it resolves to.

    Args:
        url: the URL to inspect. Only URLs containing ``https://t.co/`` are
            requested; any other URL is returned untouched.
        timeout: seconds to wait for the HTTP request. Without a timeout a
            stalled connection would block the caller indefinitely.

    Returns:
        The ``url`` attribute of the HTTP response, or the original ``url``
        when it is not a short link or the request fails.
    """
    # expand short URL links
    if "https://t.co/" in url:
        try:
            r = requests.get(url, timeout=timeout)
            logger.debug(f"Expanded url {url} to {r.url}")
            return r.url
        except Exception:
            # best effort: on any failure fall through and return the input
            logger.error(f"Failed to expand url {url}")
    return url
|
||||
|
||||
|
||||
def getattr_or(o: object, prop: str, default=None):
    """Return ``getattr(o, prop)``, falling back to ``default``.

    Args:
        o: the object to read the attribute from.
        prop: the attribute name.
        default: value returned when the attribute cannot be used.

    Returns:
        The attribute value, or ``default`` when the attribute is missing,
        when reading it raises an exception, or when its value is ``None``.
        Other falsy values (``0``, ``""``, ...) are returned as-is.
    """
    try:
        res = getattr(o, prop)
    except Exception:
        # missing attribute, or a property/descriptor that raised
        return default
    # explicit None check instead of a bare ``raise`` with no active exception
    return default if res is None else res
|
||||
|
||||
|
||||
@@ -61,18 +62,19 @@ def random_str(length: int = 32) -> str:
|
||||
return str(uuid.uuid4()).replace("-", "")[:length]
|
||||
|
||||
|
||||
def calculate_file_hash(filename: str, hash_algo=hashlib.sha256, chunksize: int = 16000000) -> str:
    """Compute the hex digest of a file, reading it in chunks.

    Args:
        filename: path of the file to hash; opened in binary mode.
        hash_algo: zero-argument callable returning a hash object that
            provides ``update()`` and ``hexdigest()`` (defaults to
            ``hashlib.sha256``).
        chunksize: number of bytes read per iteration, so the whole file is
            never held in memory at once.

    Returns:
        The hexadecimal digest of the file contents.
    """
    # named ``hasher`` so the builtin ``hash`` is not shadowed
    hasher = hash_algo()
    with open(filename, "rb") as f:
        # read() returns b"" at end of file, which ends the loop
        while buf := f.read(chunksize):
            hasher.update(buf)
    return hasher.hexdigest()
|
||||
|
||||
|
||||
def get_datetime_from_str(dt_str: str, fmt: str | None = None, dayfirst=True) -> datetime | None:
|
||||
""" parse a datetime string with option of passing a specific format
|
||||
"""parse a datetime string with option of passing a specific format
|
||||
|
||||
Args:
|
||||
dt_str: the datetime string to parse
|
||||
@@ -88,19 +90,24 @@ def get_datetime_from_str(dt_str: str, fmt: str | None = None, dayfirst=True) ->
|
||||
|
||||
|
||||
def get_timestamp(ts, utc=True, iso=True, dayfirst=True) -> str | datetime | None:
|
||||
""" Consistent parsing of timestamps.
|
||||
"""Consistent parsing of timestamps.
|
||||
Args:
|
||||
If utc=True, the timezone is set to UTC,
|
||||
if iso=True, the output is an iso string
|
||||
Use dayfirst to signify between date formats which put the date vs month first:
|
||||
e.g. DD/MM/YYYY vs MM/DD/YYYY
|
||||
"""
|
||||
if not ts: return
|
||||
"""
|
||||
if not ts:
|
||||
return
|
||||
try:
|
||||
if isinstance(ts, str): ts = parse_dt(ts, dayfirst=dayfirst)
|
||||
if isinstance(ts, (int, float)): ts = datetime.fromtimestamp(ts)
|
||||
if utc: ts = ts.replace(tzinfo=timezone.utc)
|
||||
if iso: return ts.isoformat()
|
||||
if isinstance(ts, str):
|
||||
ts = parse_dt(ts, dayfirst=dayfirst)
|
||||
if isinstance(ts, (int, float)):
|
||||
ts = datetime.fromtimestamp(ts)
|
||||
if utc:
|
||||
ts = ts.replace(tzinfo=timezone.utc)
|
||||
if iso:
|
||||
return ts.isoformat()
|
||||
return ts
|
||||
except Exception as e:
|
||||
logger.error(f"Unable to parse timestamp {ts}: {e}")
|
||||
|
||||
@@ -4,8 +4,8 @@ from ipaddress import ip_address
|
||||
|
||||
|
||||
AUTHWALL_URLS = [
|
||||
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
|
||||
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
|
||||
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
|
||||
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
|
||||
]
|
||||
|
||||
|
||||
@@ -14,17 +14,16 @@ def check_url_or_raise(url: str) -> bool | ValueError:
|
||||
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
|
||||
"""
|
||||
|
||||
|
||||
if not (url.startswith("http://") or url.startswith("https://")):
|
||||
raise ValueError(f"Invalid URL scheme for url {url}")
|
||||
|
||||
|
||||
parsed = urlparse(url)
|
||||
if not parsed.hostname:
|
||||
raise ValueError(f"Invalid URL hostname for url {url}")
|
||||
|
||||
|
||||
if parsed.hostname == "localhost":
|
||||
raise ValueError(f"Localhost URLs cannot be parsed for security reasons (for url {url})")
|
||||
|
||||
|
||||
if parsed.scheme not in ["http", "https"]:
|
||||
raise ValueError(f"Invalid URL scheme, only http and https supported (for url {url})")
|
||||
|
||||
@@ -32,7 +31,7 @@ def check_url_or_raise(url: str) -> bool | ValueError:
|
||||
ip = ip_address(parsed.hostname)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
else:
|
||||
if not ip.is_global:
|
||||
raise ValueError(f"IP address {ip} is not globally reachable")
|
||||
@@ -42,18 +41,21 @@ def check_url_or_raise(url: str) -> bool | ValueError:
|
||||
raise ValueError(f"Link-local IP address {ip} used")
|
||||
if ip.is_private:
|
||||
raise ValueError(f"Private IP address {ip} used")
|
||||
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def domain_for_url(url: str) -> str:
    """
    SECURITY: the domain is extracted with urllib's parser instead of ad-hoc
    string handling, to avoid any potential security issues.

    Returns the ``netloc`` component of the parsed URL.
    """
    parsed = urlparse(url)
    return parsed.netloc
|
||||
|
||||
|
||||
def clean(url: str) -> str:
    """Return ``url`` unchanged; no cleaning is currently applied."""
    return url
|
||||
|
||||
|
||||
def is_auth_wall(url: str) -> bool:
|
||||
"""
|
||||
checks if URL is behind an authentication wall meaning steps like wayback, wacz, ... may not work
|
||||
@@ -64,13 +66,15 @@ def is_auth_wall(url: str) -> bool:
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def remove_get_parameters(url: str) -> str:
    """Drop the query string from ``url`` and keep every other component.

    e.g. http://example.com/file.mp4?t=1 -> http://example.com/file.mp4
    Useful for mimetypes to work.
    """
    without_query = urlparse(url)._replace(query="")
    return urlunparse(without_query)
|
||||
|
||||
|
||||
def is_relevant_url(url: str) -> bool:
|
||||
"""
|
||||
Detect if a detected media URL is recurring and therefore irrelevant to a specific archive. Useful, for example, for the enumeration of the media files in WARC files which include profile pictures, favicons, etc.
|
||||
@@ -78,42 +82,59 @@ def is_relevant_url(url: str) -> bool:
|
||||
clean_url = remove_get_parameters(url)
|
||||
|
||||
# favicons
|
||||
if "favicon" in url: return False
|
||||
if "favicon" in url:
|
||||
return False
|
||||
# ignore icons
|
||||
if clean_url.endswith(".ico"): return False
|
||||
if clean_url.endswith(".ico"):
|
||||
return False
|
||||
# ignore SVGs
|
||||
if remove_get_parameters(url).endswith(".svg"): return False
|
||||
if remove_get_parameters(url).endswith(".svg"):
|
||||
return False
|
||||
|
||||
# twitter profile pictures
|
||||
if "twimg.com/profile_images" in url: return False
|
||||
if "twimg.com" in url and "/default_profile_images" in url: return False
|
||||
if "twimg.com/profile_images" in url:
|
||||
return False
|
||||
if "twimg.com" in url and "/default_profile_images" in url:
|
||||
return False
|
||||
|
||||
# instagram profile pictures
|
||||
if "https://scontent.cdninstagram.com/" in url and "150x150" in url: return False
|
||||
if "https://scontent.cdninstagram.com/" in url and "150x150" in url:
|
||||
return False
|
||||
# instagram recurring images
|
||||
if "https://static.cdninstagram.com/rsrc.php/" in url: return False
|
||||
if "https://static.cdninstagram.com/rsrc.php/" in url:
|
||||
return False
|
||||
|
||||
# telegram
|
||||
if "https://telegram.org/img/emoji/" in url: return False
|
||||
if "https://telegram.org/img/emoji/" in url:
|
||||
return False
|
||||
|
||||
# youtube
|
||||
if "https://www.youtube.com/s/gaming/emoji/" in url: return False
|
||||
if "https://yt3.ggpht.com" in url and "default-user=" in url: return False
|
||||
if "https://www.youtube.com/s/search/audio/" in url: return False
|
||||
if "https://www.youtube.com/s/gaming/emoji/" in url:
|
||||
return False
|
||||
if "https://yt3.ggpht.com" in url and "default-user=" in url:
|
||||
return False
|
||||
if "https://www.youtube.com/s/search/audio/" in url:
|
||||
return False
|
||||
|
||||
# ok
|
||||
if " https://ok.ru/res/i/" in url: return False
|
||||
if " https://ok.ru/res/i/" in url:
|
||||
return False
|
||||
|
||||
# vk
|
||||
if "https://vk.com/emoji/" in url: return False
|
||||
if "vk.com/images/" in url: return False
|
||||
if "vk.com/images/reaction/" in url: return False
|
||||
if "https://vk.com/emoji/" in url:
|
||||
return False
|
||||
if "vk.com/images/" in url:
|
||||
return False
|
||||
if "vk.com/images/reaction/" in url:
|
||||
return False
|
||||
|
||||
# wikipedia
|
||||
if "wikipedia.org/static" in url: return False
|
||||
if "wikipedia.org/static" in url:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def twitter_best_quality_url(url: str) -> str:
|
||||
"""
|
||||
some twitter image URLs point to a less-than best quality
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
""" This Webdriver class acts as a context manager for the selenium webdriver. """
|
||||
"""This Webdriver class acts as a context manager for the selenium webdriver."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
|
||||
#import domain_for_url
|
||||
# import domain_for_url
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
from http.cookiejar import MozillaCookieJar
|
||||
|
||||
@@ -20,16 +21,15 @@ from loguru import logger
|
||||
|
||||
|
||||
class CookieSettingDriver(webdriver.Firefox):
|
||||
|
||||
facebook_accept_cookies: bool
|
||||
cookies: str
|
||||
cookiejar: MozillaCookieJar
|
||||
|
||||
def __init__(self, cookies, cookiejar, facebook_accept_cookies, *args, **kwargs):
|
||||
if os.environ.get('RUNNING_IN_DOCKER'):
|
||||
if os.environ.get("RUNNING_IN_DOCKER"):
|
||||
# Selenium doesn't support linux-aarch64 driver, we need to set this manually
|
||||
kwargs['service'] = webdriver.FirefoxService(executable_path='/usr/local/bin/geckodriver')
|
||||
|
||||
kwargs["service"] = webdriver.FirefoxService(executable_path="/usr/local/bin/geckodriver")
|
||||
|
||||
super(CookieSettingDriver, self).__init__(*args, **kwargs)
|
||||
self.cookies = cookies
|
||||
self.cookiejar = cookiejar
|
||||
@@ -39,38 +39,44 @@ class CookieSettingDriver(webdriver.Firefox):
|
||||
if self.cookies or self.cookiejar:
|
||||
# set up the driver to make it not 'cookie averse' (needs a context/URL)
|
||||
# get the 'robots.txt' file which should be quick and easy
|
||||
robots_url = urlunparse(urlparse(url)._replace(path='/robots.txt', query='', fragment=''))
|
||||
robots_url = urlunparse(urlparse(url)._replace(path="/robots.txt", query="", fragment=""))
|
||||
super(CookieSettingDriver, self).get(robots_url)
|
||||
|
||||
if self.cookies:
|
||||
# an explicit cookie is set for this site, use that first
|
||||
for cookie in self.cookies.split(";"):
|
||||
for name, value in cookie.split("="):
|
||||
self.driver.add_cookie({'name': name, 'value': value})
|
||||
self.driver.add_cookie({"name": name, "value": value})
|
||||
elif self.cookiejar:
|
||||
domain = urlparse(url).netloc
|
||||
regex = re.compile(f"(www)?\.?{domain}$")
|
||||
for cookie in self.cookiejar:
|
||||
if regex.match(cookie.domain):
|
||||
try:
|
||||
self.add_cookie({
|
||||
'name': cookie.name,
|
||||
'value': cookie.value,
|
||||
'path': cookie.path,
|
||||
'domain': cookie.domain,
|
||||
'secure': bool(cookie.secure),
|
||||
'expiry': cookie.expires
|
||||
})
|
||||
self.add_cookie(
|
||||
{
|
||||
"name": cookie.name,
|
||||
"value": cookie.value,
|
||||
"path": cookie.path,
|
||||
"domain": cookie.domain,
|
||||
"secure": bool(cookie.secure),
|
||||
"expiry": cookie.expires,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to add cookie ({cookie.domain}) to webdriver for url {domain}: {e}")
|
||||
|
||||
|
||||
|
||||
super(CookieSettingDriver, self).get(url)
|
||||
time.sleep(2)
|
||||
|
||||
# Try and use some common button text to reject/accept cookies
|
||||
for text in ["Refuse non-essential cookies", "Decline optional cookies", "Reject additional cookies", "Reject all", "Accept all cookies"]:
|
||||
for text in [
|
||||
"Refuse non-essential cookies",
|
||||
"Decline optional cookies",
|
||||
"Reject additional cookies",
|
||||
"Reject all",
|
||||
"Accept all cookies",
|
||||
]:
|
||||
try:
|
||||
xpath = f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text.lower()}')]"
|
||||
self.find_element(By.XPATH, xpath).click()
|
||||
@@ -89,11 +95,34 @@ class CookieSettingDriver(webdriver.Firefox):
|
||||
logger.warning("Unable to find the 'close' button on the facebook login window")
|
||||
pass
|
||||
|
||||
|
||||
else:
|
||||
# for all other sites, try and use some common button text to reject/accept cookies
|
||||
for text in [
|
||||
"Refuse non-essential cookies",
|
||||
"Decline optional cookies",
|
||||
"Reject additional cookies",
|
||||
"Reject all",
|
||||
"Accept all cookies",
|
||||
]:
|
||||
try:
|
||||
xpath = f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text.lower()}')]"
|
||||
WebDriverWait(self, 5).until(EC.element_to_be_clickable((By.XPATH, xpath))).click()
|
||||
break
|
||||
except selenium_exceptions.WebDriverException:
|
||||
pass
|
||||
|
||||
|
||||
class Webdriver:
|
||||
def __init__(self, width: int, height: int, timeout_seconds: int,
|
||||
facebook_accept_cookies: bool = False, http_proxy: str = "",
|
||||
print_options: dict = {}, auth: dict = {}) -> webdriver:
|
||||
def __init__(
|
||||
self,
|
||||
width: int,
|
||||
height: int,
|
||||
timeout_seconds: int,
|
||||
facebook_accept_cookies: bool = False,
|
||||
http_proxy: str = "",
|
||||
print_options: dict = {},
|
||||
auth: dict = {},
|
||||
) -> webdriver:
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.timeout_seconds = timeout_seconds
|
||||
@@ -108,23 +137,29 @@ class Webdriver:
|
||||
def __enter__(self) -> webdriver:
|
||||
options = webdriver.FirefoxOptions()
|
||||
options.add_argument("--headless")
|
||||
options.add_argument(f'--proxy-server={self.http_proxy}')
|
||||
options.set_preference('network.protocol-handler.external.tg', False)
|
||||
options.add_argument(f"--proxy-server={self.http_proxy}")
|
||||
options.set_preference("network.protocol-handler.external.tg", False)
|
||||
# if facebook cookie popup is present, force the browser to English since then it's easier to click the 'Decline optional cookies' option
|
||||
if self.facebook_accept_cookies:
|
||||
options.add_argument('--lang=en')
|
||||
options.add_argument("--lang=en")
|
||||
|
||||
try:
|
||||
self.driver = CookieSettingDriver(cookies=self.auth.get('cookies'), cookiejar=self.auth.get('cookies_jar'),
|
||||
facebook_accept_cookies=self.facebook_accept_cookies, options=options)
|
||||
self.driver = CookieSettingDriver(
|
||||
cookies=self.auth.get("cookies"),
|
||||
cookiejar=self.auth.get("cookies_jar"),
|
||||
facebook_accept_cookies=self.facebook_accept_cookies,
|
||||
options=options,
|
||||
)
|
||||
self.driver.set_window_size(self.width, self.height)
|
||||
self.driver.set_page_load_timeout(self.timeout_seconds)
|
||||
self.driver.print_options = self.print_options
|
||||
except selenium_exceptions.TimeoutException as e:
|
||||
logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")
|
||||
logger.error(
|
||||
f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}"
|
||||
)
|
||||
|
||||
return self.driver
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.driver.close()
|
||||
self.driver.quit()
|
||||
|
||||
Reference in New Issue
Block a user