Compare commits

...

88 Commits

Author SHA1 Message Date
Patrick Robertson
6f10270baf Remove unittest and switch to pytest fully 2025-01-14 16:28:39 +01:00
Patrick Robertson
cef4037ad5 Add documentation on running tests to the readme 2025-01-14 11:30:06 +01:00
Patrick Robertson
1b1af2f0b1 Revert change to twitter_archiver
As per discussion at: https://github.com/bellingcat/auto-archiver/pull/165#discussion_r1905930837
2025-01-14 10:30:41 +01:00
Patrick Robertson
8f17a235f3 Switch to ubuntu-22.04 for CI tests
An issue with oscrypto means it currently does not work on 24.04. Ref: https://github.com/wbond/oscrypto/issues/78#issuecomment-2565688091
2025-01-14 10:24:14 +01:00
Patrick Robertson
ab2eb3c7f5 Add dev dependencies to poetry 2025-01-13 20:42:08 +01:00
Patrick Robertson
bdfedfcf61 Merge branch 'main' into feat/unittest 2025-01-13 19:50:47 +01:00
Erin Clark
9cdaea873b Merge pull request #164 from bellingcat/ec_add_poetry
Migrate to Poetry
2025-01-13 18:49:15 +00:00
erinhmclark
84ee1b422f Update and restrict versions of Poetry and Python. 2025-01-13 17:42:51 +00:00
Patrick Robertson
b9aea99de8 Prettify pytest output 2025-01-13 18:41:24 +01:00
Patrick Robertson
52f064908e Add unit test badges to readme 2025-01-13 18:33:22 +01:00
Patrick Robertson
9b596e59d6 Run expensive download tests once per week, on a month at 2:35pm
(time is offset from the hour to alleviate high load on Github
2025-01-13 18:33:02 +01:00
Patrick Robertson
528b78db85 Flag tombstone tweets for twitter_syndication method 2025-01-13 18:17:24 +01:00
Patrick Robertson
57eacdc24a Merge branch 'main' into feat/unittest 2025-01-13 18:06:55 +01:00
Patrick Robertson
bbef80de4c Add unit tests for html_formatter, csv_db 2025-01-13 17:58:10 +01:00
Patrick Robertson
930d78096a Merge pull request #162 from bellingcat/small_issues
Fix two small issues
2025-01-13 16:39:59 +01:00
Patrick Robertson
2353f9d6a5 Separate CI for download tests and core tests 2025-01-13 16:27:46 +01:00
Patrick Robertson
63973e2ce7 switch to pytest and pytest-recording 2025-01-13 16:23:20 +01:00
erinhmclark
e9a7f435a3 Add package dist directory to .gitignore 2025-01-13 13:33:23 +00:00
Patrick Robertson
e2bc84ccb9 Merge branch 'main' into feat/unittest 2025-01-13 13:15:13 +01:00
erinhmclark
72a8e76fbb Update README.md for usage with Poetry. 2025-01-12 20:21:23 +00:00
erinhmclark
c69a5fa1c9 Refactor Dockerfile for multi-stage builds.
Combining environment and runtime stages due to Poetry's dependency on source code.
2025-01-12 12:38:12 +00:00
erinhmclark
d80b4b7557 Remove snscrape and Python 3.12 restriction. 2025-01-12 12:15:56 +00:00
erinhmclark
cc490f9c10 Updated Dockerfile (not optimised yet) 2025-01-12 12:15:56 +00:00
erinhmclark
08e83eb94e Update pyproject.toml configuration for Poetry version 2.0.0. 2025-01-12 12:15:56 +00:00
erinhmclark
dd822b8b44 Update poetry.lock 2025-01-12 12:15:56 +00:00
erinhmclark
4a63ca7753 Update PyPi workflow to read python version from pyproject.toml. 2025-01-12 12:15:56 +00:00
erinhmclark
6d5b0090d9 Pull version from pyproject.toml file/ 2025-01-12 12:15:56 +00:00
erinhmclark
26abd6f7ae Added TODO comment for adding a version restriction. 2025-01-12 12:15:56 +00:00
erinhmclark
dba8f46016 Replaced comments for python-publish.yaml workflow. 2025-01-12 12:15:56 +00:00
erinhmclark
50e8c93477 Updated workflow for python-publish.yaml to use poetry (untested), and cleanup of pipenv files. 2025-01-12 12:15:56 +00:00
erinhmclark
6da837b374 Add note to update dynamic versioning and references to version. 2025-01-12 12:15:56 +00:00
erinhmclark
660ee82c67 Update Dockerfile for poetry.
Note: Review security with curl installation. Currently locked to known version, but additional checks could be added.
2025-01-12 12:15:56 +00:00
erinhmclark
5490947657 Add packaging to Poetry. 2025-01-12 12:15:56 +00:00
erinhmclark
fd9a6c26ed Create Poetry environment.
Required addition of transitive package (pyOpenSSL) and version restrictions on cryptography, boto3.
2025-01-12 12:15:56 +00:00
Patrick Robertson
3546d4ad79 Fix 'download_syndication' method for tweet archiving (now requires a token)
Plus add in unit tests for token generation + download syndication
2025-01-12 12:55:00 +01:00
Patrick Robertson
c932fb7416 Improved logging when an invalid/deleted tweet is attempted to be downloaded
Plus: unit tests for non-existent tweet + invalid tweet ID
2025-01-12 12:00:45 +01:00
Patrick Robertson
f29950905c Merge branch 'main' into small_issues 2025-01-12 11:47:55 +01:00
Patrick Robertson
8e99d62c97 Merge pull request #165 from bellingcat/fix/snscrape
Remove snscrape from the twitter_archiver
2025-01-09 11:06:14 +01:00
Patrick Robertson
9dc4eb35de Switch to pytest and use vcr for request storing 2025-01-08 11:25:13 +01:00
Patrick Robertson
8c044c15f0 Add base test class for archivers with boilerplate code
Plus: create test class for twitter archiver. Currently WIP
2025-01-08 10:38:56 +01:00
Patrick Robertson
ab9335bb7a Merge branch 'main' into feat/unittest 2025-01-08 10:35:45 +01:00
Patrick Robertson
add83c9650 Remove snscrape from twitter_archiver
1. snscrape twitter downloader no longer works (ref: https://github.com/JustAnotherArchivist/snscrape/issues/1045)
2. snscrape limits python to versions <3.12
2025-01-07 19:40:19 +01:00
Miguel Sozinho Ramalho
a697f0a212 adds an unauthenticated Bluesky archiver (#160)
* adds a TODO for next code iterations

* implements bsky archiver

* adds new archiver to example orchestration file

* Fix downloading media for posts with multiple images

(Images are stored in media/images)

* Setup a basic framework for unit tests

Use 'python -m unittest' from the project root to run

---------

Co-authored-by: Patrick Robertson <robertson.patrick@gmail.com>
2025-01-07 10:28:07 +00:00
Patrick Robertson
bffa3a6254 Merge pull request #159 from bellingcat/print_pdf
Add 'print_pdf' option to the screenshot enricher. Fixes #132
2025-01-06 18:13:38 +01:00
Miguel Sozinho Ramalho
ef471f41e1 adds better debug for wayback failures (#161) 2025-01-06 16:49:11 +00:00
Patrick Robertson
928518cda7 Allow setting cookies for yt-dl (#158) 2025-01-06 16:19:53 +00:00
Patrick Robertson
1bd017000e Add Github CI test workflow 2024-12-31 15:20:33 +01:00
Patrick Robertson
33e967ce4b Update pipfile for:
- pyopenssl==24.2.1
- youtube-dlp==2024.09.27
- numpy==2.1.3

Fixes building/local installs. Also fixes #155
2024-12-31 15:20:11 +01:00
Patrick Robertson
30d423c8e6 Setup a basic framework for unit tests
Use 'python -m unittest' from the project root to run
2024-12-31 14:29:52 +01:00
Patrick Robertson
0c803f15a5 Fix showing preview images in the .html file when using local storage
Local storage media urls are prefixed with '/', previously only http(s) media preview src were displayed
2024-12-31 09:29:31 +01:00
Patrick Robertson
a46f9997ea Better logging when there's a timestamp parse error 2024-12-31 09:28:08 +01:00
msramalho
83da9ae089 adds pdf preview support for html formatter 2024-12-23 18:19:26 +00:00
Patrick Robertson
663c8ad93a Add 'print_pdf' option to the screenshot enricher. Fixes #132 2024-12-20 07:14:03 +01:00
msramalho
e49550163f adds proxy_server option to wacz 2024-10-06 10:45:34 +06:00
msramalho
e6f5981afc numpy version downgrade 2024-10-06 10:10:04 +06:00
msramalho
c62bf1a34d yt-dlp version bump 2024-10-05 17:43:07 +06:00
msramalho
b166d57e61 v0.12.0 bump 2024-08-21 13:34:34 +01:00
msramalho
11c3288267 closes #146 2024-08-21 13:33:58 +01:00
msramalho
004143a58a version bump v0.11.6 2024-07-18 11:27:39 +01:00
msramalho
686f0027c4 adds new entries to example orchestration file 2024-07-18 11:27:15 +01:00
dependabot[bot]
b03cf32c73 Bump authlib from 1.3.0 to 1.3.1 (#144)
Bumps [authlib](https://github.com/lepture/authlib) from 1.3.0 to 1.3.1.
- [Release notes](https://github.com/lepture/authlib/releases)
- [Changelog](https://github.com/lepture/authlib/blob/master/docs/changelog.rst)
- [Commits](https://github.com/lepture/authlib/compare/v1.3.0...v1.3.1)

---
updated-dependencies:
- dependency-name: authlib
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-07-18 11:26:22 +01:00
msramalho
dc9e64397e bumping yt-dlp 2024-07-18 11:23:09 +01:00
msramalho
c7bc5e2988 cleanup 2024-05-15 11:04:29 +01:00
msramalho
1e375bd740 version bump 2024-05-14 16:42:15 +01:00
Miguel Sozinho Ramalho
f8824691dd refactors free twitter archiver strategies (#142) 2024-05-14 16:23:33 +01:00
msramalho
012cc36609 removes deprecated datetime method 2024-05-14 15:54:50 +01:00
Miguel Sozinho Ramalho
7cfe1e39cc #135 fix cleanup of telethon session files (#139)
* closes #135

* version bump
2024-04-16 12:45:45 +01:00
Jett Chen
cf8691bad7 Add yt-dlp based archiving for TwitterArchiver (#138)
* Add ytdlp archiving capability

* Add type annotation

* version bump

---------

Co-authored-by: msramalho <19508417+msramalho@users.noreply.github.com>
2024-04-15 19:54:55 +01:00
R. Miles McCain
f603400d0d Add direct Atlos integration (#137)
* Add Atlos feeder

* Add Atlos db

* Add Atlos storage

* Fix Atlos storages

* Fix Atlos feeder

* Only include URLs in Atlos feeder once they're processed

* Remove print

* Add Atlos documentation to README

* Formatting fixes

* Don't archive existing material

* avoid KeyError in atlos_db

* version bump

---------

Co-authored-by: msramalho <19508417+msramalho@users.noreply.github.com>
2024-04-15 19:25:17 +01:00
msramalho
eb37f0b45b version bump 2024-04-15 19:02:54 +01:00
msramalho
75497f5773 minor bug fix when using an archiver_enricher in enrichers only 2024-04-15 19:02:40 +01:00
msramalho
623e555713 dependencies updates 2024-04-15 19:02:20 +01:00
msramalho
9c7824de57 browsertrix docker updates 2024-04-15 19:01:55 +01:00
msramalho
f4827770e6 adds instagram no stories as success, and fix for telethon-based archivers. 2024-03-05 14:49:10 +00:00
msramalho
601572d76e strip url 2024-02-29 11:54:01 +00:00
msramalho
d21e79a272 general security updates 2024-02-29 11:40:30 +00:00
msramalho
ccf5f857ef adds configurable limits to instagram/youtube 2024-02-25 15:14:17 +00:00
msramalho
7de317d1b5 avoiding exception 2024-02-23 15:54:33 +00:00
msramalho
70075a1e5e improving insta archiver 2024-02-23 15:37:28 +00:00
msramalho
5b9bc4919a version bump 2024-02-23 14:08:23 +00:00
msramalho
f0158ffd9c adds tagged posts and better parsing 2024-02-23 14:08:17 +00:00
msramalho
bfb35a43a9 adds more details from yt-dlp 2024-02-23 14:08:05 +00:00
msramalho
ef5b39c4f1 dind exception 2024-02-22 18:05:56 +00:00
msramalho
24ceafcb64 missing forward slash 2024-02-22 17:47:13 +00:00
msramalho
9fd4bb56a8 new attempt at dind wacz 2024-02-22 17:24:27 +00:00
msramalho
5324d562ba cleanup wacz patch 2024-02-21 18:14:30 +00:00
msramalho
5bf0a0206d version update 2024-02-21 17:26:07 +00:00
msramalho
4941823565 fix growing volume size in wacz_enricher 2024-02-21 17:25:55 +00:00
61 changed files with 4727 additions and 2407 deletions

View File

@@ -1,4 +1,4 @@
# This workflow will upload a Python Package using Twine when a release is created
# This workflow uploads a Python Package to PyPI using Poetry when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
# This workflow uses actions that are not certified by GitHub.
@@ -21,30 +21,34 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Checkout Repository
uses: actions/checkout@v3
- name: Set up Python 3.10
- name: Extract Python Version from pyproject.toml
id: python-version
run: |
version=$(grep 'python =' pyproject.toml | awk -F'"' '{print $2}' | tr -d '^~<=>')
echo "python-version=$version" >> $GITHUB_ENV
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: ${{ env.python-version }}
- name: Install Poetry
run: |
python -m pip install --upgrade pip
python -m pip install "poetry>=2.0.0,<3.0.0"
- name: Install dependencies
run: |
python -m pip install --upgrade --upgrade-strategy=eager pip setuptools wheel twine pipenv
python -m pip install -e . --upgrade
python -m pipenv install --dev --python 3.10
env:
PIPENV_DEFAULT_PYTHON_VERSION: "3.10"
poetry install --no-root
- name: Build wheels
- name: Build the package
run: |
python -m pipenv run python setup.py sdist bdist_wheel
poetry build
- name: Publish a Python distribution to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
verbose: true
skip_existing: true
password: ${{ secrets.PYPI_API_TOKEN }}
packages_dir: dist/
# Step 6: Publish to PyPI
- name: Publish to PyPI
run: |
poetry publish --username __token__ --password ${{ secrets.PYPI_API_TOKEN }}

38
.github/workflows/tests-core.yaml vendored Normal file
View File

@@ -0,0 +1,38 @@
name: Core Tests
on:
push:
branches: [ main ]
paths:
- src/**
pull_request:
paths:
- src/**
jobs:
tests:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
defaults:
run:
working-directory: ./
steps:
- uses: actions/checkout@v4
- name: Install Poetry
run: pipx install poetry
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'poetry'
- name: Install dependencies
run: poetry install --no-interaction --with dev
- name: Run Core Tests
run: poetry run pytest -ra -v -m "not download"

38
.github/workflows/tests-download.yaml vendored Normal file
View File

@@ -0,0 +1,38 @@
name: Download Tests
on:
schedule:
- cron: '35 14 * * 1'
pull_request:
branches: [ main ]
paths:
- src/**
jobs:
tests:
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
python-version: ["3.10"] # only run expensive downloads on one (lowest) python version
defaults:
run:
working-directory: ./
steps:
- uses: actions/checkout@v4
- name: Install poetry
run: pipx install poetry
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'poetry'
- name: Install dependencies
run: poetry install --no-interaction --with dev
- name: Run Download Tests
run: poetry run pytest -ra -v -m "download"

4
.gitignore vendored
View File

@@ -27,4 +27,6 @@ instaloader.session
orchestration.yaml
auto_archiver.egg-info*
logs*
*.csv
*.csv
archived/
dist*

View File

@@ -1,31 +1,58 @@
FROM webrecorder/browsertrix-crawler:latest
FROM webrecorder/browsertrix-crawler:1.0.4 AS base
ENV RUNNING_IN_DOCKER=1
ENV RUNNING_IN_DOCKER=1 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONFAULTHANDLER=1 \
PATH="/root/.local/bin:$PATH"
# Installing system dependencies
RUN add-apt-repository ppa:mozillateam/ppa && \
apt-get update && \
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool && \
apt-get install -y --no-install-recommends firefox-esr && \
ln -s /usr/bin/firefox-esr /usr/bin/firefox && \
wget https://github.com/mozilla/geckodriver/releases/download/v0.33.0/geckodriver-v0.33.0-linux64.tar.gz && \
tar -xvzf geckodriver* -C /usr/local/bin && \
chmod +x /usr/local/bin/geckodriver && \
rm geckodriver-v* && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Poetry and runtime
FROM base AS runtime
ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_VIRTUALENVS_CREATE=1
RUN pip install --upgrade pip && \
pip install "poetry>=2.0.0,<3.0.0"
WORKDIR /app
RUN pip install --upgrade pip && \
pip install pipenv && \
add-apt-repository ppa:mozillateam/ppa && \
apt-get update && \
apt-get install -y gcc ffmpeg fonts-noto exiftool && \
apt-get install -y --no-install-recommends firefox-esr && \
ln -s /usr/bin/firefox-esr /usr/bin/firefox && \
wget https://github.com/mozilla/geckodriver/releases/download/v0.33.0/geckodriver-v0.33.0-linux64.tar.gz && \
tar -xvzf geckodriver* -C /usr/local/bin && \
chmod +x /usr/local/bin/geckodriver && \
rm geckodriver-v*
COPY pyproject.toml poetry.lock README.md ./
# Copy dependency files and install dependencies (excluding the package itself)
RUN poetry install --only main --no-root --no-cache
COPY Pipfile* ./
# install from pipenv, with browsertrix-only requirements
RUN pipenv install && \
pipenv install pywb uwsgi
# doing this at the end helps during development, builds are quick
COPY ./src/ .
# Copy code: This is needed for poetry to install the package itself,
# but the environment should be cached from the previous step if toml and lock files haven't changed
COPY ./src/ .
RUN poetry install --only main --no-cache
ENTRYPOINT ["pipenv", "run", "python3", "-m", "auto_archiver"]
# Update PATH to include virtual environment binaries
# Allowing entry point to run the application directly with Python
ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH"
ENTRYPOINT ["python3", "-m", "auto_archiver"]
# should be executed with 2 volumes (3 if local_storage is used)
# docker run --rm -v $PWD/secrets:/app/secrets -v $PWD/local_archive:/app/local_archive aa pipenv run python3 -m auto_archiver --config secrets/orchestration.yaml

50
Pipfile
View File

@@ -1,50 +0,0 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
gspread = "*"
boto3 = "*"
argparse = "*"
beautifulsoup4 = "*"
tiktok-downloader = "*"
bs4 = "*"
loguru = "*"
ffmpeg-python = "*"
selenium = "*"
snscrape = "*"
telethon = "*"
google-api-python-client = "*"
google-auth-httplib2 = "*"
google-auth-oauthlib = "*"
oauth2client = "*"
pdqhash = "*"
pillow = "*"
python-slugify = "*"
pyyaml = "*"
dateparser = "*"
python-twitter-v2 = "*"
instaloader = "*"
tqdm = "*"
jinja2 = "*"
cryptography = "*"
dataclasses-json = "*"
yt-dlp = "*"
vk-url-scraper = "*"
requests = {extras = ["socks"], version = "*"}
numpy = "*"
warcio = "*"
jsonlines = "*"
pysubs2 = "*"
minify-html = "*"
retrying = "*"
tsp-client = "*"
certvalidator = "*"
[dev-packages]
autopep8 = "*"
setuptools-pipfile = "*"
[requires]
python_version = "3.10"

2093
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,8 @@
[![PyPI version](https://badge.fury.io/py/auto-archiver.svg)](https://badge.fury.io/py/auto-archiver)
[![Docker Image Version (latest by date)](https://img.shields.io/docker/v/bellingcat/auto-archiver?label=version&logo=docker)](https://hub.docker.com/r/bellingcat/auto-archiver)
[![Core Test Status](https://github.com/bellingcat/auto-archiver/workflows/Core%20Tests/badge.svg)](https://github.com/bellingcat/auto-archiver/actions/workflows/tests-core.yaml)
[![Download Test Status](https://github.com/bellingcat/auto-archiver/workflows/Download%20Tests/badge.svg)](https://github.com/bellingcat/auto-archiver/actions/workflows/tests-download.yaml)
<!-- ![Docker Pulls](https://img.shields.io/docker/pulls/bellingcat/auto-archiver) -->
<!-- [![PyPI download month](https://img.shields.io/pypi/dm/auto-archiver.svg)](https://pypi.python.org/pypi/auto-archiver/) -->
<!-- [![Documentation Status](https://readthedocs.org/projects/vk-url-scraper/badge/?version=latest)](https://vk-url-scraper.readthedocs.io/en/latest/?badge=latest) -->
@@ -47,8 +49,8 @@ Docker works like a virtual machine running inside your computer, it isolates ev
<details><summary><code>Python package instructions</code></summary>
1. make sure you have python 3.8 or higher installed
2. install the package `pip/pipenv/conda install auto-archiver`
1. make sure you have python 3.10 or higher installed
2. install the package with your preferred package manager: `pip/pipenv/conda install auto-archiver` or `poetry add auto-archiver`
3. test it's installed with `auto-archiver --help`
4. run it with your orchestration file and pass any flags you want in the command line `auto-archiver --config secrets/orchestration.yaml` if your orchestration file is inside a `secrets/`, which we advise
@@ -66,12 +68,13 @@ This can also be used for development.
Install the following locally:
1. [ffmpeg](https://www.ffmpeg.org/) must also be installed locally for this tool to work.
2. [firefox](https://www.mozilla.org/en-US/firefox/new/) and [geckodriver](https://github.com/mozilla/geckodriver/releases) on a path folder like `/usr/local/bin`.
3. (optional) [fonts-noto](https://fonts.google.com/noto) to deal with multiple unicode characters during selenium/geckodriver's screenshots: `sudo apt install fonts-noto -y`.
3. [Poetry](https://python-poetry.org/docs/#installation) for dependency management and packaging.
4. (optional) [fonts-noto](https://fonts.google.com/noto) to deal with multiple unicode characters during selenium/geckodriver's screenshots: `sudo apt install fonts-noto -y`.
Clone and run:
1. `git clone https://github.com/bellingcat/auto-archiver`
2. `pipenv install`
3. `pipenv run python -m src.auto_archiver --config secrets/orchestration.yaml`
2. `poetry install`
3. `poetry run python -m src.auto_archiver --config secrets/orchestration.yaml`
</details><br/>
@@ -108,7 +111,7 @@ configurations:
# ... configurations for the other steps here ...
```
To see all available `steps` (which archivers, storages, databses, ...) exist check the [example.orchestration.yaml](example.orchestration.yaml).
To see all available `steps` (which archivers, storages, databases, ...) exist check the [example.orchestration.yaml](example.orchestration.yaml).
All the `configurations` in the `orchestration.yaml` file (you can name it differently but need to pass it in the `--config FILENAME` argument) can be seen in the console by using the `--help` flag. They can also be overwritten, for example if you are using the `cli_feeder` to archive from the command line and want to provide the URLs you should do:
@@ -117,7 +120,7 @@ auto-archiver --config secrets/orchestration.yaml --cli_feeder.urls="url1,url2,u
```
Here's the complete workflow that the auto-archiver goes through:
```mermaid
```{mermaid}
graph TD
s((start)) --> F(fa:fa-table Feeder)
F -->|get and clean URL| D1{fa:fa-database Database}
@@ -177,6 +180,38 @@ To use Google Drive storage you need the id of the shared folder in the `config.
#### Telethon + Instagram with telegram bot
The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root.
#### Atlos
When integrating with [Atlos](https://atlos.org), you will need to provide an API token in your configuration. You can learn more about Atlos and how to get an API token [here](https://docs.atlos.org/technical/api). You will have to provide this token to the `atlos_feeder`, `atlos_storage`, and `atlos_db` steps in your orchestration file. If you use a custom or self-hosted Atlos instance, you can also specify the `atlos_url` option to point to your custom instance's URL. For example:
```yaml
# orchestration.yaml content
steps:
feeder: atlos_feeder
archivers: # order matters
- youtubedl_archiver
enrichers:
- thumbnail_enricher
- hash_enricher
formatter: html_formatter
storages:
- atlos_storage
databases:
- console_db
- atlos_db
configurations:
atlos_feeder:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_db:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_storage:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
hash_enricher:
algorithm: "SHA-256"
```
## Running on Google Sheets Feeder (gsheet_feeder)
The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs.
@@ -226,6 +261,20 @@ The "archive location" link contains the path of the archived file, in local sto
## Development
Use `python -m src.auto_archiver --config secrets/orchestration.yaml` to run from the local development environment.
### Testing
Tests are split using `pytest.mark` into 'core' and 'download' tests. Download tests will hit the network and make API calls (e.g. Twitter, Bluesky etc.) and should be run regularly to make sure that APIs have not changed.
Tests can be run as follows:
```
# run core tests
pytest -ra -v -m "not download" # or poetry run pytest -ra -v -m "not download"
# run download tests
pytest -ra -v -m "download" # or poetry run pytest -ra -v -m "download"
# run all tests
pytest -ra -v # or poetry run pytest -ra -v
```
#### Docker development
working with docker locally:
* `docker build . -t auto-archiver` to build a local image

View File

@@ -2,11 +2,13 @@ steps:
# only 1 feeder allowed
feeder: gsheet_feeder # defaults to cli_feeder
archivers: # order matters, uncomment to activate
- bluesky_archiver
# - vk_archiver
# - telethon_archiver
# - telegram_archiver
# - twitter_archiver
# - twitter_api_archiver
# - instagram_api_archiver
# - instagram_tbot_archiver
# - instagram_archiver
# - tiktok_archiver
@@ -15,8 +17,13 @@ steps:
# - wacz_archiver_enricher
enrichers:
- hash_enricher
# - meta_enricher
# - metadata_enricher
# - screenshot_enricher
# - pdq_hash_enricher
# - ssl_enricher
# - timestamping_enricher
# - whisper_enricher
# - thumbnail_enricher
# - wayback_archiver_enricher
# - wacz_archiver_enricher
@@ -88,9 +95,33 @@ configurations:
password: "vk pass"
session_file: "secrets/vk_config.v2.json"
youtubedl_archiver:
subtitles: true
# use one of the following two methods to authenticate in youtube - either provide a cookies file or use the cookies of the given browser
# for more information, see https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp
# cookie_file: "secrets/youtube_cookies.txt"
# cookies_from_browser: firefox
# proxy: socks5://proxy-user:password@proxy-ip:port
screenshot_enricher:
width: 1280
height: 2300
# to save as pdf, uncomment the following lines and adjust the print options
# save_to_pdf: true
# print_options:
# for all options see https://www.selenium.dev/selenium/docs/api/py/webdriver/selenium.webdriver.common.print_page_options.html
# background: true
# orientation: "portrait"
# scale: 1
# page_width: 8.5in
# page_height: 11in
# margin_top: 0.4in
# margin_bottom: 0.4in
# margin_left: 0.4in
# margin_right: 0.4in
# page_ranges: ""
# shrink_to_fit: true
wayback_archiver_enricher:
timeout: 10
key: "wayback key"

3299
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,81 @@
[build-system]
requires = ["setuptools", "wheel", "setuptools-pipfile"]
build-backend = "setuptools.build_meta"
[tool.setuptools-pipfile]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "0.13.0"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"
license = "MIT"
authors = [
{ name = "Bellingcat", email = "tech@bellingcat.com" },
]
readme = "README.md"
keywords = ["archive", "oosi", "osint", "scraping"]
classifiers = [
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3"
]
dependencies = [
"gspread (>=0.0.0)",
"argparse (>=0.0.0)",
"beautifulsoup4 (>=0.0.0)",
"tiktok-downloader (>=0.0.0)",
"bs4 (>=0.0.0)",
"loguru (>=0.0.0)",
"ffmpeg-python (>=0.0.0)",
"selenium (>=0.0.0)",
"telethon (>=0.0.0)",
"google-api-python-client (>=0.0.0)",
"google-auth-httplib2 (>=0.0.0)",
"google-auth-oauthlib (>=0.0.0)",
"oauth2client (>=0.0.0)",
"pdqhash (>=0.0.0)",
"pillow (>=0.0.0)",
"python-slugify (>=0.0.0)",
"pyyaml (>=0.0.0)",
"dateparser (>=0.0.0)",
"python-twitter-v2 (>=0.0.0)",
"instaloader (>=0.0.0)",
"tqdm (>=0.0.0)",
"jinja2 (>=0.0.0)",
"pyOpenSSL (==24.2.1)",
"cryptography (>=41.0.0,<42.0.0)",
"boto3 (>=1.28.0,<2.0.0)",
"dataclasses-json (>=0.0.0)",
"yt-dlp (==2024.09.27)",
"numpy (==2.1.3)",
"vk-url-scraper (>=0.0.0)",
"requests[socks] (>=0.0.0)",
"warcio (>=0.0.0)",
"jsonlines (>=0.0.0)",
"pysubs2 (>=0.0.0)",
"minify-html (>=0.0.0)",
"retrying (>=0.0.0)",
"tsp-client (>=0.0.0)",
"certvalidator (>=0.0.0)",
"toml (>=0.10.2,<0.11.0)"
]
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.4"
autopep8 = "^2.3.1"
[project.scripts]
auto-archiver = "auto_archiver.__main__:main"
[project.urls]
homepage = "https://github.com/bellingcat/auto-archiver"
repository = "https://github.com/bellingcat/auto-archiver"
documentation = "https://github.com/bellingcat/auto-archiver"
[tool.pytest.ini_options]
markers = [
"download: marks tests that download content from the network",
]

View File

@@ -1,53 +0,0 @@
[metadata]
name = auto_archiver
version = attr: auto_archiver.version.__version__
author = Bellingcat
author_email = tech@bellingcat.com
description = Easily archive online media content
long_description = file: README.md
long_description_content_type = text/markdown
keywords = archive, oosi, osint, scraping
license = MIT
classifiers =
Intended Audience :: Developers
Intended Audience :: Science/Research
License :: OSI Approved :: MIT License
Programming Language :: Python :: 3
project_urls =
Source Code = https://github.com/bellingcat/auto-archiver
Bug Tracker = https://github.com/bellingcat/auto-archiver/issues
Bellingcat = https://www.bellingcat.com
platforms = any
[options]
setup_requires =
setuptools-pipfile
zip_safe = False
package_dir=
=src
packages=find:
find_packages=true
python_requires = >=3.8
[options.package_data]
* = *.html
[options.entry_points]
console_scripts =
auto-archiver = auto_archiver.__main__:main
# [options.extras_require]
# pdf = ReportLab>=1.2; RXP
# rest = docutils>=0.3; pack ==1.1, ==1.3
[options.packages.find]
where=src
# include=auto_archiver*
# exclude =
# examples*
# .eggs*
# build*
# secrets*
# tmp*
# docs*
# src.tests*

View File

@@ -1,4 +0,0 @@
from setuptools import setup
if __name__ == "__main__":
setup()

View File

@@ -8,4 +8,5 @@ from .tiktok_archiver import TiktokArchiver
from .telegram_archiver import TelegramArchiver
from .vk_archiver import VkArchiver
from .youtubedl_archiver import YoutubeDLArchiver
from .instagram_api_archiver import InstagramAPIArchiver
from .instagram_api_archiver import InstagramAPIArchiver
from .bluesky_archiver import BlueskyArchiver

View File

@@ -48,6 +48,8 @@ class Archiver(Step):
"""
downloads a URL to provided filename, or inferred from URL, returns local filename
"""
# TODO: should we refactor to use requests.get(url, stream=True) and write to file in chunks? compare approaches
# TODO: should we guess the extension?
if not to_filename:
to_filename = url.split('/')[-1].split('?')[0]
if len(to_filename) > 64:

View File

@@ -0,0 +1,119 @@
import os
import re, requests, mimetypes
from loguru import logger
from . import Archiver
from ..core import Metadata, Media, ArchivingContext
class BlueskyArchiver(Archiver):
"""
Uses an unauthenticated Bluesky API to archive posts including metadata, images and videos. Relies on `public.api.bsky.app/xrpc` and `bsky.social/xrpc`. Avoids ATProto to avoid auth.
Some inspiration from https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/bluesky.py
"""
name = "bluesky_archiver"
BSKY_POST = re.compile(r"/profile/([^/]+)/post/([a-zA-Z0-9]+)")
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if not re.search(self.BSKY_POST, url):
return False
logger.debug(f"Identified a Bluesky post: {url}, archiving...")
result = Metadata()
# fetch post info and update result
post = self._get_post_from_uri(url)
logger.debug(f"Extracted post info: {post['record']['text']}")
result.set_title(post["record"]["text"])
result.set_timestamp(post["record"]["createdAt"])
for k, v in self._get_post_data(post).items():
if v: result.set(k, v)
# download if embeds present (1 video XOR >=1 images)
for media in self._download_bsky_embeds(post):
result.add_media(media)
logger.debug(f"Downloaded {len(result.media)} media files")
return result.success("bluesky")
def _get_post_from_uri(self, post_uri: str) -> dict:
"""
Calls a public (no auth needed) Bluesky API to get a post from its uri, uses .getPostThread as it brings author info as well (unlike .getPost).
"""
post_match = re.search(self.BSKY_POST, post_uri)
username = post_match.group(1)
post_id = post_match.group(2)
at_uri = f'at://{username}/app.bsky.feed.post/{post_id}'
r = requests.get(f"https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread?uri={at_uri}&depth=0&parent_height=0")
r.raise_for_status()
thread = r.json()
assert thread["thread"]["$type"] == "app.bsky.feed.defs#threadViewPost"
return thread["thread"]["post"]
def _download_bsky_embeds(self, post: dict) -> list[Media]:
"""
Iterates over image(s) or video in a Bluesky post and downloads them
"""
media = []
embed = post.get("record", {}).get("embed", {})
image_medias = embed.get("images", []) + embed.get("media", {}).get("images", [])
video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e]
for image_media in image_medias:
image_media = self._download_bsky_file_as_media(image_media["image"]["ref"]["$link"], post["author"]["did"])
media.append(image_media)
for video_media in video_medias:
video_media = self._download_bsky_file_as_media(video_media["ref"]["$link"], post["author"]["did"])
media.append(video_media)
return media
def _download_bsky_file_as_media(self, cid: str, did: str) -> Media:
"""
Uses the Bluesky API to download a file by its `cid` and `did`.
"""
# TODO: replace with self.download_from_url once that function has been cleaned-up
file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}"
response = requests.get(file_url, stream=True)
response.raise_for_status()
ext = mimetypes.guess_extension(response.headers["Content-Type"])
filename = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{ext}")
with open(filename, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
media = Media(filename=filename)
media.set("src", file_url)
return media
def _get_post_data(self, post: dict) -> dict:
"""
Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links.
"""
author = post["author"]
if "labels" in author and not author["labels"]: del author["labels"]
if "associated" in author: del author["associated"]
mentions, tags, links = [], [], []
facets = post.get("record", {}).get("facets", [])
for f in facets:
for feature in f["features"]:
if feature["$type"] == "app.bsky.richtext.facet#mention":
mentions.append(feature["did"])
elif feature["$type"] == "app.bsky.richtext.facet#tag":
tags.append(feature["tag"])
elif feature["$type"] == "app.bsky.richtext.facet#link":
links.append(feature["uri"])
res = {"author": author}
if mentions: res["mentions"] = mentions
if tags: res["tags"] = tags
if links: res["links"] = links
return res

View File

@@ -22,6 +22,7 @@ class InstagramAPIArchiver(Archiver):
super().__init__(config)
self.assert_valid_string("access_token")
self.assert_valid_string("api_endpoint")
self.full_profile_max_posts = int(self.full_profile_max_posts)
if self.api_endpoint[-1] == "/": self.api_endpoint = self.api_endpoint[:-1]
self.full_profile = bool(self.full_profile)
@@ -33,6 +34,7 @@ class InstagramAPIArchiver(Archiver):
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
"full_profile": {"default": False, "help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information."},
"full_profile_max_posts": {"default": 0, "help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights"},
"minimize_json_output": {"default": True, "help": "if true, will remove empty values from the json output"},
}
@@ -73,9 +75,9 @@ class InstagramAPIArchiver(Archiver):
if type(d) == list: return [self.cleanup_dict(v) for v in d]
if type(d) != dict: return d
return {
k: self.cleanup_dict(v) if type(v) in [dict, list] else v
k: clean_v
for k, v in d.items()
if v not in [0.0, 0, [], {}, "", None, "null"] and
if (clean_v := self.cleanup_dict(v)) not in [0.0, 0, [], {}, "", None, "null"] and
k not in ["x", "y", "width", "height"]
}
@@ -93,9 +95,6 @@ class InstagramAPIArchiver(Archiver):
if self.full_profile:
user_id = user.get("pk")
# download all posts
self.download_all_posts(result, user_id)
# download all stories
try:
stories = self._download_stories_reusable(result, username)
@@ -104,25 +103,46 @@ class InstagramAPIArchiver(Archiver):
result.append("errors", f"Error downloading stories for {username}")
logger.error(f"Error downloading stories for {username}: {e}")
# download all posts
try:
self.download_all_posts(result, user_id)
except Exception as e:
result.append("errors", f"Error downloading posts for {username}")
logger.error(f"Error downloading posts for {username}: {e}")
# download all tagged
try:
self.download_all_tagged(result, user_id)
except Exception as e:
result.append("errors", f"Error downloading tagged posts for {username}")
logger.error(f"Error downloading tagged posts for {username}: {e}")
# download all highlights
try:
count_highlights = 0
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
count_highlights += len(h_info.get("items", []))
except Exception as e:
result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
result.set("#highlights", count_highlights)
self.download_all_highlights(result, username, user_id)
except Exception as e:
result.append("errors", f"Error downloading highlights for {username}")
logger.error(f"Error downloading highlights for {username}: {e}")
result.set_url(url) # reset as scrape_item modifies it
return result.success("insta profile")
def download_all_highlights(self, result, username, user_id):
count_highlights = 0
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
count_highlights += len(h_info.get("items", []))
except Exception as e:
result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
if self.full_profile_max_posts and count_highlights >= self.full_profile_max_posts:
logger.info(f"HIGHLIGHTS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#highlights", count_highlights)
def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata:
if id:
post = self.call_api(f"v1/media/by/id", {"id": id})
@@ -166,12 +186,13 @@ class InstagramAPIArchiver(Archiver):
def download_stories(self, result: Metadata, username: str) -> Metadata:
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
stories = self._download_stories_reusable(result, username)
if stories == []: return result.success("insta no story")
result.set_title(f"stories {username} at {now}").set("#stories", len(stories))
return result.success(f"insta stories {now}")
def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]:
stories = self.call_api(f"v1/user/stories/by/username", {"username": username})
assert stories, f"Stories for {username} not found"
if not stories or not len(stories): return []
stories = stories[::-1] # newest to oldest
for s in tqdm(stories, desc="downloading stories", unit="story"):
@@ -188,7 +209,7 @@ class InstagramAPIArchiver(Archiver):
post_count = 0
while end_cursor != "":
posts = self.call_api(f"v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor})
if not len(posts): break
if not len(posts) or not type(posts) == list or len(posts) != 2: break
posts, end_cursor = posts[0], posts[1]
logger.info(f"parsing {len(posts)} posts, next {end_cursor=}")
@@ -199,7 +220,35 @@ class InstagramAPIArchiver(Archiver):
logger.error(f"Error downloading post, skipping {p.get('id')}: {e}")
pbar.update(1)
post_count+=1
if self.full_profile_max_posts and post_count >= self.full_profile_max_posts:
logger.info(f"POSTS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#posts", post_count)
def download_all_tagged(self, result: Metadata, user_id: str):
next_page_id = ""
pbar = tqdm(desc="downloading tagged posts")
tagged_count = 0
while next_page_id != None:
resp = self.call_api(f"v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id})
posts = resp.get("response", {}).get("items", [])
if not len(posts): break
next_page_id = resp.get("next_page_id")
logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}")
for p in posts:
try: self.scrape_item(result, p, "tagged")
except Exception as e:
result.append("errors", f"Error downloading tagged post {p.get('id')}")
logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}")
pbar.update(1)
tagged_count+=1
if self.full_profile_max_posts and tagged_count >= self.full_profile_max_posts:
logger.info(f"TAGS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#tagged", tagged_count)
### reusable parsing utils below
@@ -217,10 +266,10 @@ class InstagramAPIArchiver(Archiver):
if self.minimize_json_output:
del item["clips_metadata"]
if code := item.get("code"):
result.set("url", f"https://www.instagram.com/p/{code}/")
if code := item.get("code") and not result.get("url"):
result.set_url(f"https://www.instagram.com/p/{code}/")
resources = item.get("resources", [])
resources = item.get("resources", item.get("carousel_media", []))
item, media, media_id = self.scrape_media(item, context)
# if resources are present take the main media from the first resource
if not media and len(resources):
@@ -242,7 +291,7 @@ class InstagramAPIArchiver(Archiver):
def scrape_media(self, item: dict, context:str) -> tuple[dict, Media, str]:
# remove unnecessary info
if self.minimize_json_output:
for k in ["image_versions", "video_versions", "video_dash_manifest"]:
for k in ["image_versions", "video_versions", "video_dash_manifest", "image_versions2", "video_versions2"]:
if k in item: del item[k]
item = self.cleanup_dict(item)
@@ -253,19 +302,24 @@ class InstagramAPIArchiver(Archiver):
# retrieve video info
best_id = item.get('id', item.get('pk'))
taken_at = item.get("taken_at")
taken_at = item.get("taken_at", item.get("taken_at_ts"))
code = item.get("code")
caption_text = item.get("caption_text")
if "carousel_media" in item: del item["carousel_media"]
if video_url := item.get("video_url"):
filename = self.download_from_url(video_url, verbose=False)
video_media = Media(filename=filename)
if taken_at: video_media.set("date", taken_at)
if code: video_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text: video_media.set("text", caption_text)
video_media.set("preview", [image_media])
video_media.set("data", [item])
return item, video_media, f"{context or 'video'} {best_id}"
elif image_media:
if taken_at: image_media.set("date", taken_at)
if code: image_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text: image_media.set("text", caption_text)
image_media.set("data", [item])
return item, image_media, f"{context or 'image'} {best_id}"

View File

@@ -42,7 +42,7 @@ class InstagramTbotArchiver(Archiver):
# make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file
self.session_file = new_session_file.replace(".session", "")
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
@@ -54,8 +54,9 @@ class InstagramTbotArchiver(Archiver):
def cleanup(self) -> None:
logger.info(f"CLEANUP {self.name}.")
if os.path.exists(self.session_file):
os.remove(self.session_file)
session_file_name = self.session_file + ".session"
if os.path.exists(session_file_name):
os.remove(session_file_name)
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

View File

@@ -49,7 +49,7 @@ class TelethonArchiver(Archiver):
# make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"telethon-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file
self.session_file = new_session_file.replace(".session", "")
# initiate the client
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
@@ -101,8 +101,9 @@ class TelethonArchiver(Archiver):
def cleanup(self) -> None:
logger.info(f"CLEANUP {self.name}.")
if os.path.exists(self.session_file):
os.remove(self.session_file)
session_file_name = self.session_file + ".session"
if os.path.exists(session_file_name):
os.remove(session_file_name)
def download(self, item: Metadata) -> Metadata:
"""

View File

@@ -1,7 +1,9 @@
import re, requests, mimetypes, json
import re, requests, mimetypes, json, math
from typing import Union
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from yt_dlp import YoutubeDL
from yt_dlp.extractor.twitter import TwitterIE
from slugify import slugify
from . import Archiver
@@ -29,7 +31,7 @@ class TwitterArchiver(Archiver):
# expand URL if t.co and clean tracker GET params
if 'https://t.co/' in url:
try:
r = requests.get(url)
r = requests.get(url, timeout=30)
logger.debug(f'Expanded url {url} to {r.url}')
url = r.url
except:
@@ -43,64 +45,79 @@ class TwitterArchiver(Archiver):
can handle private/public channels
"""
url = item.get_url()
# detect URLs that we definitely cannot handle
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
result = Metadata()
strategies = [self.download_yt_dlp, self.download_syndication]
for strategy in strategies:
logger.debug(f"Trying {strategy.__name__} for {url=}")
try:
result = strategy(item, url, tweet_id)
if result: return result
except Exception as ex:
logger.error(f"Failed to download {url} with {strategy.__name__}: {type(ex).__name__} occurred. args: {ex.args}")
logger.warning(f"No free strategy worked for {url}")
return False
scr = TwitterTweetScraper(tweet_id)
try:
tweet = next(scr.get_items())
except Exception as ex:
logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
return self.download_alternative(item, url, tweet_id)
result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date)
if tweet.media is None:
logger.debug(f'No media found, archiving tweet text only')
return result
for i, tweet_media in enumerate(tweet.media):
media = Media(filename="")
mimetype = ""
if type(tweet_media) == Video:
variant = max(
[v for v in tweet_media.variants if v.bitrate], key=lambda v: v.bitrate)
media.set("src", variant.url).set("duration", tweet_media.duration)
mimetype = variant.contentType
elif type(tweet_media) == Gif:
variant = tweet_media.variants[0]
media.set("src", variant.url)
mimetype = variant.contentType
elif type(tweet_media) == Photo:
media.set("src", UrlUtil.twitter_best_quality_url(tweet_media.fullUrl))
mimetype = "image/jpeg"
else:
logger.warning(f"Could not get media URL of {tweet_media}")
continue
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
result.add_media(media)
return result.success("twitter-snscrape")
def download_alternative(self, item: Metadata, url: str, tweet_id: str) -> Metadata:
def generate_token(self, tweet_id: str) -> str:
"""Generates the syndication token for a tweet ID.
Taken from https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
And Vercel's code: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27
"""
Hack alternative working again.
https://stackoverflow.com/a/71867055/6196010 (OUTDATED URL)
# Perform the division and multiplication by π
result = (int(tweet_id) / 1e15) * math.pi
fractional_part = result % 1
# Convert to base 36
base_36 = ''
while result >= 1:
base_36 = "0123456789abcdefghijklmnopqrstuvwxyz"[int(result % 36)] + base_36
result = math.floor(result / 36)
# Append fractional part in base 36
while fractional_part > 0 and len(base_36) < 11: # Limit to avoid infinite loop
fractional_part *= 36
digit = int(fractional_part)
base_36 += "0123456789abcdefghijklmnopqrstuvwxyz"[digit]
fractional_part -= digit
# Remove leading zeros and dots
return base_36.replace('0', '').replace('.', '')
def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
"""
Downloads tweets using Twitter's own embed API (Hack).
Background on method can be found at:
https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-1615937362
https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
next to test: https://cdn.embedly.com/widgets/media.html?&schema=twitter&url=https://twitter.com/bellingcat/status/1674700676612386816
"""
logger.debug(f"Trying twitter hack for {url=}")
result = Metadata()
hack_url = "https://cdn.syndication.twimg.com/tweet-result"
params = {
'id': tweet_id,
'token': self.generate_token(tweet_id)
}
hack_url = f"https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}"
r = requests.get(hack_url)
if r.status_code != 200: return False
r = requests.get(hack_url, params=params, timeout=10)
if r.status_code != 200 or r.json()=={}:
logger.warning(f"SyndicationHack: Failed to get tweet information from {hack_url}.")
return False
result = Metadata()
tweet = r.json()
if tweet.get('__typename') == 'TweetTombstone':
logger.error(f"Failed to get tweet {tweet_id}: {tweet['tombstone']['text']['text']}")
return False
urls = []
for p in tweet.get("photos", []):
urls.append(p["url"])
@@ -108,9 +125,9 @@ class TwitterArchiver(Archiver):
# 1 tweet has 1 video max
if "video" in tweet:
v = tweet["video"]
urls.append(self.choose_variant(v.get("variants", [])))
urls.append(self.choose_variant(v.get("variants", []))['url'])
logger.debug(f"Twitter hack got {urls=}")
logger.debug(f"Twitter hack got media {urls=}")
for i, u in enumerate(urls):
media = Media(filename="")
@@ -122,9 +139,49 @@ class TwitterArchiver(Archiver):
media.filename = self.download_from_url(u, f'{slugify(url)}_{i}{ext}')
result.add_media(media)
result.set_title(tweet.get("text")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"))
return result.success("twitter-hack")
return result.success("twitter-syndication")
def download_yt_dlp(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
downloader = YoutubeDL()
tie = TwitterIE(downloader)
tweet = tie._extract_status(tweet_id)
result = Metadata()
try:
if not tweet.get("user") or not tweet.get("created_at"):
raise ValueError(f"Error retreiving post with id {tweet_id}. Are you sure it exists?")
timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
except (ValueError, KeyError) as ex:
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")
return False
result\
.set_title(tweet.get('full_text', ''))\
.set_content(json.dumps(tweet, ensure_ascii=False))\
.set_timestamp(timestamp)
if not tweet.get("entities", {}).get("media"):
logger.debug('No media found, archiving tweet text only')
result.status = "twitter-ytdl"
return result
for i, tw_media in enumerate(tweet["entities"]["media"]):
media = Media(filename="")
mimetype = ""
if tw_media["type"] == "photo":
media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https']))
mimetype = "image/jpeg"
elif tw_media["type"] == "video":
variant = self.choose_variant(tw_media['video_info']['variants'])
media.set("src", variant['url'])
mimetype = variant['content_type']
elif tw_media["type"] == "animated_gif":
variant = tw_media['video_info']['variants'][0]
media.set("src", variant['url'])
mimetype = variant['content_type']
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
result.add_media(media)
return result.success("twitter-ytdl")
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
@@ -140,13 +197,13 @@ class TwitterArchiver(Archiver):
# choosing the highest quality possible
variant, width, height = None, 0, 0
for var in variants:
if var.get("type", "") == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
if var.get("content_type", "") == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"])
if width_height:
w, h = int(width_height[1]), int(width_height[2])
if w > width or h > height:
width, height = w, h
variant = var.get("src", variant)
variant = var
else:
variant = var.get("src") if not variant else variant
variant = var if not variant else variant
return variant

View File

@@ -15,6 +15,8 @@ class YoutubeDLArchiver(Archiver):
self.livestreams = bool(self.livestreams)
self.live_from_start = bool(self.live_from_start)
self.end_means_success = bool(self.end_means_success)
self.allow_playlist = bool(self.allow_playlist)
self.max_downloads = self.max_downloads
@staticmethod
def configs() -> dict:
@@ -26,6 +28,10 @@ class YoutubeDLArchiver(Archiver):
"live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."},
"proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"},
"end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."},
'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."},
"max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."},
"cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"},
"cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"},
}
def download(self, item: Metadata) -> Metadata:
@@ -34,12 +40,21 @@ class YoutubeDLArchiver(Archiver):
if item.netloc in ['facebook.com', 'www.facebook.com'] and self.facebook_cookie:
logger.debug('Using Facebook cookie')
yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
if item.netloc in ['youtube.com', 'www.youtube.com']:
if self.cookies_from_browser:
logger.debug(f'Extracting cookies from browser {self.cookies_from_browser} for Youtube')
ydl_options['cookiesfrombrowser'] = (self.cookies_from_browser,)
elif self.cookie_file:
logger.debug(f'Using cookies from file {self.cookie_file}')
ydl_options['cookiefile'] = self.cookie_file
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': True, 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy}
ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
try:
# don'd download since it can be a live stream
# don't download since it can be a live stream
info = ydl.extract_info(url, download=False)
if info.get('is_live', False) and not self.livestreams:
logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting")
@@ -52,7 +67,8 @@ class YoutubeDLArchiver(Archiver):
return False
# this time download
ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments})
ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments})
#TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded?
info = ydl.extract_info(url, download=True)
if "entries" in info:
@@ -64,13 +80,17 @@ class YoutubeDLArchiver(Archiver):
result = Metadata()
result.set_title(info.get("title"))
if "description" in info: result.set_content(info["description"])
for entry in entries:
try:
filename = ydl.prepare_filename(entry)
if not os.path.exists(filename):
filename = filename.split('.')[0] + '.mkv'
new_media = Media(filename).set("duration", info.get("duration"))
new_media = Media(filename)
for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]:
if x in entry: new_media.set(x, entry[x])
# read text from subtitles if enabled
if self.subtitles:
for lang, val in (info.get('requested_subtitles') or {}).items():
@@ -89,11 +109,12 @@ class YoutubeDLArchiver(Archiver):
result.set("comments", [{
"text": c["text"],
"author": c["author"],
"timestamp": datetime.datetime.utcfromtimestamp(c.get("timestamp")).replace(tzinfo=datetime.timezone.utc)
"timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc)
} for c in info.get("comments", [])])
if (timestamp := info.get("timestamp")):
timestamp = datetime.datetime.utcfromtimestamp(timestamp).replace(tzinfo=datetime.timezone.utc).isoformat()
#TODO: fix deprecated timestamp,
timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat()
result.set_timestamp(timestamp)
if (upload_date := info.get("upload_date")):
upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc)

View File

@@ -25,10 +25,11 @@ class Media:
_mimetype: str = None # eg: image/jpeg
_stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True)) # always exclude
def store(self: Media, override_storages: List = None, url: str = "url-not-available"):
# stores the media into the provided/available storages [Storage]
# repeats the process for its properties, in case they have inner media themselves
# for now it only goes down 1 level but it's easy to make it recursive if needed
def store(self: Media, override_storages: List = None, url: str = "url-not-available", metadata: Any = None):
# 'Any' typing for metadata to avoid circular imports. Stores the media
# into the provided/available storages [Storage] repeats the process for
# its properties, in case they have inner media themselves for now it
# only goes down 1 level but it's easy to make it recursive if needed.
storages = override_storages or ArchivingContext.get("storages")
if not len(storages):
logger.warning(f"No storages found in local context or provided directly for {self.filename}.")
@@ -36,7 +37,7 @@ class Media:
for s in storages:
for any_media in self.all_inner_media(include_self=True):
s.store(any_media, url)
s.store(any_media, url, metadata=metadata)
def all_inner_media(self, include_self=False):
""" Media can be inside media properties, examples include transformations on original media.

View File

@@ -48,7 +48,7 @@ class Metadata:
self.remove_duplicate_media_by_hash()
storages = override_storages or ArchivingContext.get("storages")
for media in self.media:
media.store(override_storages=storages, url=self.get_url())
media.store(override_storages=storages, url=self.get_url(), metadata=self)
def set(self, key: str, val: Any) -> Metadata:
self.metadata[key] = val

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
from typing import Generator, Union, List
from urllib.parse import urlparse
from ipaddress import ip_address
from .context import ArchivingContext
@@ -26,7 +28,7 @@ class ArchivingOrchestrator:
ArchivingContext.set("storages", self.storages, keep_on_reset=True)
try:
for a in self.archivers: a.setup()
for a in self.all_archivers_for_setup(): a.setup()
except (KeyboardInterrupt, Exception) as e:
logger.error(f"Error during setup of archivers: {e}\n{traceback.format_exc()}")
self.cleanup()
@@ -34,7 +36,7 @@ class ArchivingOrchestrator:
def cleanup(self)->None:
logger.info("Cleaning up")
for a in self.archivers: a.cleanup()
for a in self.all_archivers_for_setup(): a.cleanup()
def feed(self) -> Generator[Metadata]:
for item in self.feeder:
@@ -60,7 +62,9 @@ class ArchivingOrchestrator:
exit()
except Exception as e:
logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
for d in self.databases: d.failed(item)
for d in self.databases:
if type(e) == AssertionError: d.failed(item, str(e))
else: d.failed(item)
def archive(self, result: Metadata) -> Union[Metadata, None]:
@@ -73,7 +77,8 @@ class ArchivingOrchestrator:
5. Store all downloaded/generated media
6. Call selected Formatter and store formatted if needed
"""
original_url = result.get_url()
original_url = result.get_url().strip()
self.assert_valid_url(original_url)
# 1 - sanitize - each archiver is responsible for cleaning/expanding its own URLs
url = original_url
@@ -90,7 +95,9 @@ class ArchivingOrchestrator:
if cached_result:
logger.debug("Found previously archived entry")
for d in self.databases:
d.done(cached_result, cached=True)
try: d.done(cached_result, cached=True)
except Exception as e:
logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
return cached_result
# 3 - call archivers until one succeeds
@@ -113,13 +120,39 @@ class ArchivingOrchestrator:
# 6 - format and store formatted if needed
if (final_media := self.formatter.format(result)):
final_media.store(url=url)
final_media.store(url=url, metadata=result)
result.set_final_media(final_media)
if result.is_empty():
result.status = "nothing archived"
# signal completion to databases and archivers
for d in self.databases: d.done(result)
for d in self.databases:
try: d.done(result)
except Exception as e:
logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
return result
def assert_valid_url(self, url: str) -> bool:
"""
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
"""
assert url.startswith("http://") or url.startswith("https://"), f"Invalid URL scheme"
parsed = urlparse(url)
assert parsed.scheme in ["http", "https"], f"Invalid URL scheme"
assert parsed.hostname, f"Invalid URL hostname"
assert parsed.hostname != "localhost", f"Invalid URL"
try: # special rules for IP addresses
ip = ip_address(parsed.hostname)
except ValueError: pass
else:
assert ip.is_global, f"Invalid IP used"
assert not ip.is_reserved, f"Invalid IP used"
assert not ip.is_link_local, f"Invalid IP used"
assert not ip.is_private, f"Invalid IP used"
def all_archivers_for_setup(self) -> List[Archiver]:
return self.archivers + [e for e in self.enrichers if isinstance(e, Archiver)]

View File

@@ -2,4 +2,5 @@ from .database import Database
from .gsheet_db import GsheetsDb
from .console_db import ConsoleDb
from .csv_db import CSVDb
from .api_db import AAApiDb
from .api_db import AAApiDb
from .atlos_db import AtlosDb

View File

@@ -0,0 +1,79 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from . import Database
from ..core import Metadata
from ..utils import get_atlos_config_options
class AtlosDb(Database):
"""
Outputs results to Atlos
"""
name = "atlos_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={"metadata": {"processed": True, "status": "error", "error": reason}},
).raise_for_status()
logger.info(
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
)
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={
"metadata": dict(
processed=True,
status="success",
results=self._process_metadata(item),
)
},
).raise_for_status()
logger.info(
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
)

View File

@@ -21,8 +21,8 @@ class ConsoleDb(Database):
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
def failed(self, item: Metadata) -> None:
logger.error(f"FAILED {item}")
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}: {reason}")
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")

View File

@@ -22,7 +22,7 @@ class Database(Step, ABC):
"""signals the DB that the given item archival has started"""
pass
def failed(self, item: Metadata) -> None:
def failed(self, item: Metadata, reason:str) -> None:
"""update DB accordingly for failure"""
pass

View File

@@ -29,9 +29,9 @@ class GsheetsDb(Database):
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', 'Archive in progress')
def failed(self, item: Metadata) -> None:
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, 'Archive failed')
self._safe_status_update(item, f'Archive failed {reason}')
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
@@ -102,6 +102,11 @@ class GsheetsDb(Database):
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
# TODO: to make gsheet_db less coupled with gsheet_feeder's "gsheet" parameter, this method could 1st try to fetch "gsheet" from ArchivingContext and, if missing, manage its own singleton - not needed for now
gw: GWorksheet = ArchivingContext.get("gsheet").get("worksheet")
row: int = ArchivingContext.get("gsheet").get("row")
if gsheet := ArchivingContext.get("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
print(self.sheet_id)
return gw, row

View File

@@ -14,9 +14,26 @@ class HashEnricher(Enricher):
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
algo_choices = self.configs()["algorithm"]["choices"]
algos = self.configs()["algorithm"]
algo_choices = algos["choices"]
if not getattr(self, 'algorithm', None):
if not config.get('algorithm'):
logger.warning(f"No hash algorithm selected, defaulting to {algos['default']}")
self.algorithm = algos["default"]
else:
self.algorithm = config["algorithm"]
assert self.algorithm in algo_choices, f"Invalid hash algorithm selected, must be one of {algo_choices} (you selected {self.algorithm})."
if not getattr(self, 'chunksize', None):
if config.get('chunksize'):
self.chunksize = config["chunksize"]
else:
self.chunksize = self.configs()["chunksize"]["default"]
self.chunksize = int(self.chunksize)
assert self.chunksize >= -1, "read length must be non-negative or -1"
ArchivingContext.set("hash_enricher.algorithm", self.algorithm, keep_on_reset=True)
@staticmethod

View File

@@ -1,5 +1,7 @@
from loguru import logger
import time, os
import base64
from selenium.common.exceptions import TimeoutException
@@ -18,22 +20,31 @@ class ScreenshotEnricher(Enricher):
"timeout": {"default": 60, "help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
"http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
"save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {}, "help": "options to pass to the pdf printer"}
}
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
if UrlUtil.is_auth_wall(url):
logger.debug(f"[SKIP] SCREENSHOT since url is behind AUTH WALL: {url=}")
return
logger.debug(f"Enriching screenshot for {url=}")
with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url, http_proxy=self.http_proxy) as driver:
with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url, http_proxy=self.http_proxy, print_options=self.print_options) as driver:
try:
driver.get(url)
time.sleep(int(self.sleep_before_screenshot))
screenshot_file = os.path.join(ArchivingContext.get_tmp_dir(), f"screenshot_{random_str(8)}.png")
driver.save_screenshot(screenshot_file)
to_enrich.add_media(Media(filename=screenshot_file), id="screenshot")
if self.save_to_pdf:
pdf_file = os.path.join(ArchivingContext.get_tmp_dir(), f"pdf_{random_str(8)}.pdf")
pdf = driver.print_page(driver.print_options)
with open(pdf_file, "wb") as f:
f.write(base64.b64decode(pdf))
to_enrich.add_media(Media(filename=pdf_file), id="pdf")
except TimeoutException:
logger.info("TimeoutException loading page for screenshot")
except Exception as e:

View File

@@ -27,7 +27,10 @@ class SSLEnricher(Enricher):
if not to_enrich.media and self.skip_when_nothing_archived: return
url = to_enrich.get_url()
domain = urlparse(url).netloc
parsed = urlparse(url)
assert parsed.scheme in ["https"], f"Invalid URL scheme {url=}"
domain = parsed.netloc
logger.debug(f"fetching SSL certificate for {domain=} in {url=}")
cert = ssl.get_server_certificate((domain, 443))

View File

@@ -34,7 +34,24 @@ class WaczArchiverEnricher(Enricher, Archiver):
"extract_screenshot": {"default": True, "help": "If enabled the screenshot captured by browsertrix will be extracted into separate Media and appear in the html report. The .wacz file will be kept untouched."},
"socks_proxy_host": {"default": None, "help": "SOCKS proxy host for browsertrix-crawler, use in combination with socks_proxy_port. eg: user:password@host"},
"socks_proxy_port": {"default": None, "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"},
"proxy_server": {"default": None, "help": "SOCKS server proxy URL, in development"},
}
def setup(self) -> None:
self.use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER')
self.docker_in_docker = os.environ.get('WACZ_ENABLE_DOCKER') and os.environ.get('RUNNING_IN_DOCKER')
self.cwd_dind = f"/crawls/crawls{random_str(8)}"
self.browsertrix_home_host = os.environ.get('BROWSERTRIX_HOME_HOST')
self.browsertrix_home_container = os.environ.get('BROWSERTRIX_HOME_CONTAINER') or self.browsertrix_home_host
# create crawls folder if not exists, so it can be safely removed in cleanup
if self.docker_in_docker:
os.makedirs(self.cwd_dind, exist_ok=True)
def cleanup(self) -> None:
if self.docker_in_docker:
logger.debug(f"Removing {self.cwd_dind=}")
shutil.rmtree(self.cwd_dind, ignore_errors=True)
def download(self, item: Metadata) -> Metadata:
# this new Metadata object is required to avoid duplication
@@ -51,27 +68,30 @@ class WaczArchiverEnricher(Enricher, Archiver):
url = to_enrich.get_url()
collection = random_str(8)
browsertrix_home_host = os.environ.get('BROWSERTRIX_HOME_HOST') or os.path.abspath(ArchivingContext.get_tmp_dir())
browsertrix_home_container = os.environ.get('BROWSERTRIX_HOME_CONTAINER') or browsertrix_home_host
browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(ArchivingContext.get_tmp_dir())
browsertrix_home_container = self.browsertrix_home_container or browsertrix_home_host
cmd = [
"crawl",
"--url", url,
"--scopeType", "page",
"--generateWACZ",
"--text",
"--text", "to-pages",
"--screenshot", "fullPage",
"--collection", collection,
"--id", collection,
"--saveState", "never",
"--behaviors", "autoscroll,autoplay,autofetch,siteSpecific",
"--behaviorTimeout", str(self.timeout),
"--timeout", str(self.timeout)]
"--timeout", str(self.timeout),
"--blockAds" # TODO: test
]
if self.docker_in_docker:
cmd.extend(["--cwd", self.cwd_dind])
# call docker if explicitly enabled or we are running on the host (not in docker)
use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER')
if use_docker:
if self.use_docker:
logger.debug(f"generating WACZ in Docker for {url=}")
logger.debug(f"{browsertrix_home_host=} {browsertrix_home_container=}")
if self.docker_commands:
@@ -93,9 +113,12 @@ class WaczArchiverEnricher(Enricher, Archiver):
try:
logger.info(f"Running browsertrix-crawler: {' '.join(cmd)}")
if self.socks_proxy_host and self.socks_proxy_port:
my_env = os.environ.copy()
if self.proxy_server:
logger.debug("Using PROXY_SERVER proxy for browsertrix-crawler")
my_env["PROXY_SERVER"] = self.proxy_server
elif self.socks_proxy_host and self.socks_proxy_port:
logger.debug("Using SOCKS proxy for browsertrix-crawler")
my_env = os.environ.copy()
my_env["SOCKS_HOST"] = self.socks_proxy_host
my_env["SOCKS_PORT"] = str(self.socks_proxy_port)
subprocess.run(cmd, check=True, env=my_env)
@@ -103,7 +126,10 @@ class WaczArchiverEnricher(Enricher, Archiver):
logger.error(f"WACZ generation failed: {e}")
return False
if use_docker:
if self.docker_in_docker:
wacz_fn = os.path.join(self.cwd_dind, "collections", collection, f"{collection}.wacz")
elif self.use_docker:
wacz_fn = os.path.join(browsertrix_home_container, "collections", collection, f"{collection}.wacz")
else:
wacz_fn = os.path.join("collections", collection, f"{collection}.wacz")
@@ -116,7 +142,9 @@ class WaczArchiverEnricher(Enricher, Archiver):
if self.extract_media or self.extract_screenshot:
self.extract_media_from_wacz(to_enrich, wacz_fn)
if use_docker:
if self.docker_in_docker:
jsonl_fn = os.path.join(self.cwd_dind, "collections", collection, "pages", "pages.jsonl")
elif self.use_docker:
jsonl_fn = os.path.join(browsertrix_home_container, "collections", collection, "pages", "pages.jsonl")
else:
jsonl_fn = os.path.join("collections", collection, "pages", "pages.jsonl")
@@ -139,7 +167,7 @@ class WaczArchiverEnricher(Enricher, Archiver):
"""
Receives a .wacz archive, and extracts all relevant media from it, adding them to to_enrich.
"""
logger.info(f"WACZ extract_media flag is set, extracting media from {wacz_filename=}")
logger.info(f"WACZ extract_media or extract_screenshot flag is set, extracting media from {wacz_filename=}")
# unzipping the .wacz
tmp_dir = ArchivingContext.get_tmp_dir()
@@ -160,10 +188,11 @@ class WaczArchiverEnricher(Enricher, Archiver):
# get media out of .warc
counter = 0
seen_urls = set()
import json
with open(warc_filename, 'rb') as warc_stream:
for record in ArchiveIterator(warc_stream):
# only include fetched resources
if record.rec_type == "resource" and self.extract_screenshot: # screenshots
if record.rec_type == "resource" and record.content_type == "image/png" and self.extract_screenshot: # screenshots
fn = os.path.join(tmp_dir, f"warc-file-{counter}.png")
with open(fn, "wb") as outf: outf.write(record.raw_stream.read())
m = Media(filename=fn)
@@ -209,4 +238,4 @@ class WaczArchiverEnricher(Enricher, Archiver):
to_enrich.add_media(m, warc_fn)
counter += 1
seen_urls.add(record_url)
logger.info(f"WACZ extract_media finished, found {counter} relevant media file(s)")
logger.info(f"WACZ extract_media/extract_screenshot finished, found {counter} relevant media file(s)")

View File

@@ -1,3 +1,4 @@
import json
from loguru import logger
import time, requests
@@ -70,11 +71,16 @@ class WaybackArchiverEnricher(Enricher, Archiver):
return False
# check job status
job_id = r.json().get('job_id')
if not job_id:
logger.error(f"Wayback failed with {r.json()}")
try:
job_id = r.json().get('job_id')
if not job_id:
logger.error(f"Wayback failed with {r.json()}")
return False
except json.decoder.JSONDecodeError as e:
logger.error(f"Expected a JSON with job_id from Wayback and got {r.text}")
return False
# waits at most timeout seconds until job is completed, otherwise only enriches the job_id information
start_time = time.time()
wayback_url = False
@@ -92,6 +98,9 @@ class WaybackArchiverEnricher(Enricher, Archiver):
except requests.exceptions.RequestException as e:
logger.warning(f"RequestException: fetching status for {url=} due to: {e}")
break
except json.decoder.JSONDecodeError as e:
logger.error(f"Expected a JSON from Wayback and got {r.text} for {url=}")
break
except Exception as e:
logger.warning(f"error fetching status for {url=} due to: {e}")
if not wayback_url:

View File

@@ -44,7 +44,7 @@ class WhisperEnricher(Enricher):
job_results = {}
for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio():
m.store(url=url)
m.store(url=url, metadata=to_enrich)
try:
job_id = self.submit_job(m)
job_results[job_id] = False

View File

@@ -1,3 +1,4 @@
from.feeder import Feeder
from .gsheet_feeder import GsheetsFeeder
from .cli_feeder import CLIFeeder
from .cli_feeder import CLIFeeder
from .atlos_feeder import AtlosFeeder

View File

@@ -0,0 +1,56 @@
from loguru import logger
import requests
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import get_atlos_config_options
class AtlosFeeder(Feeder):
name = "atlos_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
if type(self.api_token) != str:
raise Exception("Atlos Feeder did not receive an Atlos API token")
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0
cursor = None
while True:
response = requests.get(
f"{self.atlos_url}/api/v2/source_material",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"cursor": cursor},
)
data = response.json()
response.raise_for_status()
cursor = data["next"]
for item in data["results"]:
if (
item["source_url"] not in [None, ""]
and (
item["metadata"]
.get("auto_archiver", {})
.get("processed", False)
!= True
)
and item["visibility"] == "visible"
and item["status"] not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set(
"atlos_id", item["id"]
)
count += 1
if len(data["results"]) == 0 or cursor is None:
break
logger.success(f"Processed {count} URL(s)")

View File

@@ -21,7 +21,7 @@ class HtmlFormatter(Formatter):
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.environment = Environment(loader=FileSystemLoader(os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")))
self.environment = Environment(loader=FileSystemLoader(os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")), autoescape=True)
# JinjaHelper class static methods are added as filters
self.environment.filters.update({
k: v.__func__ for k, v in JinjaHelpers.__dict__.items() if isinstance(v, staticmethod)

View File

@@ -286,11 +286,11 @@
// logic for enabled/disabled greyscale
// Get references to the checkboxes and images/videos
const safeImageViewCheckbox = document.getElementById('safe-media-view');
const imagesVideos = document.querySelectorAll('img, video');
const visualPreviews = document.querySelectorAll('img, video,embed');
// Function to toggle grayscale effect
function toggleGrayscale() {
imagesVideos.forEach(element => {
visualPreviews.forEach(element => {
if (safeImageViewCheckbox.checked) {
// Enable grayscale effect
element.style.filter = 'grayscale(1)';
@@ -307,7 +307,7 @@
safeImageViewCheckbox.addEventListener('change', toggleGrayscale);
// Handle the hover effect using JavaScript
imagesVideos.forEach(element => {
visualPreviews.forEach(element => {
element.addEventListener('mouseenter', () => {
// Disable grayscale effect on hover
element.style.filter = 'none';

View File

@@ -3,7 +3,7 @@
{% for url in m.urls %}
{% if url | length == 0 %}
No URL available for {{ m.key }}.
{% elif 'http' in url %}
{% elif 'http://' in url or 'https://' in url or url.startswith('/') %}
{% if 'image' in m.mimetype %}
<div>
<a href="{{ url }}">
@@ -32,6 +32,10 @@ No URL available for {{ m.key }}.
Your browser does not support the video element.
</video>
</div>
{% elif 'application/pdf' in m.mimetype %}
<div>
<embed src="{{ url }}" width="100%" height="400px"/>
</div>
{% elif 'audio' in m.mimetype %}
<div>
<audio controls>

View File

@@ -1,4 +1,5 @@
from .storage import Storage
from .s3 import S3Storage
from .local import LocalStorage
from .gd import GDriveStorage
from .gd import GDriveStorage
from .atlos import AtlosStorage

View File

@@ -0,0 +1,74 @@
import os
from typing import IO, List, Optional
from loguru import logger
import requests
import hashlib
from ..core import Media, Metadata
from ..storages import Storage
from ..utils import get_atlos_config_options
class AtlosStorage(Storage):
name = "atlos_storage"
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return dict(Storage.configs(), **get_atlos_config_options())
def get_cdn_url(self, _media: Media) -> str:
# It's not always possible to provide an exact URL, because it's
# possible that the media once uploaded could have been copied to
# another project.
return self.atlos_url
def _hash(self, media: Media) -> str:
# Hash the media file using sha-256. We don't use the existing auto archiver
# hash because there's no guarantee that the configuerer is using sha-256, which
# is how Atlos hashes files.
sha256 = hashlib.sha256()
with open(media.filename, "rb") as f:
while True:
buf = f.read(4096)
if not buf: break
sha256.update(buf)
return sha256.hexdigest()
def upload(self, media: Media, metadata: Optional[Metadata]=None, **_kwargs) -> bool:
atlos_id = metadata.get("atlos_id")
if atlos_id is None:
logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
return False
media_hash = self._hash(media)
# Check whether the media has already been uploaded
source_material = requests.get(
f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
).json()["result"]
existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
if media_hash in existing_media:
logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
return True
# Upload the media to the Atlos API
requests.post(
f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
params={
"title": media.properties
},
files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
).raise_for_status()
logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
return True
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass

View File

@@ -1,12 +1,12 @@
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from typing import IO
from typing import IO, Optional
import os
from ..utils.misc import random_str
from ..core import Media, Step, ArchivingContext
from ..core import Media, Step, ArchivingContext, Metadata
from ..enrichers import HashEnricher
from loguru import logger
from slugify import slugify
@@ -43,12 +43,12 @@ class Storage(Step):
# only for typing...
return Step.init(name, config, Storage)
def store(self, media: Media, url: str) -> None:
def store(self, media: Media, url: str, metadata: Optional[Metadata]=None) -> None:
if media.is_stored():
logger.debug(f"{media.key} already stored, skipping")
return
self.set_key(media, url)
self.upload(media)
self.upload(media, metadata=metadata)
media.add_url(self.get_cdn_url(media))
@abstractmethod

View File

@@ -3,4 +3,5 @@ from .gworksheet import GWorksheet
from .misc import *
from .webdriver import Webdriver
from .gsheet import Gsheets
from .url import UrlUtil
from .url import UrlUtil
from .atlos import get_atlos_config_options

View File

@@ -0,0 +1,13 @@
def get_atlos_config_options():
return {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"cli_set": lambda cli_val, _: cli_val
},
}

View File

@@ -2,18 +2,24 @@ from __future__ import annotations
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.common.print_page_options import PrintOptions
from loguru import logger
from selenium.webdriver.common.by import By
import time
class Webdriver:
def __init__(self, width: int, height: int, timeout_seconds: int, facebook_accept_cookies: bool = False, http_proxy: str = "") -> webdriver:
def __init__(self, width: int, height: int, timeout_seconds: int, facebook_accept_cookies: bool = False, http_proxy: str = "", print_options: dict = {}) -> webdriver:
self.width = width
self.height = height
self.timeout_seconds = timeout_seconds
self.facebook_accept_cookies = facebook_accept_cookies
self.http_proxy = http_proxy
# create and set print options
self.print_options = PrintOptions()
for k, v in print_options.items():
setattr(self.print_options, k, v)
def __enter__(self) -> webdriver:
options = webdriver.FirefoxOptions()
@@ -24,6 +30,7 @@ class Webdriver:
self.driver = webdriver.Firefox(options=options)
self.driver.set_window_size(self.width, self.height)
self.driver.set_page_load_timeout(self.timeout_seconds)
self.driver.print_options = self.print_options
except TimeoutException as e:
logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")

View File

@@ -1,12 +1,12 @@
""" Version information for the auto_archiver package.
TODO: This is a placeholder to replicate previous versioning.
"""
from importlib.metadata import version as get_version
VERSION_SHORT = get_version("auto_archiver")
_MAJOR = "0"
_MINOR = "9"
# On main and in a nightly release the patch should be one ahead of the last
# released build.
_PATCH = "2"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = ""
VERSION_SHORT = "{0}.{1}".format(_MAJOR, _MINOR)
__version__ = "{0}.{1}.{2}{3}".format(_MAJOR, _MINOR, _PATCH, _SUFFIX)
__version__ = f"{VERSION_SHORT}{_SUFFIX}"

6
tests/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
import tempfile
from auto_archiver.core.context import ArchivingContext
ArchivingContext.reset(full_reset=True)
ArchivingContext.set_tmp_dir(tempfile.gettempdir())

View File

View File

@@ -0,0 +1,27 @@
import pytest
from auto_archiver.core import Metadata
from auto_archiver.core import Step
from auto_archiver.core.metadata import Metadata
class TestArchiverBase(object):
archiver_class = None
config = None
@pytest.fixture(autouse=True)
def setup_archiver(self):
assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.archiver = self.archiver_class(self.config)
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
assert test_response is not False
if not status:
assert test_response.is_success()
else:
assert status == test_response.status
assert title == test_response.get_title()
assert timestamp, test_response.get("timestamp")

View File

@@ -0,0 +1,73 @@
import pytest
from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver
from .test_archiver_base import TestArchiverBase
class TestBlueskyArchiver(TestArchiverBase):
"""Tests Bluesky Archiver
Note that these tests will download API responses from the bluesky API, so they may be slow.
This is an intended feature, as we want to test to ensure the bluesky API format hasn't changed,
and also test the archiver's ability to download media.
"""
archiver_class = BlueskyArchiver
config = {}
@pytest.mark.download
def test_download_media_with_images(self):
# url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y
post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y")
# just make sure bsky haven't changed their format, images should be under "record/embed/media/images"
# there should be 2 images
assert "record" in post
assert "embed" in post["record"]
assert "media" in post["record"]["embed"]
assert "images" in post["record"]["embed"]["media"]
assert len(post["record"]["embed"]["media"]["images"]) == 2
# try downloading the media files
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 2
# check the IDs
assert "bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src')
assert "bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src')
@pytest.mark.download
def test_download_post_with_single_image(self):
# url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l")
# just make sure bsky haven't changed their format, images should be under "record/embed/images"
# there should be 1 image
assert "record" in post
assert "embed" in post["record"]
assert "images" in post["record"]["embed"]
assert len(post["record"]["embed"]["images"]) == 1
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 1
# check the ID
assert "bafkreihljdtomy4yulx4nfxuqdatlgvdg45vxdmjzzhclsd4ludk7zfma4" in media[0].get('src')
@pytest.mark.download
def test_download_post_with_video(self):
# url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
# just make sure bsky haven't changed their format, video should be under "record/embed/video"
assert "record" in post
assert "embed" in post["record"]
assert "video" in post["record"]["embed"]
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 1
# check the ID
assert "bafkreiaiskn2nt5cxjnxbgcqqcrnurvkr2ni3unekn6zvhvgr5nrqg6u2q" in media[0].get('src')

View File

@@ -0,0 +1,140 @@
import datetime
import pytest
from auto_archiver.archivers.twitter_archiver import TwitterArchiver
from .test_archiver_base import TestArchiverBase
class TestTwitterArchiver(TestArchiverBase):
archiver_class = TwitterArchiver
config = {}
@pytest.mark.parametrize("url, expected", [
("https://t.co/yl3oOJatFp", "https://www.bellingcat.com/category/resources/"), # t.co URL
("https://x.com/bellingcat/status/1874097816571961839", "https://x.com/bellingcat/status/1874097816571961839"), # x.com urls unchanged
("https://twitter.com/bellingcat/status/1874097816571961839", "https://twitter.com/bellingcat/status/1874097816571961839"), # twitter urls unchanged
("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839"), # strip tracking params
("https://www.bellingcat.com/category/resources/", "https://www.bellingcat.com/category/resources/"), # non-twitter/x urls unchanged
("https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # shouldn't strip params from non-twitter/x URLs
])
def test_sanitize_url(self, url, expected):
assert expected == self.archiver.sanitize_url(url)
@pytest.mark.parametrize("url, exptected_username, exptected_tweetid", [
("https://twitter.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
("https://x.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
("https://www.bellingcat.com/category/resources/", False, False)
])
def test_get_username_tweet_id_from_url(self, url, exptected_username, exptected_tweetid):
username, tweet_id = self.archiver.get_username_tweet_id(url)
assert exptected_username == username
assert exptected_tweetid == tweet_id
def test_choose_variants(self):
# taken from the response for url https://x.com/bellingcat/status/1871552600346415571
variant_list = [{'content_type': 'application/x-mpegURL', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/pl/ovWo7ux-bKROwYIC.m3u8?tag=12&v=e1b'},
{'bitrate': 256000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/480x270/OqZIrKV0LFswMvxS.mp4?tag=12'},
{'bitrate': 832000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/640x360/uiDZDSmZ8MZn9hsi.mp4?tag=12'},
{'bitrate': 2176000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/1280x720/6Y340Esh568WZnRZ.mp4?tag=12'}
]
chosen_variant = self.archiver.choose_variant(variant_list)
assert chosen_variant == variant_list[3]
@pytest.mark.parametrize("tweet_id, expected_token", [
("1874097816571961839", "4jjngwkifa"),
("1674700676612386816", "42586mwa3uv"),
("1877747914073620506", "4jv4aahw36n"),
("1876710769913450647", "4jruzjz5lux"),
("1346554693649113090", "39ibqxei7mo")
])
def test_reverse_engineer_token(self, tweet_id, expected_token):
# see Vercel's implementation here: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27C1-L31C2
# and the discussion here: https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
generated_token = self.archiver.generate_token(tweet_id)
assert expected_token == generated_token
@pytest.mark.download
def test_youtube_dlp_archiver(self, make_item):
url = "https://x.com/bellingcat/status/1874097816571961839"
post = self.archiver.download_yt_dlp(make_item(url), url, "1874097816571961839")
assert post
self.assertValidResponseMetadata(
post,
"As 2024 comes to a close, heres some examples of what Bellingcat investigated per month in our 10th year! 🧵",
datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc),
"twitter-ytdl"
)
@pytest.mark.download
def test_syndication_archiver(self, make_item):
url = "https://x.com/bellingcat/status/1874097816571961839"
post = self.archiver.download_syndication(make_item(url), url, "1874097816571961839")
assert post
self.assertValidResponseMetadata(
post,
"As 2024 comes to a close, heres some examples of what Bellingcat investigated per month in our 10th year! 🧵",
datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)
)
@pytest.mark.download
def test_download_nonexistend_tweet(self, make_item):
# this tweet does not exist
url = "https://x.com/Bellingcat/status/17197025860711058"
response = self.archiver.download(make_item(url))
assert not response
@pytest.mark.download
def test_download_malformed_tweetid(self, make_item):
# this tweet does not exist
url = "https://x.com/Bellingcat/status/1719702586071100058"
response = self.archiver.download(make_item(url))
assert not response
@pytest.mark.download
def test_download_tweet_no_media(self, make_item):
item = make_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w")
post = self.archiver.download(item)
self.assertValidResponseMetadata(
post,
"Onion rings are just vegetable donuts.",
datetime.datetime(2023, 1, 24, 16, 25, 51, tzinfo=datetime.timezone.utc),
"twitter-ytdl"
)
@pytest.mark.download
def test_download_video(self, make_item):
url = "https://x.com/bellingcat/status/1871552600346415571"
post = self.archiver.download(make_item(url))
self.assertValidResponseMetadata(
post,
"This month's Bellingchat Premium is with @KolinaKoltai. She reveals how she investigated a platform allowing users to create AI-generated child sexual abuse material and explains why it's crucial to investigate the people behind these services https://t.co/SfBUq0hSD0 https://t.co/rIHx0WlKp8",
datetime.datetime(2024, 12, 24, 13, 44, 46, tzinfo=datetime.timezone.utc)
)
@pytest.mark.xfail(reason="Currently failing, sensitive content requires logged in users/cookies - not yet implemented")
@pytest.mark.download
@pytest.mark.parametrize("url, title, timestamp, image_hash", [
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
])
def test_download_sensitive_media(self, url, title, timestamp, image_hash, make_item):
"""Download tweets with sensitive media"""
post = self.archiver.download(make_item(url))
self.assertValidResponseMetadata(
post,
title,
timestamp
)
assert len(post.media) == 1
assert post.media[0].hash == image_hash

12
tests/conftest.py Normal file
View File

@@ -0,0 +1,12 @@
import pytest
from auto_archiver.core.metadata import Metadata
@pytest.fixture
def make_item():
def _make_item(url: str, **kwargs) -> Metadata:
item = Metadata().set_url(url)
for key, value in kwargs.items():
item.set(key, value)
return item
return _make_item

View File

@@ -0,0 +1 @@
test1

View File

@@ -0,0 +1 @@
test2

View File

View File

@@ -0,0 +1,22 @@
from auto_archiver.databases.csv_db import CSVDb
from auto_archiver.core import Metadata
def test_store_item(tmp_path):
"""Tests storing an item in the CSV database"""
temp_db = tmp_path / "temp_db.csv"
db = CSVDb({
"csv_db": {"csv_file": temp_db.as_posix()}
})
item = Metadata().set_url("http://example.com").set_title("Example").set_content("Example content").success("my-archiver")
db.done(item)
with open(temp_db, "r", encoding="utf-8") as f:
assert f.read().strip() == f"status,metadata,media\nmy-archiver: success,\"{{'_processed_at': {repr(item.get('_processed_at'))}, 'url': 'http://example.com', 'title': 'Example', 'content': 'Example content'}}\",[]"
# TODO: csv db doesn't have a fetch method - need to add it (?)
# assert db.fetch(item) == item

View File

View File

@@ -0,0 +1,55 @@
import pytest
from auto_archiver.enrichers.hash_enricher import HashEnricher
from auto_archiver.core import Metadata, Media
@pytest.mark.parametrize("algorithm, filename, expected_hash", [
("SHA-256", "tests/data/testfile_1.txt", "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"),
("SHA-256", "tests/data/testfile_2.txt", "60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"),
("SHA3-512", "tests/data/testfile_1.txt", "d2d8cc4f369b340130bd2b29b8b54e918b7c260c3279176da9ccaa37c96eb71735fc97568e892dc6220bf4ae0d748edb46bd75622751556393be3f482e6f794e"),
("SHA3-512", "tests/data/testfile_2.txt", "e35970edaa1e0d8af7d948491b2da0450a49fd9cc1e83c5db4c6f175f9550cf341f642f6be8cfb0bfa476e4258e5088c5ad549087bf02811132ac2fa22b734c6")
])
def test_calculate_hash(algorithm, filename, expected_hash):
# test SHA-256
he = HashEnricher({"algorithm": algorithm, "chunksize": 1})
assert he.calculate_hash(filename) == expected_hash
def test_default_config_values():
he = HashEnricher(config={})
assert he.algorithm == "SHA-256"
assert he.chunksize == 16000000
def test_invalid_chunksize():
with pytest.raises(AssertionError):
he = HashEnricher({"chunksize": "-100"})
def test_invalid_algorithm():
with pytest.raises(AssertionError):
HashEnricher({"algorithm": "SHA-123"})
def test_config():
# test default config
c = HashEnricher.configs()
assert c["algorithm"]["default"] == "SHA-256"
assert c["chunksize"]["default"] == 16000000
assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]
assert c["algorithm"]["help"] == "hash algorithm to use"
assert c["chunksize"]["help"] == "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"
def test_hash_media():
he = HashEnricher({"algorithm": "SHA-256", "chunksize": 1})
# generate metadata with two test files
m = Metadata().set_url("https://example.com")
# noop - the metadata has no media. Shouldn't fail
he.enrich(m)
m.add_media(Media("tests/data/testfile_1.txt"))
m.add_media(Media("tests/data/testfile_2.txt"))
he.enrich(m)
assert m.media[0].get("hash") == "SHA-256:1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"
assert m.media[1].get("hash") == "SHA-256:60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"

View File

View File

@@ -0,0 +1,17 @@
from auto_archiver.core.context import ArchivingContext
from auto_archiver.formatters.html_formatter import HtmlFormatter
from auto_archiver.core import Metadata, Media
def test_format():
formatter = HtmlFormatter({})
metadata = Metadata().set("content", "Hello, world!").set_url('https://example.com')
final_media = formatter.format(metadata)
assert isinstance(final_media, Media)
assert ".html" in final_media.filename
with open (final_media.filename, "r", encoding="utf-8") as f:
content = f.read()
assert "Hello, world!" in content
assert final_media.mimetype == "text/html"
assert "SHA-256:" in final_media.get('hash')