Compare commits

...

39 Commits

Author SHA1 Message Date
Miguel Sozinho Ramalho
c640cc898a Merge pull request #385 from bellingcat/dev
1.2.0 dependencies, small bugs, 1st time contributors
2026-01-08 15:55:40 +00:00
msramalho
3e2c0b564b wiki fix 2026-01-08 15:49:42 +00:00
msramalho
5fd23baa55 this is ruff 2026-01-08 15:48:08 +00:00
msramalho
8a450310c7 version bump for new release 2026-01-08 15:41:27 +00:00
msramalho
bef8a14089 pyperclip version bump closes #339 2026-01-08 15:40:17 +00:00
msramalho
cd0b093e7a browsertrix-crawler to 1.9.2 see #383 2026-01-08 15:33:40 +00:00
msramalho
096c9d09ef fix for unexpected types for json.dump 2026-01-08 15:18:19 +00:00
Miguel Sozinho Ramalho
df3521e9ca Merge pull request #377 from m4cd4r4/fix/improve-deleted-post-detection
Fix #335: Add comprehensive deletion detection for removed/unavailable content
2026-01-08 15:06:21 +00:00
msramalho
a89d0193e4 removes patch file 2026-01-08 15:02:00 +00:00
msramalho
536cbd905f puts tests file in correct directory 2026-01-08 14:55:40 +00:00
msramalho
a936921c4e updates new utils file and test 2026-01-08 14:54:06 +00:00
Miguel Sozinho Ramalho
68f672a4fa Merge branch 'dev' into fix/improve-deleted-post-detection 2026-01-08 14:36:17 +00:00
Miguel Sozinho Ramalho
4ee0ad1cf8 Merge pull request #359 from mjgaughan/specify-medatada-feature
implementing default metadata omission/user metadata selection
2026-01-08 14:34:50 +00:00
msramalho
bac809451c expands tests to included non predefined metadata keys 2026-01-08 14:33:16 +00:00
msramalho
53dc9904ce refactorws PR to obey standard code approach 2026-01-08 14:30:26 +00:00
Miguel Sozinho Ramalho
c1f312d42a Merge branch 'dev' into specify-medatada-feature 2026-01-08 14:04:42 +00:00
msramalho
23c9dfe717 updating dependencies 2026-01-08 13:53:44 +00:00
m4cd4r4
d02e7e0f02 Add comprehensive deletion detection for removed/unavailable content
Implements issue #335: improve detection of deleted/missing posts

## Changes

### New Deletion Detection System
- Created `deletion_detection.py` utility module with platform-specific
  indicators for Twitter, Facebook, Instagram, TikTok, YouTube, Reddit,
  VK, and Telegram
- Detects deletion via HTML content, page titles, error messages, and
  video metadata
- Stores detailed deletion context (indicator, source, platform) in
  metadata for investigators

### Integration Points
- **Antibot Extractor**: Checks HTML and page titles after page load;
  resolves TODO about detecting deleted videos
- **Generic Extractor**: Checks yt-dlp video data and error messages
  for deletion indicators
- **Twitter Dropin**: Enhanced detection when user/created_at fields
  are missing

### Test Coverage
- Comprehensive test suite covering all platforms
- Tests for HTML, title, error message, and metadata detection
- Validates that normal content is not falsely flagged

## Impact for Conflict Documentation

This fix is critical for evidence preservation in war-torn regions:
- Investigators can now document that evidence existed but was deleted
- Prevents wasted archival attempts on deleted content
- Tracks patterns of content removal
- Preserves metadata about what was deleted and when

Twitter example: Detects "Hmm...this page doesn't exist. Try searching
for something else" and flags content as deleted_or_unavailable.
2025-12-17 18:40:58 +08:00
Miguel Sozinho Ramalho
56526a9ac7 Merge pull request #365 from bellingcat/dev
Facebook reels fix
2025-10-23 10:40:43 +01:00
msramalho
3a22cc28c0 skip tiktok antibot test in CI 2025-10-23 10:17:14 +01:00
msramalho
dbb3dfa04f fixes wikipedia test 2025-10-23 10:04:44 +01:00
msramalho
01bdb35f5d version bump 2025-10-23 09:51:31 +01:00
msramalho
43cbc6ac56 generic extractor improvements 2025-10-23 09:51:14 +01:00
msramalho
9c7cab1ae2 dependencies update 2025-10-22 21:07:12 +01:00
msramalho
a9a0bae083 dependencies update 2025-10-22 18:11:36 +01:00
Miguel Sozinho Ramalho
97d133ce79 Merge pull request #357 from bellingcat/dev
small improvements on tiktok and verison bumps
2025-10-22 16:02:26 +01:00
msramalho
432ee3dcfd version bump 2025-10-22 15:50:50 +01:00
mgaughan
94e0803fb3 implementing default metadata omission/user metadata selection 2025-09-22 20:16:40 -05:00
msramalho
794b4f6052 Merge branch 'dev' of https://github.com/bellingcat/auto-archiver into dev 2025-09-11 15:06:27 +01:00
msramalho
965d7d41dd dependency updates 2025-09-11 15:06:25 +01:00
Miguel Sozinho Ramalho
e73faa70cc Merge pull request #352 from mjgaughan/developer-documentation-updates
updating the style-checking code in the documentation
2025-08-11 10:42:53 +01:00
mgaughan
80beab9f23 ruff-fix -> ruff-clean; there is no ruff-fix in the Makefile. Maybe the command /should/ be ruff-fix to align with the underlying ruff command; for later discussion. This at least reconciles the documentation to the Makefile 2025-08-05 21:36:32 -04:00
Miguel Sozinho Ramalho
200cea4e12 Merge pull request #345 from mjgaughan/main
Correction of small documentation typos
2025-07-29 09:36:10 +01:00
mgaughan
1256fde159 updating location of .env.test.example in documentation 2025-07-23 13:04:48 -04:00
mgaughan
65e222e177 fixing typo in documentation pytest -> poetry 2025-07-22 17:20:59 -04:00
mgaughan
f2eb9ef784 correcting to double-dash in the poetry install documentation 2025-07-21 17:55:48 -04:00
msramalho
2081c16555 embed retry into timestamping 2025-07-10 14:49:53 +01:00
msramalho
d3efd7121c avoid empty metadata comments 2025-07-06 14:05:17 +01:00
msramalho
9d3cd5774b an improved approach for #295 2025-07-06 14:04:01 +01:00
22 changed files with 2048 additions and 1316 deletions

