mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-09 20:08:27 +03:00
Compare commits
46 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01516724d3 | ||
|
|
a066bf4ca9 | ||
|
|
2233af81f7 | ||
|
|
aacb874b56 | ||
|
|
4b5a8c0199 | ||
|
|
14c56f4916 | ||
|
|
5b131996c6 | ||
|
|
168dfb6254 | ||
|
|
42e16aebd6 | ||
|
|
d6d5a08204 | ||
|
|
e6c5705f70 | ||
|
|
613ba0c05d | ||
|
|
b997bbea2b | ||
|
|
54f53886ef | ||
|
|
0a5ba3385e | ||
|
|
034857075d | ||
|
|
6700250891 | ||
|
|
5e5e1c43a1 | ||
|
|
1e19ad77c6 | ||
|
|
f22af5e123 | ||
|
|
799cef3a8c | ||
|
|
2921061fde | ||
|
|
e531906d73 | ||
|
|
244341d22c | ||
|
|
90932a7bc8 | ||
|
|
488675056b | ||
|
|
a577228465 | ||
|
|
f6863b8eb2 | ||
|
|
5c34ac1293 | ||
|
|
7d972ee9b8 | ||
|
|
b64826dc16 | ||
|
|
23e74803ee | ||
|
|
d03ecdb037 | ||
|
|
a5ebbf4726 | ||
|
|
89e387030d | ||
|
|
8ec053ed1b | ||
|
|
29db537fab | ||
|
|
c4a3a45bf7 | ||
|
|
3ea02c115e | ||
|
|
3d4056ef70 | ||
|
|
51041bf91e | ||
|
|
0765640bff | ||
|
|
06b1f4c0ca | ||
|
|
59b910ec30 | ||
|
|
7e360240bf | ||
|
|
f8e846d59a |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -4,6 +4,7 @@ temp/
|
||||
.DS_Store
|
||||
expmt/
|
||||
service_account.json
|
||||
service_account-*.json
|
||||
__pycache__/
|
||||
._*
|
||||
anu.html
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
auto-archiver:
|
||||
@@ -10,7 +9,4 @@ services:
|
||||
volumes:
|
||||
- ./secrets:/app/secrets
|
||||
- ./local_archive:/app/local_archive
|
||||
environment:
|
||||
- WACZ_ENABLE_DOCKER=true
|
||||
- RUNNING_IN_DOCKER=true
|
||||
command: --config secrets/orchestration.yaml
|
||||
|
||||
@@ -6,12 +6,43 @@ This guide explains how to set up Google Sheets to process URLs automatically an
|
||||
2. Setting up a service account so Auto Archiver can access the sheet
|
||||
3. Setting the Auto Archiver settings
|
||||
|
||||
### 1. Setting up your Google Sheet
|
||||
|
||||
Any Google sheet must have at least *one* column, with the name 'link' (you can change this name afterwards). This is the column with the URLs that you want the Auto Archiver to archive.
|
||||
Your sheet can have many other columns that the Auto Archiver can use, and you can also include any additional columns for your own personal use. The order of the columns does not matter, the naming just needs to be correctly assigned to its corresponding value in the configuration file.
|
||||
## 1. Setting up a Google Service Account
|
||||
|
||||
We recommend copying [this template Google Sheet](https://docs.google.com/spreadsheets/d/1NJZo_XZUBKTI1Ghlgi4nTPVvCfb0HXAs6j5tNGas72k/edit?usp=sharing) as a starting point for your project, as this matches the default column names.
|
||||
Once your Google Sheet is set up, you need to create what's called a 'service account' that will allow the Auto Archiver to access it.
|
||||
|
||||
To do this, you can either:
|
||||
* a) follow the steps in [this guide](https://gspread.readthedocs.io/en/latest/oauth2.html) all the way up until step 8. You should have downloaded a file called `service_account.json` and should save it in the `secrets/` folder
|
||||
* b) run the following script to automatically generate the file:
|
||||
```{code} bash
|
||||
https://raw.githubusercontent.com/bellingcat/auto-archiver/refs/heads/main/scripts/generate_google_services.sh | bash -s --
|
||||
```
|
||||
This uses gcloud to create a new project, a new user and downloads the service account automatically for you. The service account file will have the name `service_account-XXXXXXX.json` where XXXXXXX is a random 16 letter/digit string for the project created.
|
||||
|
||||
```{note}
|
||||
To save the generated file to a different folder, pass an argument as follows:
|
||||
```{code} bash
|
||||
https://raw.githubusercontent.com/bellingcat/auto-archiver/refs/heads/main/scripts/generate_google_services.sh | bash -s -- /path/to/secrets
|
||||
```
|
||||
|
||||
----------
|
||||
|
||||
Once you've downloaded the file, you can save it to `secrets/service_account.json` (the default name), or to another file and then change the location in the settings (see step 4).
|
||||
|
||||
Also make sure to **note down** the email address for this service account. You'll need that for step 3.
|
||||
|
||||
```{note}
|
||||
The email address created in this step can be found either by opening the `service_account.json` file, or if you used b) the `generate_google_services.sh` script, then the script will have printed it out for you.
|
||||
|
||||
The email address will look something like `user@project-name.iam.gserviceaccount.com`
|
||||
```
|
||||
|
||||
|
||||
## 2. Setting up your Google Sheet
|
||||
|
||||
We recommend copying [this template Google Sheet](https://docs.google.com/spreadsheets/d/1NJZo_XZUBKTI1Ghlgi4nTPVvCfb0HXAs6j5tNGas72k/edit?usp=sharing) as a starting point for your project, as this matches all the columns required.
|
||||
|
||||
But if you like, you can also create your own custom sheet. The only columns required are 'link', 'archive status', and 'archive location'. 'link' is the column with the URLs that you want the Auto Archiver to archive, the other two record the archival status and result.
|
||||
|
||||
Here's an overview of all the columns, and what a complete sheet would look like.
|
||||
|
||||
@@ -46,21 +77,18 @@ In this example the Ghseet Feeder and Gsheet DB are being used, and the archive
|
||||
|
||||

|
||||
|
||||
We'll change the name of the 'Destination Folder' column in step 3.
|
||||
We'll change the name of the 'Destination Folder' column in the Step 4a.
|
||||
|
||||
## 2. Setting up your Service Account
|
||||
## 3. Share your Google Sheet with your Service Account email address
|
||||
|
||||
Once your Google Sheet is set up, you need to create what's called a 'service account' that will allow the Auto Archiver to access it.
|
||||
Remember that email address you copied in Step 1? Now that you've set up your Google sheet, click 'Share' in the top
|
||||
right hand corner and enter the email address. Make sure to give the account **Editor** access. Here's how that looks:
|
||||
|
||||
To do this, follow the steps in [this guide](https://gspread.readthedocs.io/en/latest/oauth2.html) all the way up until step 8. You should have downloaded a file called `service_account.json` and shared the Google Sheet with the log 'client_email' email address in this file.
|
||||

|
||||
|
||||
Once you've downloaded the file, save it to `secrets/service_account.json`
|
||||
## 4. Setting up the configuration file
|
||||
|
||||
## 3. Setting up the configuration file
|
||||
|
||||
Now that you've set up your Google sheet, and you've set up the service account so Auto Archiver can access the sheet, the final step is to set your configuration.
|
||||
|
||||
First, make sure you have `gsheet_feeder_db` set in the `steps.feeders` section of your config. If you wish to store the results of the archiving process back in your Google sheet, make sure to also set the `ghseet_db` settig in the `steps.databases` section. Here's how this might look:
|
||||
The final step is to set your configuration. First, make sure you have `gsheet_feeder_db` set in the `steps.feeders` section of your config. If you wish to store the results of the archiving process back in your Google sheet, make sure to also put `gsheet_feeder_db` setting in the `steps.databases` section. Here's how this might look:
|
||||
|
||||
```{code} yaml
|
||||
steps:
|
||||
@@ -75,12 +103,15 @@ steps:
|
||||
Next, set up the `gsheet_feeder_db` configuration settings in the 'Configurations' part of the config `orchestration.yaml` file. Open up the file, and set the `gsheet_feeder_db.sheet` setting or the `gsheet_feeder_db.sheet_id` setting. The `sheet` should be the name of your sheet, as it shows in the top left of the sheet.
|
||||
For example, the sheet [here](https://docs.google.com/spreadsheets/d/1NJZo_XZUBKTI1Ghlgi4nTPVvCfb0HXAs6j5tNGas72k/edit?gid=0#gid=0) is called 'Public Auto Archiver template'.
|
||||
|
||||
If you saved your `service_account.json` file to anywhere other than the default location (`secrets/service_account.json`), then also make sure to change that now:
|
||||
|
||||
Here's how this might look:
|
||||
|
||||
```{code} yaml
|
||||
...
|
||||
gsheet_feeder_db:
|
||||
sheet: 'My Awesome Sheet'
|
||||
service_account: secrets/service_account-XXXXX.json # or leave as secrets/service_account.json
|
||||
...
|
||||
```
|
||||
|
||||
@@ -90,7 +121,7 @@ You can also pass these settings directly on the command line without having to
|
||||
|
||||
Here, the sheet name has been overridden/specified in the command line invocation.
|
||||
|
||||
### 3a. (Optional) Changing the column names
|
||||
### 4a. (Optional) Changing the column names
|
||||
|
||||
In step 1, we said we would change the name of the 'Destination Folder'. Perhaps you don't like this name, or already have a sheet with a different name. In our example here, we want to name this column 'Save Folder'. To do this, we need to edit the `ghseet_feeder_db.column` setting in the configuration file.
|
||||
For more information on this setting, see the [Gsheet Feeder Database docs](../modules/autogen/feeder/gsheet_feeder_db.md#configuration-options). We will first copy the default settings from the Gsheet Feeder docs for the 'column' settings, and then edit the 'Destination Folder' section to rename it 'Save Folder'. Our final configuration section looks like:
|
||||
|
||||
BIN
docs/source/how_to/share_sheet.png
Normal file
BIN
docs/source/how_to/share_sheet.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 60 KiB |
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[project]
|
||||
name = "auto-archiver"
|
||||
version = "0.13.6"
|
||||
version = "0.13.8"
|
||||
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
||||
|
||||
requires-python = ">=3.10,<3.13"
|
||||
|
||||
135
scripts/generate_google_services.sh
Normal file
135
scripts/generate_google_services.sh
Normal file
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e # Exit on error
|
||||
|
||||
|
||||
UUID=$(LC_ALL=C tr -dc a-z0-9 </dev/urandom | head -c 16)
|
||||
PROJECT_NAME="auto-archiver-$UUID"
|
||||
ACCOUNT_NAME="autoarchiver"
|
||||
KEY_FILE="service_account-$UUID.json"
|
||||
DEST_DIR="$1"
|
||||
|
||||
echo "====================================================="
|
||||
echo "🔧 Auto-Archiver Google Services Setup Script"
|
||||
echo "====================================================="
|
||||
echo "This script will:"
|
||||
echo " 1. Install Google Cloud SDK if needed"
|
||||
echo " 2. Create a Google Cloud project named $PROJECT_NAME"
|
||||
echo " 3. Create a service account for Auto-Archiver"
|
||||
echo " 4. Generate a key file for API access"
|
||||
echo ""
|
||||
echo " Tip: Pass a directory path as an argument to this script to move the key file there"
|
||||
echo " e.g. ./generate_google_services.sh /path/to/secrets"
|
||||
echo "====================================================="
|
||||
|
||||
# Check and install Google Cloud SDK based on platform
|
||||
install_gcloud_sdk() {
|
||||
if command -v gcloud &> /dev/null; then
|
||||
echo "✅ Google Cloud SDK is already installed"
|
||||
return 0
|
||||
fi
|
||||
|
||||
echo "📦 Installing Google Cloud SDK..."
|
||||
|
||||
# Detect OS
|
||||
case "$(uname -s)" in
|
||||
Darwin*)
|
||||
if command -v brew &> /dev/null; then
|
||||
echo "🍺 Installing via Homebrew..."
|
||||
brew install google-cloud-sdk --cask
|
||||
else
|
||||
echo "📥 Downloading Google Cloud SDK for macOS..."
|
||||
curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-latest-darwin-x86_64.tar.gz
|
||||
tar -xf google-cloud-cli-latest-darwin-x86_64.tar.gz
|
||||
./google-cloud-sdk/install.sh --quiet
|
||||
rm google-cloud-cli-latest-darwin-x86_64.tar.gz
|
||||
echo "🔄 Please restart your terminal and run this script again"
|
||||
exit 0
|
||||
fi
|
||||
;;
|
||||
Linux*)
|
||||
echo "📥 Downloading Google Cloud SDK for Linux..."
|
||||
curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-latest-linux-x86_64.tar.gz
|
||||
tar -xf google-cloud-cli-latest-linux-x86_64.tar.gz
|
||||
./google-cloud-sdk/install.sh --quiet
|
||||
rm google-cloud-cli-latest-linux-x86_64.tar.gz
|
||||
echo "🔄 Please restart your terminal and run this script again"
|
||||
exit 0
|
||||
;;
|
||||
CYGWIN*|MINGW*|MSYS*)
|
||||
echo "⚠️ Windows detected. Please follow manual installation instructions at:"
|
||||
echo "https://cloud.google.com/sdk/docs/install-sdk"
|
||||
exit 1
|
||||
;;
|
||||
*)
|
||||
echo "⚠️ Unknown operating system. Please follow manual installation instructions at:"
|
||||
echo "https://cloud.google.com/sdk/docs/install-sdk"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "✅ Google Cloud SDK installed"
|
||||
}
|
||||
|
||||
# Install Google Cloud SDK if needed
|
||||
install_gcloud_sdk
|
||||
|
||||
# Login to Google Cloud
|
||||
if gcloud auth list --filter=status:ACTIVE --format="value(account)" | grep -q "@"; then
|
||||
echo "✅ Already authenticated with Google Cloud"
|
||||
else
|
||||
echo "🔑 Authenticating with Google Cloud..."
|
||||
gcloud auth login
|
||||
fi
|
||||
|
||||
# Create project
|
||||
echo "🌟 Creating Google Cloud project: $PROJECT_NAME"
|
||||
gcloud projects create $PROJECT_NAME
|
||||
|
||||
# Create service account
|
||||
echo "👤 Creating service account: $ACCOUNT_NAME"
|
||||
gcloud iam service-accounts create $ACCOUNT_NAME --project $PROJECT_NAME
|
||||
|
||||
# Enable required APIs (uncomment and add APIs as needed)
|
||||
echo "⬆️ Enabling required Google APIs..."
|
||||
gcloud services enable sheets.googleapis.com --project $PROJECT_NAME
|
||||
gcloud services enable drive.googleapis.com --project $PROJECT_NAME
|
||||
|
||||
# Get the service account email
|
||||
echo "📧 Retrieving service account email..."
|
||||
ACCOUNT_EMAIL=$(gcloud iam service-accounts list --project $PROJECT_NAME --format="value(email)")
|
||||
|
||||
# Create and download key
|
||||
echo "🔑 Generating service account key file: $KEY_FILE"
|
||||
gcloud iam service-accounts keys create $KEY_FILE --iam-account=$ACCOUNT_EMAIL
|
||||
|
||||
# move the file to TARGET_DIR if provided
|
||||
if [[ -n "$DEST_DIR" ]]; then
|
||||
# Expand `~` if used
|
||||
DEST_DIR=$(eval echo "$DEST_DIR")
|
||||
|
||||
# Ensure the directory exists
|
||||
if [[ ! -d "$DEST_DIR" ]]; then
|
||||
mkdir -p "$DEST_DIR"
|
||||
fi
|
||||
|
||||
DEST_PATH="$DEST_DIR/$KEY_FILE"
|
||||
echo "🚚 Moving key file to: $DEST_PATH"
|
||||
mv "$KEY_FILE" "$DEST_PATH"
|
||||
KEY_FILE="$DEST_PATH"
|
||||
fi
|
||||
|
||||
echo "====================================================="
|
||||
echo "✅ SETUP COMPLETE!"
|
||||
echo "====================================================="
|
||||
echo "📝 Important Information:"
|
||||
echo " • Project Name: $PROJECT_NAME"
|
||||
echo " • Service Account: $ACCOUNT_EMAIL"
|
||||
echo " • Key File: $KEY_FILE"
|
||||
echo ""
|
||||
echo "📋 Next Steps:"
|
||||
echo " 1. Share any Google Sheets with this email address:"
|
||||
echo " $ACCOUNT_EMAIL"
|
||||
echo " 2. Move $KEY_FILE to your auto-archiver secrets directory"
|
||||
echo " 3. Update your auto-archiver config to use this key file (if needed)"
|
||||
echo "====================================================="
|
||||
@@ -71,7 +71,16 @@ class BaseModule(ABC):
|
||||
:param site: the domain of the site to get authentication information for
|
||||
:param extract_cookies: whether or not to extract cookies from the given browser/file and return the cookie jar (disabling can speed up processing if you don't actually need the cookies jar).
|
||||
|
||||
:returns: authdict dict of login information for the given site
|
||||
:returns: authdict dict -> {
|
||||
"username": str,
|
||||
"password": str,
|
||||
"api_key": str,
|
||||
"api_secret": str,
|
||||
"cookie": str,
|
||||
"cookies_file": str,
|
||||
"cookies_from_browser": str,
|
||||
"cookies_jar": CookieJar
|
||||
}
|
||||
|
||||
**Global options:**\n
|
||||
* cookies_from_browser: str - the name of the browser to extract cookies from (e.g. 'chrome', 'firefox' - uses ytdlp under the hood to extract\n
|
||||
@@ -85,6 +94,7 @@ class BaseModule(ABC):
|
||||
* cookie: str - a cookie string to use for login (specific to this site)\n
|
||||
* cookies_file: str - the path to a cookies file to use for login (specific to this site)\n
|
||||
* cookies_from_browser: str - the name of the browser to extract cookies from (specitic for this site)\n
|
||||
|
||||
"""
|
||||
# TODO: think about if/how we can deal with sites that have multiple domains (main one is x.com/twitter.com)
|
||||
# for now the user must enter them both, like "x.com,twitter.com" in their config. Maybe we just hard-code?
|
||||
|
||||
@@ -5,6 +5,7 @@ by handling user configuration, validating the steps properties, and implementin
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import subprocess
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import List, TYPE_CHECKING, Type
|
||||
@@ -17,7 +18,7 @@ import os
|
||||
from os.path import join
|
||||
from loguru import logger
|
||||
import auto_archiver
|
||||
from auto_archiver.core.consts import DEFAULT_MANIFEST, MANIFEST_FILE
|
||||
from auto_archiver.core.consts import DEFAULT_MANIFEST, MANIFEST_FILE, SetupError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .base_module import BaseModule
|
||||
@@ -85,7 +86,11 @@ class ModuleFactory:
|
||||
if not available:
|
||||
message = f"Module '{module_name}' not found. Are you sure it's installed/exists?"
|
||||
if "archiver" in module_name:
|
||||
message += f" Did you mean {module_name.replace('archiver', 'extractor')}?"
|
||||
message += f" Did you mean '{module_name.replace('archiver', 'extractor')}'?"
|
||||
elif "gsheet" in module_name:
|
||||
message += " Did you mean 'gsheet_feeder_db'?"
|
||||
elif "atlos" in module_name:
|
||||
message += " Did you mean 'atlos_feeder_db_storage'?"
|
||||
raise IndexError(message)
|
||||
return available[0]
|
||||
|
||||
@@ -216,9 +221,9 @@ class LazyBaseModule:
|
||||
if not check(dep):
|
||||
logger.error(
|
||||
f"Module '{self.name}' requires external dependency '{dep}' which is not available/setup. \
|
||||
Have you installed the required dependencies for the '{self.name}' module? See the README for more information."
|
||||
Have you installed the required dependencies for the '{self.name}' module? See the documentation for more information."
|
||||
)
|
||||
exit(1)
|
||||
raise SetupError()
|
||||
|
||||
def check_python_dep(dep):
|
||||
# first check if it's a module:
|
||||
@@ -237,8 +242,22 @@ class LazyBaseModule:
|
||||
|
||||
return find_spec(dep)
|
||||
|
||||
def check_bin_dep(dep):
|
||||
dep_exists = shutil.which(dep)
|
||||
|
||||
if dep == "docker":
|
||||
if os.environ.get("RUNNING_IN_DOCKER"):
|
||||
# this is only for the WACZ enricher, which requires docker
|
||||
# if we're already running in docker then we don't need docker
|
||||
return True
|
||||
|
||||
# check if docker daemon is running
|
||||
return dep_exists and subprocess.run(["docker", "ps", "-q"]).returncode == 0
|
||||
|
||||
return dep_exists
|
||||
|
||||
check_deps(self.dependencies.get("python", []), check_python_dep)
|
||||
check_deps(self.dependencies.get("bin", []), lambda dep: shutil.which(dep))
|
||||
check_deps(self.dependencies.get("bin", []), check_bin_dep)
|
||||
|
||||
logger.debug(f"Loading module '{self.display_name}'...")
|
||||
|
||||
|
||||
@@ -373,9 +373,17 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
|
||||
if module in invalid_modules:
|
||||
continue
|
||||
|
||||
# check to make sure that we're trying to load it as the correct type - i.e. make sure the user hasn't put it under the wrong 'step'
|
||||
lazy_module: LazyBaseModule = self.module_factory.get_module_lazy(module)
|
||||
if module_type not in lazy_module.type:
|
||||
types = ",".join(f"'{t}'" for t in lazy_module.type)
|
||||
raise SetupError(
|
||||
f"Configuration Error: Module '{module}' is not a {module_type}, but has the types: {types}. Please check you set this module up under the right step in your orchestration file."
|
||||
)
|
||||
|
||||
loaded_module = None
|
||||
try:
|
||||
loaded_module: BaseModule = self.module_factory.get_module(module, self.config)
|
||||
loaded_module: BaseModule = lazy_module.load(self.config)
|
||||
except (KeyboardInterrupt, Exception) as e:
|
||||
if not isinstance(e, KeyboardInterrupt) and not isinstance(e, SetupError):
|
||||
logger.error(f"Error during setup of modules: {e}\n{traceback.format_exc()}")
|
||||
|
||||
@@ -74,6 +74,11 @@ If you are having issues with the extractor, you can review the version of `yt-d
|
||||
"default": "inf",
|
||||
"help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit.",
|
||||
},
|
||||
"extractor_args": {
|
||||
"default": {},
|
||||
"help": "Additional arguments to pass to the yt-dlp extractor. See https://github.com/yt-dlp/yt-dlp/blob/master/README.md#extractor-arguments.",
|
||||
"type": "json_loader",
|
||||
},
|
||||
"ytdlp_update_interval": {
|
||||
"default": 5,
|
||||
"help": "How often to check for yt-dlp updates (days). If positive, will check and update yt-dlp every [num] days. Set it to -1 to disable, or 0 to always update on every run.",
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from typing import Type
|
||||
from yt_dlp.extractor.common import InfoExtractor
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
@@ -24,6 +25,8 @@ class GenericDropin:
|
||||
|
||||
"""
|
||||
|
||||
extractor: Type[Extractor] = None
|
||||
|
||||
def extract_post(self, url: str, ie_instance: InfoExtractor):
|
||||
"""
|
||||
This method should return the post data from the url.
|
||||
@@ -55,3 +58,19 @@ class GenericDropin:
|
||||
This method should download any additional media from the post.
|
||||
"""
|
||||
return metadata
|
||||
|
||||
def suitable(self, url, info_extractor: InfoExtractor):
|
||||
"""
|
||||
A method to allow dropins to override their InfoExtractor's 'suitable' method.
|
||||
Dropins should override this method and return True if the url is suitable for the extractor
|
||||
(based on being able to parse other URLs). See the `suitable_extractors` method in the
|
||||
`GenericExtractor` class for how this is implemented.
|
||||
|
||||
The default behaviour of this method is to return the result of the InfoExtractor's 'suitable' method.
|
||||
|
||||
### Example: An example of where this is useful is for the FacebookIE extractor in yt-dlp. By default,
|
||||
it's 'suitable' method only returns True for video URLs. However, we can override this method in the
|
||||
Facebook dropin to return True for all Facebook URLs (photo/post types). This way, the Facebook dropin
|
||||
can be used for all Facebook URLs.
|
||||
"""
|
||||
return info_extractor.suitable(url)
|
||||
|
||||
@@ -1,17 +1,154 @@
|
||||
import re
|
||||
from .dropin import GenericDropin
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from yt_dlp.extractor.facebook import FacebookIE
|
||||
|
||||
# TODO: Remove if / when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
|
||||
from yt_dlp.utils import (
|
||||
clean_html,
|
||||
get_element_by_id,
|
||||
traverse_obj,
|
||||
get_first,
|
||||
merge_dicts,
|
||||
int_or_none,
|
||||
parse_count,
|
||||
)
|
||||
|
||||
|
||||
def _extract_metadata(self, webpage, video_id):
|
||||
post_data = [
|
||||
self._parse_json(j, video_id, fatal=False)
|
||||
for j in re.findall(r"data-sjs>({.*?ScheduledServerJS.*?})</script>", webpage)
|
||||
]
|
||||
post = (
|
||||
traverse_obj(
|
||||
post_data,
|
||||
(..., "require", ..., ..., ..., "__bbox", "require", ..., ..., ..., "__bbox", "result", "data"),
|
||||
expected_type=dict,
|
||||
)
|
||||
or []
|
||||
)
|
||||
media = traverse_obj(
|
||||
post,
|
||||
(
|
||||
...,
|
||||
"attachments",
|
||||
...,
|
||||
lambda k, v: (k == "media" and str(v["id"]) == video_id and v["__typename"] == "Video"),
|
||||
),
|
||||
expected_type=dict,
|
||||
)
|
||||
title = get_first(media, ("title", "text"))
|
||||
description = get_first(media, ("creation_story", "comet_sections", "message", "story", "message", "text"))
|
||||
page_title = title or self._html_search_regex(
|
||||
(
|
||||
r'<h2\s+[^>]*class="uiHeaderTitle"[^>]*>(?P<content>[^<]*)</h2>',
|
||||
r'(?s)<span class="fbPhotosPhotoCaption".*?id="fbPhotoPageCaption"><span class="hasCaption">(?P<content>.*?)</span>',
|
||||
self._meta_regex("og:title"),
|
||||
self._meta_regex("twitter:title"),
|
||||
r"<title>(?P<content>.+?)</title>",
|
||||
),
|
||||
webpage,
|
||||
"title",
|
||||
default=None,
|
||||
group="content",
|
||||
)
|
||||
description = description or self._html_search_meta(
|
||||
["description", "og:description", "twitter:description"], webpage, "description", default=None
|
||||
)
|
||||
uploader_data = (
|
||||
get_first(media, ("owner", {dict}))
|
||||
or get_first(
|
||||
post, ("video", "creation_story", "attachments", ..., "media", lambda k, v: k == "owner" and v["name"])
|
||||
)
|
||||
or get_first(post, (..., "video", lambda k, v: k == "owner" and v["name"]))
|
||||
or get_first(post, ("node", "actors", ..., {dict}))
|
||||
or get_first(post, ("event", "event_creator", {dict}))
|
||||
or get_first(post, ("video", "creation_story", "short_form_video_context", "video_owner", {dict}))
|
||||
or {}
|
||||
)
|
||||
uploader = uploader_data.get("name") or (
|
||||
clean_html(get_element_by_id("fbPhotoPageAuthorName", webpage))
|
||||
or self._search_regex(
|
||||
(r'ownerName\s*:\s*"([^"]+)"', *self._og_regexes("title")), webpage, "uploader", fatal=False
|
||||
)
|
||||
)
|
||||
timestamp = int_or_none(self._search_regex(r'<abbr[^>]+data-utime=["\'](\d+)', webpage, "timestamp", default=None))
|
||||
thumbnail = self._html_search_meta(["og:image", "twitter:image"], webpage, "thumbnail", default=None)
|
||||
# some webpages contain unretrievable thumbnail urls
|
||||
# like https://lookaside.fbsbx.com/lookaside/crawler/media/?media_id=10155168902769113&get_thumbnail=1
|
||||
# in https://www.facebook.com/yaroslav.korpan/videos/1417995061575415/
|
||||
if thumbnail and not re.search(r"\.(?:jpg|png)", thumbnail):
|
||||
thumbnail = None
|
||||
info_dict = {
|
||||
"description": description,
|
||||
"uploader": uploader,
|
||||
"uploader_id": uploader_data.get("id"),
|
||||
"timestamp": timestamp,
|
||||
"thumbnail": thumbnail,
|
||||
"view_count": parse_count(
|
||||
self._search_regex(
|
||||
(r'\bviewCount\s*:\s*["\']([\d,.]+)', r'video_view_count["\']\s*:\s*(\d+)'),
|
||||
webpage,
|
||||
"view count",
|
||||
default=None,
|
||||
)
|
||||
),
|
||||
"concurrent_view_count": get_first(
|
||||
post, (("video", (..., ..., "attachments", ..., "media")), "liveViewerCount", {int_or_none})
|
||||
),
|
||||
**traverse_obj(
|
||||
post,
|
||||
(
|
||||
lambda _, v: video_id in v["url"],
|
||||
"feedback",
|
||||
{
|
||||
"like_count": ("likers", "count", {int}),
|
||||
"comment_count": ("total_comment_count", {int}),
|
||||
"repost_count": ("share_count_reduced", {parse_count}),
|
||||
},
|
||||
),
|
||||
get_all=False,
|
||||
),
|
||||
}
|
||||
|
||||
info_json_ld = self._search_json_ld(webpage, video_id, default={})
|
||||
info_json_ld["title"] = (
|
||||
re.sub(r"\s*\|\s*Facebook$", "", title or info_json_ld.get("title") or page_title or "")
|
||||
or (description or "").replace("\n", " ")
|
||||
or f"Facebook video #{video_id}"
|
||||
)
|
||||
return merge_dicts(info_json_ld, info_dict)
|
||||
|
||||
|
||||
class Facebook(GenericDropin):
|
||||
def extract_post(self, url: str, ie_instance):
|
||||
video_id = ie_instance._match_valid_url(url).group("id")
|
||||
ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), video_id)
|
||||
webpage = ie_instance._download_webpage(url, ie_instance._match_valid_url(url).group("id"))
|
||||
def extract_post(self, url: str, ie_instance: FacebookIE):
|
||||
post_id_regex = r"(?P<id>pfbid[A-Za-z0-9]+|\d+|t\.(\d+\/\d+))"
|
||||
post_id = re.search(post_id_regex, url).group("id")
|
||||
webpage = ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), post_id)
|
||||
|
||||
# TODO: fix once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
|
||||
post_data = ie_instance._extract_metadata(webpage)
|
||||
# TODO: For long posts, this _extract_metadata only seems to return the first 100 or so characters, followed by ...
|
||||
|
||||
# TODO: If/when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged, uncomment next line and delete the one after
|
||||
# post_data = ie_instance._extract_metadata(webpage, post_id)
|
||||
post_data = _extract_metadata(ie_instance, webpage, post_id)
|
||||
return post_data
|
||||
|
||||
def create_metadata(self, post: dict, ie_instance, archiver, url):
|
||||
metadata = archiver.create_metadata(url)
|
||||
metadata.set_title(post.get("title")).set_content(post.get("description")).set_post_data(post)
|
||||
return metadata
|
||||
def create_metadata(self, post: dict, ie_instance: FacebookIE, archiver, url):
|
||||
result = Metadata()
|
||||
result.set_content(post.get("description", ""))
|
||||
result.set_title(post.get("title", ""))
|
||||
result.set("author", post.get("uploader", ""))
|
||||
result.set_url(url)
|
||||
return result
|
||||
|
||||
def suitable(self, url, info_extractor: FacebookIE):
|
||||
regex = r"(?:https?://(?:[\w-]+\.)?(?:facebook\.com||facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd\.onion)/)"
|
||||
return re.match(regex, url)
|
||||
|
||||
def skip_ytdlp_download(self, url: str, is_instance: FacebookIE):
|
||||
"""
|
||||
Skip using the ytdlp download method for Facebook *photo* posts, they have a URL with an id of t.XXXXX/XXXXX
|
||||
"""
|
||||
if re.search(r"/t.\d+/\d+", url):
|
||||
return True
|
||||
|
||||
@@ -13,6 +13,8 @@ from loguru import logger
|
||||
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.utils import get_datetime_from_str
|
||||
from .dropin import GenericDropin
|
||||
|
||||
|
||||
class SkipYtdlp(Exception):
|
||||
@@ -67,7 +69,14 @@ class GenericExtractor(Extractor):
|
||||
"""
|
||||
Returns a list of valid extractors for the given URL"""
|
||||
for info_extractor in yt_dlp.YoutubeDL()._ies.values():
|
||||
if info_extractor.suitable(url) and info_extractor.working():
|
||||
if not info_extractor.working():
|
||||
continue
|
||||
|
||||
# check if there's a dropin and see if that declares whether it's suitable
|
||||
dropin: GenericDropin = self.dropin_for_name(info_extractor.ie_key())
|
||||
if dropin and dropin.suitable(url, info_extractor):
|
||||
yield info_extractor
|
||||
elif info_extractor.suitable(url):
|
||||
yield info_extractor
|
||||
|
||||
def suitable(self, url: str) -> bool:
|
||||
@@ -188,9 +197,13 @@ class GenericExtractor(Extractor):
|
||||
result = self.download_additional_media(video_data, info_extractor, result)
|
||||
|
||||
# keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
|
||||
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
|
||||
result.set_url(url)
|
||||
if "description" in video_data:
|
||||
if not result.get_title():
|
||||
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
|
||||
|
||||
if not result.get("url"):
|
||||
result.set_url(url)
|
||||
|
||||
if "description" in video_data and not result.get("content"):
|
||||
result.set_content(video_data["description"])
|
||||
# extract comments if enabled
|
||||
if self.comments:
|
||||
@@ -207,11 +220,14 @@ class GenericExtractor(Extractor):
|
||||
)
|
||||
|
||||
# then add the common metadata
|
||||
if timestamp := video_data.pop("timestamp", None):
|
||||
timestamp = video_data.pop("timestamp", None)
|
||||
if timestamp and not result.get("timestamp"):
|
||||
timestamp = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).isoformat()
|
||||
result.set_timestamp(timestamp)
|
||||
if upload_date := video_data.pop("upload_date", None):
|
||||
upload_date = datetime.datetime.strptime(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
|
||||
|
||||
upload_date = video_data.pop("upload_date", None)
|
||||
if upload_date and not result.get("upload_date"):
|
||||
upload_date = get_datetime_from_str(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
|
||||
result.set("upload_date", upload_date)
|
||||
|
||||
# then clean away any keys we don't want
|
||||
@@ -240,7 +256,8 @@ class GenericExtractor(Extractor):
|
||||
return False
|
||||
|
||||
post_data = dropin.extract_post(url, ie_instance)
|
||||
return dropin.create_metadata(post_data, ie_instance, self, url)
|
||||
result = dropin.create_metadata(post_data, ie_instance, self, url)
|
||||
return self.add_metadata(post_data, info_extractor, url, result)
|
||||
|
||||
def get_metadata_for_video(
|
||||
self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL
|
||||
@@ -285,7 +302,7 @@ class GenericExtractor(Extractor):
|
||||
|
||||
return self.add_metadata(data, info_extractor, url, result)
|
||||
|
||||
def dropin_for_name(self, dropin_name: str, additional_paths=[], package=__package__) -> Type[InfoExtractor]:
|
||||
def dropin_for_name(self, dropin_name: str, additional_paths=[], package=__package__) -> GenericDropin:
|
||||
dropin_name = dropin_name.lower()
|
||||
|
||||
if dropin_name == "generic":
|
||||
@@ -296,6 +313,7 @@ class GenericExtractor(Extractor):
|
||||
|
||||
def _load_dropin(dropin):
|
||||
dropin_class = getattr(dropin, dropin_class_name)()
|
||||
dropin.extractor = self
|
||||
return self._dropins.setdefault(dropin_name, dropin_class)
|
||||
|
||||
try:
|
||||
@@ -340,7 +358,7 @@ class GenericExtractor(Extractor):
|
||||
dropin_submodule = self.dropin_for_name(info_extractor.ie_key())
|
||||
|
||||
try:
|
||||
if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url):
|
||||
if dropin_submodule and dropin_submodule.skip_ytdlp_download(url, info_extractor):
|
||||
logger.debug(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
|
||||
raise SkipYtdlp()
|
||||
|
||||
@@ -359,7 +377,7 @@ class GenericExtractor(Extractor):
|
||||
|
||||
if not isinstance(e, SkipYtdlp):
|
||||
logger.debug(
|
||||
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead'
|
||||
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use dropin to get post data instead'
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -404,16 +422,20 @@ class GenericExtractor(Extractor):
|
||||
"--write-subs" if self.subtitles else "--no-write-subs",
|
||||
"--write-auto-subs" if self.subtitles else "--no-write-auto-subs",
|
||||
"--live-from-start" if self.live_from_start else "--no-live-from-start",
|
||||
"--proxy",
|
||||
self.proxy if self.proxy else "",
|
||||
f"--max-downloads {self.max_downloads}" if self.max_downloads != "inf" else "",
|
||||
f"--playlist-end {self.max_downloads}" if self.max_downloads != "inf" else "",
|
||||
]
|
||||
|
||||
# proxy handling
|
||||
if self.proxy:
|
||||
ydl_options.extend(["--proxy", self.proxy])
|
||||
|
||||
# max_downloads handling
|
||||
if self.max_downloads != "inf":
|
||||
ydl_options.extend(["--max-downloads", str(self.max_downloads)])
|
||||
ydl_options.extend(["--playlist-end", str(self.max_downloads)])
|
||||
|
||||
# set up auth
|
||||
auth = self.auth_for_site(url, extract_cookies=False)
|
||||
|
||||
# order of importance: username/pasword -> api_key -> cookie -> cookies_from_browser -> cookies_file
|
||||
# order of importance: username/password -> api_key -> cookie -> cookies_from_browser -> cookies_file
|
||||
if auth:
|
||||
if "username" in auth and "password" in auth:
|
||||
logger.debug(f"Using provided auth username and password for {url}")
|
||||
@@ -429,6 +451,16 @@ class GenericExtractor(Extractor):
|
||||
logger.debug(f"Using cookies from file {auth['cookies_file']} for {url}")
|
||||
ydl_options.extend(("--cookies", auth["cookies_file"]))
|
||||
|
||||
# Applying user-defined extractor_args
|
||||
if self.extractor_args:
|
||||
for key, args in self.extractor_args.items():
|
||||
logger.debug(f"Setting extractor_args: {key}")
|
||||
if isinstance(args, dict):
|
||||
arg_str = ";".join(f"{k}={v}" for k, v in args.items())
|
||||
else:
|
||||
arg_str = str(args)
|
||||
ydl_options.extend(["--extractor-args", f"{key}:{arg_str}"])
|
||||
|
||||
if self.ytdlp_args:
|
||||
logger.debug("Adding additional ytdlp arguments: {self.ytdlp_args}")
|
||||
ydl_options += self.ytdlp_args.split(" ")
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import requests
|
||||
from loguru import logger
|
||||
|
||||
from yt_dlp.extractor.tiktok import TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from datetime import datetime, timezone
|
||||
from .dropin import GenericDropin
|
||||
@@ -13,6 +16,11 @@ class Tiktok(GenericDropin):
|
||||
|
||||
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
|
||||
|
||||
def suitable(self, url, info_extractor) -> bool:
|
||||
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
|
||||
Return the 'suitable' method from the TikTokIE class."""
|
||||
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
|
||||
|
||||
def extract_post(self, url: str, ie_instance):
|
||||
logger.debug(f"Using Tikwm API to attempt to download tiktok video from {url=}")
|
||||
|
||||
@@ -38,6 +46,9 @@ class Tiktok(GenericDropin):
|
||||
api_data["video_url"] = video_url
|
||||
return api_data
|
||||
|
||||
def keys_to_clean(self, video_data: dict, info_extractor):
|
||||
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
|
||||
|
||||
def create_metadata(self, post: dict, ie_instance, archiver, url):
|
||||
# prepare result, start by downloading video
|
||||
result = Metadata()
|
||||
@@ -54,17 +65,17 @@ class Tiktok(GenericDropin):
|
||||
logger.error(f"failed to download video from {video_url}")
|
||||
return False
|
||||
video_media = Media(video_downloaded)
|
||||
if duration := post.pop("duration", None):
|
||||
if duration := post.get("duration", None):
|
||||
video_media.set("duration", duration)
|
||||
result.add_media(video_media)
|
||||
|
||||
# add remaining metadata
|
||||
result.set_title(post.pop("title", ""))
|
||||
result.set_title(post.get("title", ""))
|
||||
|
||||
if created_at := post.pop("create_time", None):
|
||||
if created_at := post.get("create_time", None):
|
||||
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
|
||||
|
||||
if author := post.pop("author", None):
|
||||
if author := post.get("author", None):
|
||||
result.set("author", author)
|
||||
|
||||
result.set("api_data", post)
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
import re
|
||||
import mimetypes
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from loguru import logger
|
||||
from slugify import slugify
|
||||
|
||||
from auto_archiver.core.metadata import Metadata, Media
|
||||
from auto_archiver.utils import url as UrlUtil
|
||||
from auto_archiver.utils import url as UrlUtil, get_datetime_from_str
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
|
||||
from .dropin import GenericDropin, InfoExtractor
|
||||
@@ -33,19 +31,24 @@ class Twitter(GenericDropin):
|
||||
twid = ie_instance._match_valid_url(url).group("id")
|
||||
return ie_instance._extract_status(twid=twid)
|
||||
|
||||
def keys_to_clean(self, video_data, info_extractor):
|
||||
return ["user", "created_at", "entities", "favorited", "translator_type"]
|
||||
|
||||
def create_metadata(self, tweet: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
|
||||
result = Metadata()
|
||||
try:
|
||||
if not tweet.get("user") or not tweet.get("created_at"):
|
||||
raise ValueError("Error retreiving post. Are you sure it exists?")
|
||||
timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
|
||||
timestamp = get_datetime_from_str(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
|
||||
except (ValueError, KeyError) as ex:
|
||||
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")
|
||||
return False
|
||||
|
||||
result.set_title(tweet.get("full_text", "")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(
|
||||
timestamp
|
||||
)
|
||||
full_text = tweet.pop("full_text", "")
|
||||
author = tweet["user"].get("name", "")
|
||||
result.set("author", author).set_url(url)
|
||||
|
||||
result.set_title(f"{author} - {full_text}").set_content(full_text).set_timestamp(timestamp)
|
||||
if not tweet.get("entities", {}).get("media"):
|
||||
logger.debug("No media found, archiving tweet text only")
|
||||
result.status = "twitter-ytdl"
|
||||
|
||||
@@ -70,10 +70,14 @@
|
||||
- Skips redundant updates for empty or invalid data fields.
|
||||
|
||||
### Setup
|
||||
- Requires a Google Service Account JSON file for authentication, which should be stored in `secrets/gsheets_service_account.json`.
|
||||
To set up a service account, follow the instructions [here](https://gspread.readthedocs.io/en/latest/oauth2.html).
|
||||
- Define the `sheet` or `sheet_id` configuration to specify the sheet to archive.
|
||||
- Customize the column names in your Google sheet using the `columns` configuration.
|
||||
- The Google Sheet can be used soley as a feeder or as a feeder and database, but note you can't currently feed into the database from an alternate feeder.
|
||||
1. Requires a Google Service Account JSON file for authentication.
|
||||
To set up a service account, follow the instructions in the [how to](https://auto-archiver.readthedocs.io/en/latest/how_to/gsheets_setup.html),
|
||||
or use the script:
|
||||
```
|
||||
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/bellingcat/auto-archiver/refs/heads/main/scripts/generate_google_services.sh)"
|
||||
```
|
||||
2. Create a Google sheet with the required column(s) and then define the `sheet` or `sheet_id` configuration to specify this sheet.
|
||||
3. Customize the column names in your Google sheet using the `columns` configuration.
|
||||
4. The Google Sheet can be used solely as a feeder or as a feeder and database, but note you can't currently feed into the database from an alternate feeder.
|
||||
""",
|
||||
}
|
||||
|
||||
@@ -29,6 +29,9 @@ class InstagramExtractor(Extractor):
|
||||
# TODO: links to stories
|
||||
|
||||
def setup(self) -> None:
|
||||
logger.warning("Instagram Extractor is not actively maintained, and may not work as expected.")
|
||||
logger.warning("Please consider using the Instagram Tbot Extractor or Instagram API Extractor instead.")
|
||||
|
||||
self.insta = instaloader.Instaloader(
|
||||
download_geotags=True,
|
||||
download_comments=True,
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"save_absolute": {
|
||||
"default": False,
|
||||
"type": "bool",
|
||||
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)",
|
||||
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (Warning: saving an absolute path will show your computer's file structure)",
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
|
||||
@@ -19,12 +19,21 @@ class ScreenshotEnricher(Enricher):
|
||||
def enrich(self, to_enrich: Metadata) -> None:
|
||||
url = to_enrich.get_url()
|
||||
|
||||
if UrlUtil.is_auth_wall(url):
|
||||
logger.debug(f"[SKIP] SCREENSHOT since url is behind AUTH WALL: {url=}")
|
||||
return
|
||||
|
||||
logger.debug(f"Enriching screenshot for {url=}")
|
||||
auth = self.auth_for_site(url)
|
||||
|
||||
# screenshot enricher only supports cookie-type auth (selenium)
|
||||
has_valid_auth = auth and (auth.get("cookies") or auth.get("cookies_jar") or auth.get("cookie"))
|
||||
|
||||
if UrlUtil.is_auth_wall(url) and not has_valid_auth:
|
||||
logger.warning(f"[SKIP] SCREENSHOT since url is behind AUTH WALL and no login details provided: {url=}")
|
||||
if any(auth.get(key) for key in ["username", "password", "api_key", "api_secret"]):
|
||||
logger.warning(
|
||||
f"Screenshot enricher only supports cookie-type authentication, you have provided {auth.keys()} which are not supported.\
|
||||
Consider adding 'cookie', 'cookies_file' or 'cookies_from_browser' to your auth for this site."
|
||||
)
|
||||
return
|
||||
|
||||
with self.webdriver_factory(
|
||||
self.width,
|
||||
self.height,
|
||||
|
||||
@@ -2,7 +2,6 @@ import json
|
||||
import re
|
||||
import mimetypes
|
||||
import requests
|
||||
from datetime import datetime
|
||||
|
||||
from loguru import logger
|
||||
from pytwitter import Api
|
||||
@@ -10,6 +9,7 @@ from slugify import slugify
|
||||
|
||||
from auto_archiver.core import Extractor
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.utils import get_datetime_from_str
|
||||
|
||||
|
||||
class TwitterApiExtractor(Extractor):
|
||||
@@ -91,7 +91,7 @@ class TwitterApiExtractor(Extractor):
|
||||
|
||||
result = Metadata()
|
||||
result.set_title(tweet.data.text)
|
||||
result.set_timestamp(datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
|
||||
result.set_timestamp(get_datetime_from_str(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
|
||||
|
||||
urls = []
|
||||
if tweet.includes:
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"configs": {
|
||||
"profile": {
|
||||
"default": None,
|
||||
"help": "browsertrix-profile (for profile generation see https://github.com/webrecorder/browsertrix-crawler#creating-and-using-browser-profiles).",
|
||||
"help": "browsertrix-profile (for profile generation see https://crawler.docs.browsertrix.com/user-guide/browser-profiles/).",
|
||||
},
|
||||
"docker_commands": {"default": None, "help": "if a custom docker invocation is needed"},
|
||||
"timeout": {"default": 120, "help": "timeout for WACZ generation in seconds", "type": "int"},
|
||||
@@ -40,14 +40,27 @@
|
||||
Creates .WACZ archives of web pages using the `browsertrix-crawler` tool, with options for media extraction and screenshot saving.
|
||||
[Browsertrix-crawler](https://crawler.docs.browsertrix.com/user-guide/) is a headless browser-based crawler that archives web pages in WACZ format.
|
||||
|
||||
### Features
|
||||
## Setup
|
||||
|
||||
**Docker**
|
||||
If you are using the Docker file to run Auto Archiver (recommended), then everything is set up and you can use WACZ out of the box!
|
||||
Otherwise, if you are using a local install of Auto Archiver (e.g. pip or dev install), then you will need to install Docker and run
|
||||
the docker daemon to be able to run the `browsertrix-crawler` tool.
|
||||
|
||||
**Browsertrix Profiles**
|
||||
A browsertrix profile is a custom browser profile (login information, browser extensions, etc.) that can be used to archive private or dynamic content.
|
||||
You can run the WACZ Enricher without a profile, but for more resilient archiving, it is recommended to create a profile. See the [Browsertrix documentation](https://crawler.docs.browsertrix.com/user-guide/browser-profiles/)
|
||||
for more information.
|
||||
|
||||
** Docker in Docker **
|
||||
If you are running Auto Archiver within a Docker container, you will need to enable Docker in Docker to run the `browsertrix-crawler` tool.
|
||||
This can be done by setting the `WACZ_ENABLE_DOCKER` environment variable to `1`.
|
||||
|
||||
## Features
|
||||
- Archives web pages into .WACZ format using Docker or direct invocation of `browsertrix-crawler`.
|
||||
- Supports custom profiles for archiving private or dynamic content.
|
||||
- Extracts media (images, videos, audio) and screenshots from the archive, optionally adding them to the enrichment pipeline.
|
||||
- Generates metadata from the archived page's content and structure (e.g., titles, text).
|
||||
|
||||
### Notes
|
||||
- Requires Docker for running `browsertrix-crawler` .
|
||||
- Configurable via parameters for timeout, media extraction, screenshots, and proxy settings.
|
||||
""",
|
||||
}
|
||||
|
||||
@@ -24,7 +24,8 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
self.use_docker = os.environ.get("WACZ_ENABLE_DOCKER") or not os.environ.get("RUNNING_IN_DOCKER")
|
||||
self.docker_in_docker = os.environ.get("WACZ_ENABLE_DOCKER") and os.environ.get("RUNNING_IN_DOCKER")
|
||||
|
||||
self.cwd_dind = f"/crawls/crawls{random_str(8)}"
|
||||
self.crawl_id = random_str(8)
|
||||
self.cwd_dind = f"/crawls/crawls{self.crawl_id}"
|
||||
self.browsertrix_home_host = os.environ.get("BROWSERTRIX_HOME_HOST")
|
||||
self.browsertrix_home_container = os.environ.get("BROWSERTRIX_HOME_CONTAINER") or self.browsertrix_home_host
|
||||
# create crawls folder if not exists, so it can be safely removed in cleanup
|
||||
@@ -50,7 +51,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
|
||||
url = to_enrich.get_url()
|
||||
|
||||
collection = random_str(8)
|
||||
collection = self.crawl_id
|
||||
browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(self.tmp_dir)
|
||||
browsertrix_home_container = self.browsertrix_home_container or browsertrix_home_host
|
||||
|
||||
@@ -102,10 +103,11 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
] + cmd
|
||||
|
||||
if self.profile:
|
||||
profile_fn = os.path.join(browsertrix_home_container, "profile.tar.gz")
|
||||
profile_file = f"profile-{self.crawl_id}.tar.gz"
|
||||
profile_fn = os.path.join(browsertrix_home_container, profile_file)
|
||||
logger.debug(f"copying {self.profile} to {profile_fn}")
|
||||
shutil.copyfile(self.profile, profile_fn)
|
||||
cmd.extend(["--profile", os.path.join("/crawls", "profile.tar.gz")])
|
||||
cmd.extend(["--profile", os.path.join("/crawls", profile_file)])
|
||||
|
||||
else:
|
||||
logger.debug(f"generating WACZ without Docker for {url=}")
|
||||
|
||||
@@ -4,8 +4,8 @@ from ipaddress import ip_address
|
||||
|
||||
|
||||
AUTHWALL_URLS = [
|
||||
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
|
||||
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
|
||||
re.compile(r"https?:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
|
||||
re.compile(r"https?:\/\/(www\.)?instagram\.com"), # instagram
|
||||
]
|
||||
|
||||
|
||||
@@ -81,56 +81,43 @@ def is_relevant_url(url: str) -> bool:
|
||||
"""
|
||||
clean_url = remove_get_parameters(url)
|
||||
|
||||
# favicons
|
||||
if "favicon" in url:
|
||||
return False
|
||||
# ifnore icons
|
||||
if clean_url.endswith(".ico"):
|
||||
return False
|
||||
# ignore SVGs
|
||||
if remove_get_parameters(url).endswith(".svg"):
|
||||
return False
|
||||
IRRELEVANT_URLS = [
|
||||
# favicons
|
||||
("favicon",),
|
||||
# twitter profile pictures
|
||||
("twimg.com/profile_images",),
|
||||
("twimg.com", "default_profile_images"),
|
||||
# instagram profile pictures
|
||||
("https://scontent.cdninstagram.com/", "150x150"),
|
||||
# instagram recurring images
|
||||
("https://static.cdninstagram.com/rsrc.php/",),
|
||||
# telegram
|
||||
("https://telegram.org/img/emoji/",),
|
||||
# youtube
|
||||
("https://www.youtube.com/s/gaming/emoji/",),
|
||||
("https://yt3.ggpht.com", "default-user="),
|
||||
("https://www.youtube.com/s/search/audio/",),
|
||||
# ok
|
||||
("https://ok.ru/res/i/",),
|
||||
("https://vk.com/emoji/",),
|
||||
("vk.com/images/",),
|
||||
("vk.com/images/reaction/",),
|
||||
# wikipedia
|
||||
("wikipedia.org/static",),
|
||||
]
|
||||
|
||||
# twitter profile pictures
|
||||
if "twimg.com/profile_images" in url:
|
||||
return False
|
||||
if "twimg.com" in url and "/default_profile_images" in url:
|
||||
return False
|
||||
IRRELEVANT_ENDS_WITH = [
|
||||
".svg", # ignore SVGs
|
||||
".ico", # ignore icons
|
||||
]
|
||||
|
||||
# instagram profile pictures
|
||||
if "https://scontent.cdninstagram.com/" in url and "150x150" in url:
|
||||
return False
|
||||
# instagram recurring images
|
||||
if "https://static.cdninstagram.com/rsrc.php/" in url:
|
||||
return False
|
||||
for end in IRRELEVANT_ENDS_WITH:
|
||||
if clean_url.endswith(end):
|
||||
return False
|
||||
|
||||
# telegram
|
||||
if "https://telegram.org/img/emoji/" in url:
|
||||
return False
|
||||
|
||||
# youtube
|
||||
if "https://www.youtube.com/s/gaming/emoji/" in url:
|
||||
return False
|
||||
if "https://yt3.ggpht.com" in url and "default-user=" in url:
|
||||
return False
|
||||
if "https://www.youtube.com/s/search/audio/" in url:
|
||||
return False
|
||||
|
||||
# ok
|
||||
if " https://ok.ru/res/i/" in url:
|
||||
return False
|
||||
|
||||
# vk
|
||||
if "https://vk.com/emoji/" in url:
|
||||
return False
|
||||
if "vk.com/images/" in url:
|
||||
return False
|
||||
if "vk.com/images/reaction/" in url:
|
||||
return False
|
||||
|
||||
# wikipedia
|
||||
if "wikipedia.org/static" in url:
|
||||
return False
|
||||
for parts in IRRELEVANT_URLS:
|
||||
if all(part in clean_url for part in parts):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -22,35 +22,35 @@ from loguru import logger
|
||||
|
||||
class CookieSettingDriver(webdriver.Firefox):
|
||||
facebook_accept_cookies: bool
|
||||
cookies: str
|
||||
cookiejar: MozillaCookieJar
|
||||
cookie: str
|
||||
cookie_jar: MozillaCookieJar
|
||||
|
||||
def __init__(self, cookies, cookiejar, facebook_accept_cookies, *args, **kwargs):
|
||||
def __init__(self, cookie, cookie_jar, facebook_accept_cookies, *args, **kwargs):
|
||||
if os.environ.get("RUNNING_IN_DOCKER"):
|
||||
# Selenium doesn't support linux-aarch64 driver, we need to set this manually
|
||||
kwargs["service"] = webdriver.FirefoxService(executable_path="/usr/local/bin/geckodriver")
|
||||
|
||||
super(CookieSettingDriver, self).__init__(*args, **kwargs)
|
||||
self.cookies = cookies
|
||||
self.cookiejar = cookiejar
|
||||
self.cookie = cookie
|
||||
self.cookie_jar = cookie_jar
|
||||
self.facebook_accept_cookies = facebook_accept_cookies
|
||||
|
||||
def get(self, url: str):
|
||||
if self.cookies or self.cookiejar:
|
||||
if self.cookie_jar or self.cookie:
|
||||
# set up the driver to make it not 'cookie averse' (needs a context/URL)
|
||||
# get the 'robots.txt' file which should be quick and easy
|
||||
robots_url = urlunparse(urlparse(url)._replace(path="/robots.txt", query="", fragment=""))
|
||||
super(CookieSettingDriver, self).get(robots_url)
|
||||
|
||||
if self.cookies:
|
||||
if self.cookie:
|
||||
# an explicit cookie is set for this site, use that first
|
||||
for cookie in self.cookies.split(";"):
|
||||
for name, value in cookie.split("="):
|
||||
self.driver.add_cookie({"name": name, "value": value})
|
||||
elif self.cookiejar:
|
||||
domain = urlparse(url).netloc
|
||||
elif self.cookie_jar:
|
||||
domain = urlparse(url).netloc.removeprefix("www.")
|
||||
regex = re.compile(f"(www)?.?{domain}$")
|
||||
for cookie in self.cookiejar:
|
||||
for cookie in self.cookie_jar:
|
||||
if regex.match(cookie.domain):
|
||||
try:
|
||||
self.add_cookie(
|
||||
@@ -145,8 +145,8 @@ class Webdriver:
|
||||
|
||||
try:
|
||||
self.driver = CookieSettingDriver(
|
||||
cookies=self.auth.get("cookies"),
|
||||
cookiejar=self.auth.get("cookies_jar"),
|
||||
cookie=self.auth.get("cookie"),
|
||||
cookie_jar=self.auth.get("cookies_jar"),
|
||||
facebook_accept_cookies=self.facebook_accept_cookies,
|
||||
options=options,
|
||||
)
|
||||
|
||||
@@ -118,7 +118,7 @@ def pytest_runtest_setup(item):
|
||||
pytest.xfail(f"previous test failed ({test_name})")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@pytest.fixture
|
||||
def unpickle():
|
||||
"""
|
||||
Returns a helper function that unpickles a file
|
||||
|
||||
11
tests/data/test_modules/example_extractor/__manifest__.py
Normal file
11
tests/data/test_modules/example_extractor/__manifest__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
# Display Name of your module
|
||||
"name": "Example Extractor",
|
||||
# Optional version number, for your own versioning purposes
|
||||
"version": 2.0,
|
||||
# The type of the module, must be one (or more) of the built in module types
|
||||
"type": ["extractor"],
|
||||
# a boolean indicating whether or not a module requires additional user setup before it can be used
|
||||
# for example: adding API keys, installing additional software etc.
|
||||
"requires_setup": False,
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
from auto_archiver.core import Extractor
|
||||
|
||||
|
||||
class ExampleExtractor(Extractor):
|
||||
def download(self, item):
|
||||
print("download")
|
||||
@@ -85,8 +85,8 @@ def test_enrich_adds_screenshot(
|
||||
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
mock_driver_class.assert_called_once_with(
|
||||
cookies=None,
|
||||
cookiejar=None,
|
||||
cookie=None,
|
||||
cookie_jar=None,
|
||||
facebook_accept_cookies=False,
|
||||
options=mock_options_instance,
|
||||
)
|
||||
@@ -124,6 +124,38 @@ def test_enrich_auth_wall(
|
||||
assert metadata_with_video.media[1].properties.get("id") == "screenshot"
|
||||
|
||||
|
||||
def test_skip_authwall_no_cookies(screenshot_enricher, caplog):
|
||||
with caplog.at_level("WARNING"):
|
||||
screenshot_enricher.enrich(Metadata().set_url("https://instagram.com"))
|
||||
assert "[SKIP] SCREENSHOT since url" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"auth",
|
||||
[
|
||||
{"cookie": "cookie"},
|
||||
{"cookies_jar": "cookie"},
|
||||
],
|
||||
)
|
||||
def test_dont_skip_authwall_with_cookies(screenshot_enricher, caplog, mocker, mock_selenium_env, auth):
|
||||
mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=True)
|
||||
|
||||
# patch the authentication dict:
|
||||
screenshot_enricher.authentication = {"example.com": auth}
|
||||
with caplog.at_level("WARNING"):
|
||||
screenshot_enricher.enrich(Metadata().set_url("https://example.com"))
|
||||
assert "[SKIP] SCREENSHOT since url" not in caplog.text
|
||||
|
||||
|
||||
def test_show_warning_wrong_auth_type(screenshot_enricher, caplog, mocker, mock_selenium_env):
|
||||
mock_driver, mock_driver_class, _ = mock_selenium_env
|
||||
mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=True)
|
||||
screenshot_enricher.authentication = {"example.com": {"username": "user", "password": "pass"}}
|
||||
with caplog.at_level("WARNING"):
|
||||
screenshot_enricher.enrich(Metadata().set_url("https://example.com"))
|
||||
assert "Screenshot enricher only supports cookie-type authentication" in caplog.text
|
||||
|
||||
|
||||
def test_handle_timeout_exception(screenshot_enricher, metadata_with_video, mock_selenium_env, mocker):
|
||||
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ from zipfile import ZipFile
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.core.consts import SetupError
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -22,6 +23,15 @@ def wacz_enricher(setup_module, mock_binary_dependencies):
|
||||
return wacz
|
||||
|
||||
|
||||
def test_raises_error_without_docker_installed(setup_module, mocker, caplog):
|
||||
# pretend that docker isn't installed
|
||||
mocker.patch("shutil.which").return_value = None
|
||||
with pytest.raises(SetupError):
|
||||
setup_module("wacz_extractor_enricher", {})
|
||||
|
||||
assert "requires external dependency 'docker' which is not available/setup" in caplog.text
|
||||
|
||||
|
||||
def test_setup_without_docker(wacz_enricher, mocker):
|
||||
mocker.patch.dict(os.environ, {"RUNNING_IN_DOCKER": "1"}, clear=True)
|
||||
wacz_enricher.setup()
|
||||
|
||||
@@ -40,6 +40,22 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
path = os.path.join(dirname(dirname(__file__)), "data/")
|
||||
assert self.extractor.dropin_for_name("dropin", additional_paths=[path])
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, suitable_extractors",
|
||||
[
|
||||
("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]),
|
||||
("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]),
|
||||
("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]),
|
||||
("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]),
|
||||
("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),
|
||||
],
|
||||
)
|
||||
def test_suitable_extractors(self, url, suitable_extractors):
|
||||
suitable_extractors = suitable_extractors + ["generic"] # the generic is valid for all
|
||||
extractors = list(self.extractor.suitable_extractors(url))
|
||||
assert len(extractors) == len(suitable_extractors)
|
||||
assert [e.ie_key().lower() for e in extractors] == suitable_extractors
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, is_suitable",
|
||||
[
|
||||
@@ -55,7 +71,7 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
("https://google.com", True),
|
||||
],
|
||||
)
|
||||
def test_suitable_urls(self, make_item, url, is_suitable):
|
||||
def test_suitable_urls(self, url, is_suitable):
|
||||
"""
|
||||
Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs
|
||||
This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for,
|
||||
@@ -190,10 +206,11 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
|
||||
self.assertValidResponseMetadata(
|
||||
post,
|
||||
"Onion rings are just vegetable donuts.",
|
||||
"Cookie Monster - Onion rings are just vegetable donuts.",
|
||||
datetime.datetime(2023, 1, 24, 16, 25, 51, tzinfo=datetime.timezone.utc),
|
||||
"yt-dlp_Twitter: success",
|
||||
)
|
||||
assert post.get("content") == "Onion rings are just vegetable donuts."
|
||||
|
||||
@pytest.mark.download
|
||||
def test_twitter_download_video(self, make_item):
|
||||
@@ -245,3 +262,32 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
self.assertValidResponseMetadata(post, title, timestamp)
|
||||
assert len(post.media) == 1
|
||||
assert post.media[0].hash == image_hash
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_facebook_video(self, make_item):
|
||||
post = self.extractor.download(make_item("https://www.facebook.com/bellingcat/videos/588371253839133"))
|
||||
assert len(post.media) == 2
|
||||
assert post.media[0].filename.endswith("588371253839133.mp4")
|
||||
assert post.media[0].mimetype == "video/mp4"
|
||||
|
||||
assert post.media[1].filename.endswith(".jpg")
|
||||
assert post.media[1].mimetype == "image/jpeg"
|
||||
|
||||
assert "Bellingchat Premium is with Kolina Koltai" in post.get_title()
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_facebook_image(self, make_item):
|
||||
post = self.extractor.download(
|
||||
make_item("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/")
|
||||
)
|
||||
|
||||
assert len(post.media) == 1
|
||||
assert post.media[0].filename.endswith(".png")
|
||||
assert "Byline Festival - BylineFest Partner" == post.get_title()
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_facebook_text_only(self, make_item):
|
||||
url = "https://www.facebook.com/bellingcat/posts/pfbid02rzpwZxAZ8bLkAX8NvHv4DWAidFaqAUfJMbo9vWkpwxL7uMUWzWMiizXLWRSjwihVl"
|
||||
post = self.extractor.download(make_item(url))
|
||||
assert "Bellingcat researcher Kolina Koltai delves deeper into Clothoff" in post.get("content")
|
||||
assert post.get_title() == "Bellingcat"
|
||||
|
||||
@@ -4,6 +4,8 @@ import pytest
|
||||
import yt_dlp
|
||||
|
||||
from auto_archiver.modules.generic_extractor.generic_extractor import GenericExtractor
|
||||
from auto_archiver.modules.generic_extractor.tiktok import Tiktok, TikTokIE
|
||||
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
|
||||
@@ -17,11 +19,16 @@ def skip_ytdlp_own_methods(mocker):
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@pytest.fixture
|
||||
def mock_get(mocker):
|
||||
return mocker.patch("auto_archiver.modules.generic_extractor.tiktok.requests.get")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tiktok_dropin() -> Tiktok:
|
||||
return Tiktok()
|
||||
|
||||
|
||||
class TestTiktokTikwmExtractor(TestExtractorBase):
|
||||
"""
|
||||
Test suite for TestTiktokTikwmExtractor.
|
||||
@@ -34,6 +41,25 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
||||
|
||||
VALID_EXAMPLE_URL = "https://www.tiktok.com/@example/video/1234"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, is_suitable",
|
||||
[
|
||||
("https://bellingcat.com", False),
|
||||
("https://youtube.com", False),
|
||||
("https://tiktok.co/", False),
|
||||
("https://tiktok.com/", False),
|
||||
("https://www.tiktok.com/", False),
|
||||
("https://api.cool.tiktok.com/", False),
|
||||
(VALID_EXAMPLE_URL, True),
|
||||
("https://www.tiktok.com/@bbcnews/video/7478038212070411542", True),
|
||||
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
|
||||
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
|
||||
("https://vt.tiktok.com/ZSMTJeqRP/", True),
|
||||
],
|
||||
)
|
||||
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
|
||||
assert tiktok_dropin.suitable(url, TikTokIE()) == is_suitable
|
||||
|
||||
def test_invalid_json_responses(self, mock_get, make_item, caplog):
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.side_effect = ValueError
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
from auto_archiver.core.module import ModuleFactory, LazyBaseModule
|
||||
from auto_archiver.core.base_module import BaseModule
|
||||
from auto_archiver.core.consts import SetupError
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -25,11 +26,9 @@ def test_python_dependency_check(example_module):
|
||||
# monkey patch the manifest to include a nonexistnet dependency
|
||||
example_module.manifest["dependencies"]["python"] = ["does_not_exist"]
|
||||
|
||||
with pytest.raises(SystemExit) as load_error:
|
||||
with pytest.raises(SetupError):
|
||||
example_module.load({})
|
||||
|
||||
assert load_error.value.code == 1
|
||||
|
||||
|
||||
def test_binary_dependency_check(example_module):
|
||||
# example_module requires ffmpeg, which is not installed
|
||||
@@ -81,8 +80,20 @@ def test_load_modules(module_name):
|
||||
# check that default settings are applied
|
||||
default_config = module.configs
|
||||
assert loaded_module.name in loaded_module.config.keys()
|
||||
defaults = {k for k in default_config}
|
||||
assert defaults in [loaded_module.config[module_name].keys()]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
def test_config_defaults(module_name):
|
||||
# test the values of the default config values are set
|
||||
# Note: some modules can alter values in the setup() method, this test checks cases that don't
|
||||
module = ModuleFactory().get_module_lazy(module_name)
|
||||
loaded_module = module.load({})
|
||||
# check that default config values are set
|
||||
default_config = module.configs
|
||||
defaults = {k: v.get("default") for k, v in default_config.items()}
|
||||
assert loaded_module.config[module_name] == defaults
|
||||
assert defaults == loaded_module.config[module_name]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
|
||||
@@ -4,6 +4,7 @@ from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
||||
from auto_archiver.version import __version__
|
||||
from auto_archiver.core.config import read_yaml, store_yaml
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.core.consts import SetupError
|
||||
|
||||
TEST_ORCHESTRATION = "tests/data/test_orchestration.yaml"
|
||||
TEST_MODULES = "tests/data/test_modules/"
|
||||
@@ -224,3 +225,15 @@ def test_multiple_orchestrator(test_args):
|
||||
output: Metadata = list(o2.feed())
|
||||
assert len(output) == 1
|
||||
assert output[0].get_url() == "https://example.com"
|
||||
|
||||
|
||||
def test_wrong_step_type(test_args, caplog):
|
||||
args = test_args + [
|
||||
"--feeders",
|
||||
"example_extractor", # example_extractor is not a valid feeder!
|
||||
]
|
||||
|
||||
orchestrator = ArchivingOrchestrator()
|
||||
with pytest.raises(SetupError) as err:
|
||||
orchestrator.setup(args)
|
||||
assert "Module 'example_extractor' is not a feeder" in str(err.value)
|
||||
|
||||
113
tests/utils/test_urls.py
Normal file
113
tests/utils/test_urls.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import pytest
|
||||
from auto_archiver.utils.url import (
|
||||
is_auth_wall,
|
||||
check_url_or_raise,
|
||||
domain_for_url,
|
||||
is_relevant_url,
|
||||
remove_get_parameters,
|
||||
twitter_best_quality_url,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, is_auth",
|
||||
[
|
||||
("https://example.com", False),
|
||||
("https://t.me/c/abc/123", True),
|
||||
("https://t.me/not-private/", False),
|
||||
("https://instagram.com", True),
|
||||
("https://www.instagram.com", True),
|
||||
("https://www.instagram.com/p/INVALID", True),
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG/", True),
|
||||
],
|
||||
)
|
||||
def test_is_auth_wall(url, is_auth):
|
||||
assert is_auth_wall(url) == is_auth
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, raises",
|
||||
[
|
||||
("http://example.com", False),
|
||||
("https://example.com", False),
|
||||
("ftp://example.com", True),
|
||||
("http://localhost", True),
|
||||
("http://", True),
|
||||
],
|
||||
)
|
||||
def test_check_url_or_raise(url, raises):
|
||||
if raises:
|
||||
with pytest.raises(ValueError):
|
||||
check_url_or_raise(url)
|
||||
else:
|
||||
assert check_url_or_raise(url)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, domain",
|
||||
[
|
||||
("https://example.com", "example.com"),
|
||||
("https://www.example.com", "www.example.com"),
|
||||
("https://www.example.com/path", "www.example.com"),
|
||||
("https://", ""),
|
||||
("http://localhost", "localhost"),
|
||||
],
|
||||
)
|
||||
def test_domain_for_url(url, domain):
|
||||
assert domain_for_url(url) == domain
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, without_get",
|
||||
[
|
||||
("https://example.com", "https://example.com"),
|
||||
("https://example.com?utm_source=example", "https://example.com"),
|
||||
("https://example.com?utm_source=example&other=1", "https://example.com"),
|
||||
("https://example.com/something", "https://example.com/something"),
|
||||
("https://example.com/something?utm_source=example", "https://example.com/something"),
|
||||
],
|
||||
)
|
||||
def test_remove_get_parameters(url, without_get):
|
||||
assert remove_get_parameters(url) == without_get
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, relevant",
|
||||
[
|
||||
("https://example.com", True),
|
||||
("https://example.com/favicon.ico", False),
|
||||
("https://twimg.com/profile_images", False),
|
||||
("https://twimg.com/something/default_profile_images", False),
|
||||
("https://scontent.cdninstagram.com/username/150x150.jpg", False),
|
||||
("https://static.cdninstagram.com/rsrc.php/", False),
|
||||
("https://telegram.org/img/emoji/", False),
|
||||
("https://www.youtube.com/s/gaming/emoji/", False),
|
||||
("https://yt3.ggpht.com/default-user=", False),
|
||||
("https://www.youtube.com/s/search/audio/", False),
|
||||
("https://ok.ru/res/i/", False),
|
||||
("https://vk.com/emoji/", False),
|
||||
("https://vk.com/images/", False),
|
||||
("https://vk.com/images/reaction/", False),
|
||||
("https://wikipedia.org/static", False),
|
||||
("https://example.com/file.svg", False),
|
||||
("https://example.com/file.ico", False),
|
||||
("https://example.com/file.mp4", True),
|
||||
("https://example.com/150x150.jpg", True),
|
||||
("https://example.com/rsrc.php/", True),
|
||||
("https://example.com/img/emoji/", True),
|
||||
],
|
||||
)
|
||||
def test_is_relevant_url(url, relevant):
|
||||
assert is_relevant_url(url) == relevant
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, best_quality",
|
||||
[
|
||||
("https://twitter.com/some_image.jpg?name=small", "https://twitter.com/some_image.jpg?name=orig"),
|
||||
("https://twitter.com/some_image.jpg", "https://twitter.com/some_image.jpg"),
|
||||
("https://twitter.com/some_image.jpg?name=orig", "https://twitter.com/some_image.jpg?name=orig"),
|
||||
],
|
||||
)
|
||||
def test_twitter_best_quality_url(url, best_quality):
|
||||
assert twitter_best_quality_url(url) == best_quality
|
||||
Reference in New Issue
Block a user