View File

@@ -1,4 +1,4 @@
FROM webrecorder/browsertrix-crawler:1.6.3 AS base
FROM webrecorder/browsertrix-crawler:1.9.2 AS base
ENV RUNNING_IN_DOCKER=1 \
LANG=C.UTF-8 \

View File

@@ -21,7 +21,7 @@ This allows you to run the auto-archiver without the `poetry run` prefix.
### Optional Development Packages
Install development packages (used for unit tests etc.) using:
`poetry install -with dev`
`poetry install --with dev`
```{toctree}
@@ -33,4 +33,4 @@ docs
release
settings_page
style_guide
```
```

View File

@@ -50,7 +50,7 @@ Note not all warnings can be fixed automatically.
Most fixes are safe, but some non-standard practices such as dynamic loading are not picked up by linters. Ensure you check any modifications by this before committing them.
```shell
make ruff-fix
make ruff-clean
```
**Changing Configurations ⚙️**
@@ -67,4 +67,4 @@ One example is to extend the selected rules for linting the `pyproject.toml` fil
extend-select = ["B"]
```
Then re-run the `make ruff-check` command to see the new rules in action.
Then re-run the `make ruff-check` command to see the new rules in action.

View File

@@ -8,7 +8,7 @@
## Running Tests
1. Make sure you've installed the dev dependencies with `pytest install --with dev`
1. Make sure you've installed the dev dependencies with `poetry install --with dev`
2. Tests can be run as follows:
```{code} bash
#### Command prefix of 'poetry run' removed here for simplicity
@@ -26,7 +26,7 @@ pytest -ra -v tests/test_file.py
pytest -ra -v tests/test_file.py::test_function_name
```
3. Some tests require environment variables to be set. You can use the example `.env.test.example` file as a template. Copy it to `.env.test` and fill in the required values. This file will be loaded automatically by `pytest`.
3. Some tests require environment variables to be set. You can use the example `tests/.env.test.example` file as a template. Copy it to `tests/.env.test` and fill in the required values. This file will be loaded automatically by `pytest`.
```{code} bash
cp .env.test.example .env.test
```
cp tests/.env.test.example tests/.env.test
```

2542
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "1.1.2"
version = "1.2.0"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"
@@ -50,15 +50,15 @@ dependencies = [
"retrying (>=0.0.0)",
"rich-argparse (>=1.6.0,<2.0.0)",
"ruamel-yaml (>=0.18.10,<0.19.0)",
"rfc3161-client (==1.0.3)",
"cryptography (>44.0.1,<45.0.0)",
"rfc3161-client (>=1.0.5)",
"cryptography (>=46.0.3)",
"opentimestamps (>=0.4.5,<0.5.0)",
"bgutil-ytdlp-pot-provider (>=1.0.0)",
"yt-dlp[curl-cffi,default] (>=2025.5.22,<2026.0.0)",
"secretstorage (>=3.3.3,<4.0.0)",
"seleniumbase (>=4.36.4,<5.0.0)",
"pyautogui (>=0.9.54,<0.10.0)",
"pyperclip (==1.8.2)",
"pyperclip (>=1.9.0)",
]
[tool.poetry.group.dev.dependencies]

View File

@@ -16,6 +16,7 @@ from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
from auto_archiver.modules.antibot_extractor_enricher.dropins.default import DefaultDropin
from auto_archiver.utils.misc import random_str
from auto_archiver.utils.url import is_relevant_url
from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted
class AntibotExtractorEnricher(Extractor, Enricher):
@@ -97,7 +98,16 @@ class AntibotExtractorEnricher(Extractor, Enricher):
sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future
dropin = self._get_suitable_dropin(url, sb)
dropin.open_page(url)
if not dropin.open_page(url):
# Check for deletion indicators
page_title = sb.get_title()
html_source = sb.get_page_source()
deletion_info = detect_deletion(html_content=html_source, page_title=page_title, url=url)
if deletion_info:
flag_as_deleted(to_enrich, deletion_info)
return to_enrich
logger.warning("Failed to open drop-in page (not detected as deleted)")
return False
if self.detect_auth_wall and (dropin.hit_auth_wall() and self._hit_auth_wall(sb)):
logger.warning("Skipping since auth wall or CAPTCHA was detected")
@@ -106,7 +116,15 @@ class AntibotExtractorEnricher(Extractor, Enricher):
sb.wait_for_ready_state_complete()
sb.sleep(1) # margin for the page to load completely
to_enrich.set_title(sb.get_title())
page_title = sb.get_title()
html_source = sb.get_page_source()
# Check if the page indicates content was deleted
deletion_info = detect_deletion(html_content=html_source, page_title=page_title, url=url)
if deletion_info:
flag_as_deleted(to_enrich, deletion_info)
to_enrich.set_title(page_title)
self._enrich_html_source_code(sb, to_enrich)
self._enrich_full_page_screenshot(sb, to_enrich)

View File

@@ -0,0 +1 @@
*.py

View File

@@ -1,17 +1,20 @@
from contextlib import suppress
from typing import Mapping
from auto_archiver.utils.custom_logger import logger
from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
class TikTokDropin(Dropin):
"""
A class to handle TikTok drop-in functionality for the antibot extractor enricher module.
"""
def documentation() -> Mapping[str, str]:
return {
"name": "TikTok Dropin",
"description": "Handles TikTok posts and works without authentication.",
"description": "Handles TikTok posts and works without authentication.\nNOTE: This dropin is highly susceptible to TikTok's bot detection mechanisms and may not work reliably if you reuse the same IP. The GenericExtractor is recommended for TikTok posts, as it handles video/image download more reliable. In the future we plan to implement better anti captcha measures for this dropin.",
"site": "tiktok.com",
}
@@ -33,6 +36,9 @@ class TikTokDropin(Dropin):
# TODO: implement login logic
if url != self.sb.get_current_url():
return False
if self.sb.is_text_visible("Video currently unavailable"):
logger.debug("Video may have been removed or is private.")
return False
return True
def hit_auth_wall(self) -> bool:

View File

@@ -4,6 +4,7 @@ import datetime
import os
import importlib
import subprocess
import traceback
import zipfile
from typing import Generator, Type
@@ -20,6 +21,7 @@ from auto_archiver.core.extractor import Extractor
from auto_archiver.core import Metadata, Media
from auto_archiver.utils import get_datetime_from_str
from auto_archiver.utils.misc import ydl_entry_to_filename
from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted
from .dropin import GenericDropin
@@ -305,9 +307,9 @@ class GenericExtractor(Extractor):
result.set_url(url)
if "description" in video_data and not result.get("content"):
result.set_content(video_data.get("description"))
result.set_content(video_data.pop("description"))
# extract comments if enabled
if self.comments and video_data.get("comments", []) is not None:
if self.comments and video_data.get("comments", None) is not None:
result.set(
"comments",
[
@@ -406,9 +408,9 @@ class GenericExtractor(Extractor):
logger.error(f"Error loading subtitle file {val.get('filepath')}: {e}")
result.add_media(new_media)
except Exception as e:
logger.error(f"Error processing entry {entry}: {e}")
logger.error(f"Error processing entry {str(entry)[:256]}: {e} {traceback.format_exc()}")
if not len(result.media):
logger.info(f"No media found for entry {entry}, skipping.")
logger.info(f"No media found for entry {str(entry)[:256]}, skipping.")
return False
return self.add_metadata(data, info_extractor, url, result)
@@ -483,6 +485,13 @@ class GenericExtractor(Extractor):
# don't download since it can be a live stream
data = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=False)
# Check for deletion indicators in video data
deletion_info = detect_deletion(video_data=data, url=url)
if deletion_info:
result = Metadata()
flag_as_deleted(result, deletion_info)
return result
result = _helper_for_successful_extract_info(data, info_extractor, url, ydl)
except MaxDownloadsReached:
@@ -502,6 +511,13 @@ class GenericExtractor(Extractor):
try:
result = self.get_metadata_for_post(info_extractor, url, ydl)
except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e:
# Check if the error indicates deletion
deletion_info = detect_deletion(error_message=str(post_e), url=url)
if deletion_info:
result = Metadata()
flag_as_deleted(result, deletion_info)
return result
if "NSFW tweet requires authentication." in str(post_e):
logger.warning(str(post_e))
return False
@@ -516,7 +532,7 @@ class GenericExtractor(Extractor):
)
return False
if result:
if result and not result.is_success():
extractor_name = "yt-dlp"
if info_extractor:
extractor_name += f"_{info_extractor.ie_key()}"
@@ -535,7 +551,6 @@ class GenericExtractor(Extractor):
if url.startswith("https://ya.ru"):
url = url.replace("https://ya.ru", "https://yandex.ru")
item.set("replaced_url", url)
logger.debug(f"{skip_proxy=}, {self.proxy_on_failure_only=}, {self.proxy=}")
# proxy_on_failure_only logic
if self.proxy and self.proxy_on_failure_only and not skip_proxy:
@@ -605,9 +620,9 @@ class GenericExtractor(Extractor):
validated_options
) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
result: Metadata = None
for info_extractor in self.suitable_extractors(url):
result = self.download_for_extractor(info_extractor, url, ydl)
if result:
return result
return False
local_result: Metadata = self.download_for_extractor(info_extractor, url, ydl)
if local_result:
result = result.merge(local_result) if result else local_result
return result if result else False

View File

@@ -1,3 +1,4 @@
import re
import requests
from auto_archiver.utils.custom_logger import logger
@@ -14,12 +15,16 @@ class Tiktok(GenericDropin):
It's useful for capturing content that requires a login, like sensitive content.
"""
# Regex pattern to match TikTok photo post URLs
PHOTO_URL_REGEX = r"https?://(?:www\.)?tiktok\.com/@[\w\.-]+/photo/\d+"
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def suitable(self, url, info_extractor) -> bool:
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
Return the 'suitable' method from the TikTokIE class."""
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE)) or (
re.match(self.PHOTO_URL_REGEX, url) is not None
)
def extract_post(self, url: str, ie_instance):
logger.debug("Using Tikwm API to attempt to download tiktok video")
@@ -28,56 +33,91 @@ class Tiktok(GenericDropin):
r = requests.get(endpoint)
if r.status_code != 200:
raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
raise ValueError(f"Unexpected status code '{r.status_code}' from tikwm.com")
try:
json_response = r.json()
except ValueError:
raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}")
raise ValueError("Failed to parse JSON response from tikwm.com")
if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})):
raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}")
raise ValueError(f"Unable to download with tikwm.com: {repr(json_response)}")
# tries to get the non-watermarked version first
video_url = api_data.pop("play", api_data.pop("wmplay", None))
if not video_url:
raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}")
api_data["video_url"] = video_url
play_url = api_data.pop("play", api_data.pop("wmplay", None))
if play_url and "mime_type=audio" in play_url:
play_url = None
if play_url:
api_data["video_url"] = play_url
return api_data
def keys_to_clean(self, video_data: dict, info_extractor):
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
return [
"video_url",
"title",
"create_time",
"author",
"cover",
"origin_cover",
"ai_dynamic_cover",
"duration",
"size",
"wm_size",
"music",
"music_info",
"play_count",
"digg_count",
"comment_count",
"share_count",
"download_count",
"collect_count",
"anchors",
"anchors_extras",
"is_ad",
"commerce_info",
"commercial_video_info",
"item_comment_settings",
"mentioned_users",
] # all of these will be added via api_data in a single metadata field vs individual ones in the generic extractor
def create_metadata(self, post: dict, ie_instance, archiver, url):
# prepare result, start by downloading video
result = Metadata()
video_url = post.pop("video_url")
is_success = False
# get the cover if possible
cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None)))
if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
result.add_media(Media(cover_downloaded))
# get the video or fail
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
if not video_downloaded:
logger.error("Failed to download video")
return False
video_media = Media(video_downloaded)
if duration := post.get("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
for image_url in post.pop("images", []):
if image_downloaded := archiver.download_from_url(image_url):
result.add_media(Media(image_downloaded))
is_success = True # this is an images post and we got it/them
# get the video if present, could be an image post
if video_url := post.pop("video_url", None):
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
if not video_downloaded:
logger.error("Failed to download video")
return False
video_media = Media(video_downloaded)
if duration := post.pop("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
is_success = True # this is a video post and we got it
# add remaining metadata
result.set_title(post.get("title", ""))
result.set_title(post.pop("title", ""))
if created_at := post.get("create_time", None):
if created_at := post.pop("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if author := post.get("author", None):
if author := post.pop("author", None):
result.set("author", author)
result.set("api_data", post)
result.set("api_data", {k: v for k, v in post.items() if v})
if is_success:
result.success("yt-dlp_TikTok")
else:
raise ValueError("Unable to download any media from TikTok post, possibly deleted or private.")
return result

View File

@@ -7,6 +7,7 @@ from slugify import slugify
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.utils import url as UrlUtil, get_datetime_from_str
from auto_archiver.core.extractor import Extractor
from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted
from auto_archiver.modules.generic_extractor.dropin import GenericDropin, InfoExtractor
@@ -37,7 +38,15 @@ class Twitter(GenericDropin):
result = Metadata()
try:
if not tweet.get("user") or not tweet.get("created_at"):
raise ValueError("Error retreiving post. Are you sure it exists?")
# Check for deletion indicators
deletion_info = detect_deletion(
video_data=tweet, url=url, error_message="Missing user or created_at fields"
)
if deletion_info:
flag_as_deleted(result, deletion_info)
return result
raise ValueError("Error retrieving post. Are you sure it exists?")
timestamp = get_datetime_from_str(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
except (ValueError, KeyError) as ex:
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")

View File

@@ -3,6 +3,13 @@
"type": ["enricher"],
"requires_setup": True,
"dependencies": {"python": ["loguru"], "bin": ["exiftool"]},
"configs": {
"look_for_keys": {
"default": [],
"help": "list of lowercased metadata keys that will be included in the enriched metadata. Special keys: 'author', 'datetimes', 'location' to include related metadata fields. The default empty list `[]` means all metadata will be included.",
"type": "list",
},
},
"description": """
Extracts metadata information from files using ExifTool.

View File

@@ -16,6 +16,8 @@ class MetadataEnricher(Enricher):
for i, m in enumerate(to_enrich.media):
if len(md := self.get_metadata(m.filename)):
if self.look_for_keys != []:
md = self.select_metadata(md, self.look_for_keys)
to_enrich.media[i].set("metadata", md)
def get_metadata(self, filename: str) -> dict:
@@ -23,7 +25,6 @@ class MetadataEnricher(Enricher):
# Run ExifTool command to extract metadata from the file
cmd = ["exiftool", filename]
result = subprocess.run(cmd, capture_output=True, text=True)
# Process the output to extract individual metadata fields
metadata = {}
for line in result.stdout.splitlines():
@@ -35,3 +36,33 @@ class MetadataEnricher(Enricher):
except Exception as e:
logger.error(f"Error occurred: {e}: {traceback.format_exc()}")
return {}
def select_metadata(self, all_md, requested_metadata_keys):
"""
coordinates the selection of metadata from the general exiftool output to the user-specified grocery list
"""
# defining the batches of metadata that get pulled for special terms
author_key_terms = ["author", "producer", "creator"]
datetime_key_terms = ["date", "time"]
location_key_terms = ["gps", "latitude", "longitude"]
specified_md = {}
for md_key in all_md.keys():
md_key_lower = md_key.lower()
# checking for special baskets within the grocery list of requested metadata
if ("author" in requested_metadata_keys) and any(
term in md_key_lower and len(all_md[md_key]) for term in author_key_terms
):
specified_md[md_key] = all_md[md_key]
if ("datetime" in requested_metadata_keys) and any(
term in md_key_lower and len(all_md[md_key]) for term in datetime_key_terms
):
specified_md[md_key] = all_md[md_key]
if ("location" in requested_metadata_keys) and any(
term in md_key_lower and len(all_md[md_key]) for term in location_key_terms
):
specified_md[md_key] = all_md[md_key]
# if the metadata value is requested directly
if md_key_lower in requested_metadata_keys or md_key in requested_metadata_keys and len(all_md[md_key]):
specified_md[md_key] = all_md[md_key]
return specified_md

View File

@@ -4,12 +4,12 @@ from importlib.metadata import version
import hashlib
from slugify import slugify
from retrying import retry
import requests
from auto_archiver.utils.custom_logger import logger
from rfc3161_client import (decode_timestamp_response,TimestampRequestBuilder,TimeStampResponse, VerifierBuilder)
from rfc3161_client import (decode_timestamp_response, TimestampRequestBuilder, TimeStampResponse, VerifierBuilder)
from rfc3161_client import VerificationError as Rfc3161VerificationError
from rfc3161_client.base import HashAlgorithm
from rfc3161_client.tsp import SignedData
from cryptography import x509
from cryptography.hazmat.primitives import serialization
@@ -60,7 +60,6 @@ class TimestampingEnricher(Enricher):
logger.debug(f"No hashes found")
return
hashes_fn = os.path.join(self.tmp_dir, "hashes.txt")
data_to_sign = "\n".join(hashes)
@@ -75,7 +74,7 @@ class TimestampingEnricher(Enricher):
logger.debug(f"Timestamping with {tsa_url=}")
signed: TimeStampResponse = self.sign_data(tsa_url, message)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs or the user-defined `cert_authorities`
root_cert = self.verify_signed(signed, message)
@@ -113,7 +112,7 @@ class TimestampingEnricher(Enricher):
f.write(timestamp_token)
return tst_path
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
"""
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
@@ -136,7 +135,7 @@ class TimestampingEnricher(Enricher):
if not cert_authorities:
raise ValueError(f"No trusted roots found in {trusted_root_path}.")
timestamp_certs = self.tst_certs(timestamp_response)
intermediate_certs = timestamp_certs[1:-1]
@@ -148,7 +147,7 @@ class TimestampingEnricher(Enricher):
message_hash = hashlib.sha256(message).digest()
else:
raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}")
for certificate in cert_authorities:
builder = VerifierBuilder()
builder.add_root_certificate(certificate)
@@ -158,7 +157,6 @@ class TimestampingEnricher(Enricher):
verifier = builder.build()
try:
verifier.verify(timestamp_response, message_hash)
return certificate
@@ -171,23 +169,38 @@ class TimestampingEnricher(Enricher):
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
timestamp_request = (
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
try:
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def sign_with_retry():
response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10)
response.raise_for_status()
return response
try:
response = sign_with_retry()
except requests.RequestException as e:
logger.error(f"Error while sending request to {tsa_url=}: {e}")
raise
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def decode_with_retry(response):
return decode_timestamp_response(response.content)
# Check that we can parse the response but do not *verify* it
try:
timestamp_response = decode_timestamp_response(response.content)
timestamp_response = decode_with_retry(response)
except ValueError as e:
logger.error(f"Invalid timestamp response from server {tsa_url}: {e}")
raise
return timestamp_response
def tst_certs(self, tsp_response: TimeStampResponse):
signed_data: SignedData = tsp_response.signed_data
certs = [x509.load_der_x509_certificate(c) for c in signed_data.certificates]
@@ -196,7 +209,7 @@ class TimestampingEnricher(Enricher):
if len(certs) == 1:
return certs
while(len(ordered_certs) < len(certs)):
while (len(ordered_certs) < len(certs)):
if len(ordered_certs) == 0:
for cert in certs:
if not [c for c in certs if cert.subject == c.issuer]:
@@ -220,7 +233,7 @@ class TimestampingEnricher(Enricher):
cert_chain = []
for i, cert in enumerate(certificates):
cert_fn = os.path.join(self.tmp_dir, f"{i+1} {str(cert.serial_number)[:20]}.crt")
cert_fn = os.path.join(self.tmp_dir, f"{i + 1} {str(cert.serial_number)[:20]}.crt")
with open(cert_fn, "wb") as f:
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value))

View File

@@ -2,6 +2,13 @@ from loguru import logger
import json
def type_serializer(obj):
"""Fallback function for objects json can't handle."""
if isinstance(obj, type):
return obj.__name__
return str(obj)
def extract_location(record, short=False):
"""Extracts the file name, function name, and line number from the log record."""
if short:
@@ -35,11 +42,11 @@ def serialize_for_console(record):
subset.pop("time", None)
if not subset:
return ""
return json.dumps(subset, ensure_ascii=False)
return json.dumps(subset, ensure_ascii=False, default=type_serializer)
def serialize(record):
return json.dumps(extract_log_data(record), ensure_ascii=False)
return json.dumps(extract_log_data(record), ensure_ascii=False, default=type_serializer)
def patching(record):

View File

@@ -0,0 +1,273 @@
"""
Deletion Detection Utilities
Provides a best-effort detection of deleted, missing, or unavailable content
across various social media platforms based on presence of expected keywords.
This module helps identify removed content, helps to:
- Document content that existed but was deleted
- Track patterns of content removal
- Preserve metadata about missing content
"""
from typing import Optional, Dict, List
from auto_archiver.utils.custom_logger import logger
from urllib.parse import urlparse
class DeletionIndicators:
"""
Platform-specific indicators that content has been deleted or is unavailable, alongside generic indicators.
"""
# Twitter/X deletion indicators
TWITTER = [
"Hmm...this page doesn't exist",
"Try searching for something else",
"This Tweet is unavailable",
"This account doesn't exist",
"This Tweet has been deleted",
"This account has been suspended",
"Sorry, that page doesn't exist",
"The Tweet you're looking for isn't available",
]
# Facebook deletion indicators
FACEBOOK = [
"This content isn't available",
"Sorry, this content isn't available",
"This content is no longer available",
"The link you followed may be broken",
"Page Not Found",
"Content Not Found",
"This content is no longer on Facebook",
]
# Instagram deletion indicators
INSTAGRAM = [
"Sorry, this page isn't available",
"The link you followed may be broken",
"Media not found or unavailable",
"This post is no longer available",
"This account is private",
]
# TikTok deletion indicators
TIKTOK = [
"Couldn't find this account",
"This video is no longer available",
"This video is currently unavailable",
"Video not found",
"This video may have been deleted",
]
# YouTube deletion indicators
YOUTUBE = [
"This video isn't available anymore",
"Video unavailable",
"This video has been removed",
"This video is no longer available",
"This video is private",
"This video has been removed by the uploader",
"This video has been deleted",
]
# Reddit deletion indicators
REDDIT = [
"this post has been removed",
"this comment has been removed",
"[removed]",
"[deleted]",
"page not found",
"there doesn't seem to be anything here",
]
# VK deletion indicators
VK = [
"Post deleted",
"Page not found",
"Content unavailable",
"Access denied",
]
# Telegram deletion indicators
TELEGRAM = [
"Message not found",
"Deleted message",
"Channel is private",
]
# Generic indicators (work across platforms)
GENERIC = [
"has been removed",
"no longer available",
"content removed",
"access denied",
"page not found",
]
@classmethod
def all_indicators(cls) -> List[str]:
"""Returns all deletion indicators from all platforms."""
return (
cls.TWITTER
+ cls.FACEBOOK
+ cls.INSTAGRAM
+ cls.TIKTOK
+ cls.YOUTUBE
+ cls.REDDIT
+ cls.VK
+ cls.TELEGRAM
+ cls.GENERIC
)
@classmethod
def for_url(cls, url: str) -> List[str]:
"""Returns platform-specific indicators based on URL domain."""
platform = _extract_platform(url)
indicators_map = {
"twitter": cls.TWITTER + cls.GENERIC,
"facebook": cls.FACEBOOK + cls.GENERIC,
"instagram": cls.INSTAGRAM + cls.GENERIC,
"tiktok": cls.TIKTOK + cls.GENERIC,
"youtube": cls.YOUTUBE + cls.GENERIC,
"reddit": cls.REDDIT + cls.GENERIC,
"vk": cls.VK + cls.GENERIC,
"telegram": cls.TELEGRAM + cls.GENERIC,
}
return indicators_map.get(platform, cls.GENERIC)
def detect_deletion(
html_content: str = None,
page_title: str = None,
error_message: str = None,
url: str = None,
video_data: dict = None,
) -> Optional[Dict[str, any]]:
"""
Best-effort deletion detection across multiple signals.
Checks HTML content, page titles, error messages, and video metadata for
indicators that content has been deleted or is unavailable.
Args:
html_content: Raw HTML source of the page
page_title: Browser page title
error_message: Any error message from the extractor
url: The URL being archived (for platform-specific detection)
video_data: Video metadata from yt-dlp or other extractors
Returns:
Dictionary with deletion details if detected, None otherwise.
Format: {
"is_deleted": True,
"indicator": "specific text that was found",
"source": "html|title|error|metadata",
"platform": "twitter|facebook|etc"
}
"""
# Determine indicators to check based on URL
if url:
indicators = DeletionIndicators.for_url(url)
platform = _extract_platform(url)
else:
indicators = DeletionIndicators.all_indicators()
platform = "unknown"
# Check HTML content
if html_content:
for indicator in indicators:
if indicator.lower() in html_content.lower():
logger.info(f"Deletion detected in HTML: '{indicator}' found for {url}")
return {"is_deleted": True, "indicator": indicator, "source": "html_content", "platform": platform}
# Check page title
if page_title:
for indicator in indicators:
if indicator.lower() in page_title.lower():
logger.info(f"Deletion detected in page title: '{indicator}' found for {url}")
return {"is_deleted": True, "indicator": indicator, "source": "page_title", "platform": platform}
# Check error messages
if error_message:
for indicator in indicators:
if indicator.lower() in str(error_message).lower():
logger.info(f"Deletion detected in error: '{indicator}' found for {url}")
return {"is_deleted": True, "indicator": indicator, "source": "error_message", "platform": platform}
# Check video metadata (from yt-dlp)
if video_data:
# Check if yt-dlp flagged it as unavailable
if video_data.get("availability") in ["unavailable", "private", "deleted"]:
logger.info(f"Deletion detected in metadata: availability={video_data.get('availability')}")
return {
"is_deleted": True,
"indicator": f"availability: {video_data.get('availability')}",
"source": "video_metadata",
"platform": platform,
}
# Check description/title for deletion indicators
for key in ["title", "description", "fulltitle"]:
if key in video_data:
for indicator in indicators:
if indicator.lower() in str(video_data[key]).lower():
logger.info(f"Deletion detected in {key}: '{indicator}'")
return {
"is_deleted": True,
"indicator": indicator,
"source": f"video_metadata_{key}",
"platform": platform,
}
return None
def _extract_platform(url: str) -> str:
"""Extracts platform name from URL."""
parsed = urlparse(url)
domain = parsed.netloc
if "twitter.com" in domain or "x.com" in domain:
return "twitter"
elif "facebook.com" in domain or "fb.com" in domain:
return "facebook"
elif "instagram.com" in domain:
return "instagram"
elif "tiktok.com" in domain:
return "tiktok"
elif "youtube.com" in domain or "youtu.be" in domain:
return "youtube"
elif "reddit.com" in domain:
return "reddit"
elif "vk.com" in domain:
return "vk"
elif "t.me" in domain:
return "telegram"
return "unknown"
def flag_as_deleted(metadata, deletion_info: Dict[str, any]) -> None:
"""
Flags metadata object as deleted/unavailable.
Adds tentative deletion information to the metadata object.
Args:
metadata: Metadata object to update
deletion_info: Dictionary from detect_deletion()
"""
metadata.set("deletion_detected", True)
metadata.set("deletion_indicator", deletion_info.get("indicator"))
metadata.set("deletion_source", deletion_info.get("source"))
metadata.set("deletion_platform", deletion_info.get("platform"))
metadata.status = "deleted_or_unavailable"
logger.debug(
f"Content marked as deleted/unavailable: "
f"platform={deletion_info.get('platform')}, "
f"indicator='{deletion_info.get('indicator')}', "
f"source={deletion_info.get('source')}"
)

View File

@@ -56,6 +56,19 @@ def test_enrich_sets_metadata(enricher, mocker):
assert metadata.media == [media1, media2]
def test_enrich_no_metadata_selection(enricher, mocker):
media1 = mocker.Mock(filename="img1.jpg")
media2 = mocker.Mock(filename="img2.jpg")
metadata = mocker.Mock()
metadata.media = [media1, media2]
enricher.get_metadata = lambda f: {"key": "value"} if f == "img1.jpg" else {}
enricher.look_for_keys = ["no-key"]
enricher.enrich(metadata)
media1.set.assert_called_once_with("metadata", {})
media2.set.assert_not_called()
assert metadata.media == [media1, media2]
def test_enrich_empty_media(enricher, mocker):
metadata = mocker.Mock()
metadata.media = []
@@ -71,7 +84,9 @@ def test_get_metadata_error_handling(enricher, mocker):
assert "Error occurred: " in mock_log.call_args[0][0]
def test_metadata_pickle(enricher, unpickle, mocker):
# TODO depends on the expected functionality
"""
def test_default_metadata_pickle(enricher, unpickle, mocker):
mock_run = mocker.patch("subprocess.run")
# Uses pickled values
mock_run.return_value = unpickle("metadata_enricher_exif.pickle")
@@ -79,6 +94,39 @@ def test_metadata_pickle(enricher, unpickle, mocker):
expected = unpickle("metadata_enricher_ytshort_expected.pickle")
enricher.enrich(metadata)
expected_media = expected.media
print(expected_media)
actual_media = metadata.media
assert len(expected_media) == len(actual_media)
assert actual_media[0].properties.get("metadata") == expected_media[0].properties.get("metadata")
"""
def test_metadata_pickle_megapixel(enricher, unpickle, mocker):
mock_run = mocker.patch("subprocess.run")
mock_run.return_value = unpickle("metadata_enricher_exif.pickle")
metadata = unpickle("metadata_enricher_ytshort_input.pickle")
enricher.look_for_keys = ["megapixels"]
enricher.enrich(metadata)
actual_media = metadata.media
assert actual_media[0].properties.get("metadata") == {"Megapixels": "0.922"}
def test_metadata_specify_datetime_and_metapixels(enricher, unpickle, mocker):
mock_run = mocker.patch("subprocess.run")
mock_run.return_value = unpickle("metadata_enricher_exif.pickle")
metadata = unpickle("metadata_enricher_ytshort_input.pickle")
enricher.look_for_keys = ["datetime", "megapixels", "image height"]
enricher.enrich(metadata)
actual_media = metadata.media
assert actual_media[0].properties.get("metadata") == {
"File Modification Date/Time": "2025:02:18 19:42:50+00:00",
"File Access Date/Time": "2025:02:18 19:42:50+00:00",
"File Inode Change Date/Time": "2025:02:18 19:42:50+00:00",
"Megapixels": "0.922",
"Image Height": "720",
}

View File

@@ -5,6 +5,9 @@ from auto_archiver.modules.antibot_extractor_enricher.antibot_extractor_enricher
from .test_extractor_base import TestExtractorBase
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
class DummySB:
def __init__(self, url="", title="", visible_texts=None, visible_elements=None):
self._url = url
@@ -51,7 +54,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
@pytest.mark.download
@pytest.mark.parametrize(
"url,in_title,in_text,image_count,video_count",
"url,in_title,in_text,image_count,video_count,skip_ci",
[
(
"https://en.wikipedia.org/wiki/Western_barn_owl",
@@ -59,6 +62,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"Tyto alba",
5,
0,
False,
),
(
"https://www.bellingcat.com/news/2025/04/29/open-sources-show-myanmar-junta-airstrike-damages-despite-post-earthquake-ceasefire/",
@@ -66,6 +70,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"Bellingcat has geolocated",
5,
0,
False,
),
(
"https://www.bellingcat.com/news/2025/03/27/gaza-israel-palestine-shot-killed-injured-destroyed-dangerous-drone-journalists-in-gaza/",
@@ -73,6 +78,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"continued the work of Gazan journalists",
5,
1,
False,
),
(
"https://www.bellingcat.com/about/general-information",
@@ -80,6 +86,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"Stichting Bellingcat",
0, # SVGs are ignored
0,
False,
),
(
"https://vk.com/wikipedia?from=search&w=wall-36156673_20451",
@@ -87,6 +94,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"16 сентября 1985 года лейблом EMI Records.",
5,
0,
False,
),
(
"https://www.tiktok.com/@tracy_2424/photo/7418200173953830162",
@@ -94,13 +102,19 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"Dito ko lang",
1,
0,
True,
),
],
)
def test_download_pages_with_media(self, setup_module, make_item, url, in_title, in_text, image_count, video_count):
def test_download_pages_with_media(
self, setup_module, make_item, url, in_title, in_text, image_count, video_count, skip_ci
):
"""
Test downloading pages with media.
"""
if CI and skip_ci:
pytest.skip("Skipping test in CI environment")
self.extractor = setup_module(
self.extractor_module,
self.config

View File

@@ -48,8 +48,6 @@ class TestGenericExtractor(TestExtractorBase):
("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]),
("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]),
("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]),
("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]),
("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),
],
)
def test_suitable_extractors(self, url, suitable_extractors):
@@ -148,6 +146,7 @@ class TestGenericExtractor(TestExtractorBase):
def test_bluesky_download_video(self, make_item):
item = make_item("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
result = self.extractor.download(item)
assert result.get_url() == "https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i"
assert result is not False
@pytest.mark.skipif(not TEST_TRUTH_SOCIAL, reason="Truth social download tests disabled in environment variables.")

View File

@@ -55,6 +55,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
("https://vt.tiktok.com/ZSMTJeqRP/", True),
("https://tiktok.com/@user/photo/123?lang=en", True),
],
)
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
@@ -68,10 +69,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once()
# first message is just the 'Skipping using ytdlp to download files for TikTok' message
assert (
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
assert "Failed to parse JSON response from tikwm.com" in caplog.text
mock_get.return_value.json.side_effect = Exception
with caplog.at_level("ERROR"):
@@ -79,10 +77,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called()
assert mock_get.call_count == 2
assert mock_get.return_value.json.call_count == 2
assert (
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
assert "Failed to parse JSON response from tikwm.com" in caplog.text
@pytest.mark.parametrize(
"response",
@@ -98,27 +93,30 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once()
assert "failed to get a valid response from tikwm.com" in caplog.text
assert "Unable to download with tikwm.com: " in caplog.text
@pytest.mark.parametrize(
"response,has_vid",
"response,is_success",
[
({"data": {"id": 123}}, False),
({"data": {"wmplay": "url"}}, True),
({"data": {"play": "url"}}, True),
({"data": {"id": 123, "images": []}}, False),
({"data": {"wmplay": "url", "images": ["img1.jpg"]}}, True),
({"data": {"play": "url", "images": ["img1.jpg"]}}, True),
({"data": {"images": ["img1.jpg"]}}, True),
],
)
def test_correct_extraction(self, mock_get, make_item, response, has_vid, mocker):
def test_correct_extraction(self, mock_get, make_item, response, is_success, mocker):
data = {k: v for k, v in response.get("data", {}).items()}
mock_get.return_value.status_code = 200
mock_get.return_value.json.return_value = {"msg": "success", **response}
result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
if not has_vid:
assert result is False
else:
total_media = len(data.get("images", [])) + (1 if data.get("wmplay", data.get("play")) else 0)
if is_success:
assert result.is_success()
assert len(result.media) == 1
assert len(result.media) == total_media
else:
assert result is False
mock_get.assert_called()
assert mock_get.call_count == 1 + int(has_vid)
assert mock_get.call_count == 1 + total_media
mock_get.return_value.json.assert_called_once()
def test_correct_data_extracted(self, mock_get, make_item):
@@ -142,7 +140,9 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert len(result.media) == 2
assert result.get_title() == "Title"
assert result.get("author") == "Author"
assert result.get("api_data") == {"other": "data", "id": 123}
assert result.get("other") == "data"
assert result.get("comments") is None
assert result.get("api_data") == {"id": 123, "other": "data"}
assert result.media[1].get("duration") == 60
assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc)

View File

@@ -0,0 +1,147 @@
"""
Tests for deletion detection utilities.
These tests verify the current best-effort by the auto-archiver
to detect when content has been deleted or is unavailable across
various platforms.
"""
from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted, DeletionIndicators
from auto_archiver.core.metadata import Metadata
class TestDeletionIndicators:
"""Test the deletion indicator lists for various platforms."""
def test_twitter_indicators(self):
"""Verify Twitter deletion indicators are comprehensive."""
assert "Hmm...this page doesn't exist" in DeletionIndicators.TWITTER
assert "Try searching for something else" in DeletionIndicators.TWITTER
assert "This Tweet is unavailable" in DeletionIndicators.TWITTER
def test_platform_specific_indicators(self):
"""Test that platform-specific indicators are returned based on URL."""
twitter_indicators = DeletionIndicators.for_url("https://twitter.com/user/status/123")
assert any("page doesn't exist" in ind.lower() for ind in twitter_indicators)
instagram_indicators = DeletionIndicators.for_url("https://instagram.com/p/ABC123")
assert any("page isn't available" in ind.lower() for ind in instagram_indicators)
class TestDetectDeletion:
"""Test the detect_deletion function with various inputs."""
def test_detect_deletion_in_html_twitter(self):
"""Test detection of Twitter's deleted post page."""
html = "<html><body>Hmm...this page doesn't exist. Try searching for something else.</body></html>"
url = "https://twitter.com/user/status/123"
result = detect_deletion(html_content=html, url=url)
assert result is not None
assert result["is_deleted"] is True
assert result["platform"] == "twitter"
assert result["source"] == "html_content"
assert "page doesn't exist" in result["indicator"].lower()
def test_detect_deletion_in_page_title(self):
"""Test detection via page title."""
title = "Page Not Found"
url = "https://facebook.com/post/123"
result = detect_deletion(page_title=title, url=url)
assert result is not None
assert result["is_deleted"] is True
assert result["source"] == "page_title"
def test_detect_deletion_in_error_message(self):
"""Test detection via error messages."""
error = "yt_dlp.utils.DownloadError: This video is no longer available"
url = "https://youtube.com/watch?v=abc123"
result = detect_deletion(error_message=error, url=url)
assert result is not None
assert result["is_deleted"] is True
assert result["platform"] == "youtube"
assert result["source"] == "error_message"
def test_detect_deletion_in_video_metadata(self):
"""Test detection via yt-dlp video metadata."""
video_data = {"availability": "unavailable", "title": "Private video"}
url = "https://youtube.com/watch?v=test123"
result = detect_deletion(video_data=video_data, url=url)
assert result is not None
assert result["is_deleted"] is True
assert result["source"] == "video_metadata"
assert "availability" in result["indicator"]
def test_no_deletion_detected(self):
"""Test that normal content is not flagged as deleted."""
html = "<html><body><h1>Welcome to my page</h1><p>This is normal content.</p></body></html>"
title = "My Normal Page"
url = "https://example.com/page"
result = detect_deletion(html_content=html, page_title=title, url=url)
assert result is None
def test_instagram_media_not_found(self):
"""Test Instagram-specific deletion message."""
error = "Media not found or unavailable"
url = "https://instagram.com/p/ABC123"
result = detect_deletion(error_message=error, url=url)
assert result is not None
assert result["platform"] == "instagram"
assert "not found" in result["indicator"].lower()
def test_reddit_removed_content(self):
"""Test Reddit [removed] and [deleted] markers."""
html = "<div class='comment'>[removed]</div>"
url = "https://reddit.com/r/test/comments/abc123"
result = detect_deletion(html_content=html, url=url)
assert result is not None
assert result["platform"] == "reddit"
class TestFlagAsDeleted:
"""Test the flag_as_deleted function."""
def test_flag_metadata_as_deleted(self):
"""Verify that metadata is properly flagged with deletion info."""
metadata = Metadata()
deletion_info = {
"is_deleted": True,
"indicator": "This Tweet is unavailable",
"source": "html_content",
"platform": "twitter",
}
flag_as_deleted(metadata, deletion_info)
assert metadata.get("deletion_detected") is True
assert metadata.get("deletion_indicator") == "This Tweet is unavailable"
assert metadata.get("deletion_source") == "html_content"
assert metadata.get("deletion_platform") == "twitter"
assert metadata.status == "deleted_or_unavailable"
def test_metadata_contains_deletion_context(self):
"""Verify investigators have full context about the deletion."""
metadata = Metadata()
deletion_info = {
"is_deleted": True,
"indicator": "Video has been removed by the uploader",
"source": "error_message",
"platform": "youtube",
}
flag_as_deleted(metadata, deletion_info)
assert "deletion_indicator" in metadata.metadata
assert "uploader" in metadata.get("deletion_indicator")