From ea2c266fa272f4f911dd8a08084ff097c28875f0 Mon Sep 17 00:00:00 2001
From: msramalho <19508417+msramalho@users.noreply.github.com>
Date: Thu, 19 Jan 2023 00:27:11 +0000
Subject: [PATCH] clean up and wacz WIP
---
src/archivers/__init__.py | 4 +-
src/archivers/archiver.py | 19 ++-
src/archivers/instagram_archiver.py | 140 --------------------
src/archivers/telegram_archiver.py | 89 -------------
src/archivers/telethon_archiver.py | 125 -----------------
src/archivers/tiktok_archiver.py | 72 ----------
src/archivers/twitter_api_archiver.py | 75 -----------
src/archivers/twitter_archiver.py | 105 ---------------
src/archivers/vk_archiver.py | 74 -----------
src/archivers/wayback_archiver.py | 89 -------------
src/archivers/youtubedl_archiver.py | 118 -----------------
src/configs/v2config.py | 9 +-
src/enrichers/__init__.py | 5 +-
src/enrichers/wacz_enricher.py | 70 ++++++++++
src/enrichers/wayback_enricher.py | 36 +++--
src/formatters/templates/html_template.html | 1 -
src/formatters/templates/macros.html | 15 ++-
src/media.py | 2 +-
src/storages/s3.py | 3 +-
19 files changed, 141 insertions(+), 910 deletions(-)
delete mode 100644 src/archivers/instagram_archiver.py
delete mode 100644 src/archivers/telegram_archiver.py
delete mode 100644 src/archivers/telethon_archiver.py
delete mode 100644 src/archivers/tiktok_archiver.py
delete mode 100644 src/archivers/twitter_api_archiver.py
delete mode 100644 src/archivers/twitter_archiver.py
delete mode 100644 src/archivers/vk_archiver.py
delete mode 100644 src/archivers/wayback_archiver.py
delete mode 100644 src/archivers/youtubedl_archiver.py
create mode 100644 src/enrichers/wacz_enricher.py
diff --git a/src/archivers/__init__.py b/src/archivers/__init__.py
index 51d3546..22e142f 100644
--- a/src/archivers/__init__.py
+++ b/src/archivers/__init__.py
@@ -1,16 +1,16 @@
# we need to explicitly expose the available imports here
from .base_archiver import Archiver, ArchiveResult
-from .archiver import Archiverv2
# from .telegram_archiver import TelegramArchiver
# from .telethon_archiver import TelethonArchiver
# from .tiktok_archiver import TiktokArchiver
-from .wayback_archiver import WaybackArchiver
+# from .wayback_archiver import WaybackArchiver
# from .youtubedl_archiver import YoutubeDLArchiver
# from .twitter_archiver import TwitterArchiver
# from .vk_archiver import VkArchiver
# from .twitter_api_archiver import TwitterApiArchiver
# from .instagram_archiver import InstagramArchiver
+from .archiver import Archiverv2
from .telethon_archiverv2 import TelethonArchiver
from .twitter_archiverv2 import TwitterArchiver
from .twitter_api_archiverv2 import TwitterApiArchiver
diff --git a/src/archivers/archiver.py b/src/archivers/archiver.py
index 369dd60..7682e11 100644
--- a/src/archivers/archiver.py
+++ b/src/archivers/archiver.py
@@ -15,9 +15,8 @@ class Archiverv2(Step):
# without this STEP.__init__ is not called
super().__init__(config)
- # only for typing...
-
def init(name: str, config: dict) -> Archiverv2:
+ # only for typing...
return Step.init(name, config, Archiverv2)
def setup(self) -> None:
@@ -58,3 +57,19 @@ class Archiverv2(Step):
@abstractmethod
def download(self, item: Metadata) -> Metadata: pass
+
+ # TODO: figure out how to allow a predictable key
+ # def get_key(self, filename):
+ # """
+ # returns a key in the format "[archiverName]_[filename]", including the extension
+ # """
+ # tail = os.path.split(filename)[1] # returns filename.ext from full path
+ # _id, extension = os.path.splitext(tail) # returns [filename, .ext]
+ # if 'unknown_video' in _id:
+ # _id = _id.replace('unknown_video', 'jpg')
+
+ # # long filenames can cause problems, so trim them if necessary
+ # if len(_id) > 128:
+ # _id = _id[-128:]
+
+ # return f'{self.name}_{_id}{extension}'
\ No newline at end of file
diff --git a/src/archivers/instagram_archiver.py b/src/archivers/instagram_archiver.py
deleted file mode 100644
index 62db876..0000000
--- a/src/archivers/instagram_archiver.py
+++ /dev/null
@@ -1,140 +0,0 @@
-import re, os, shutil, html, traceback
-import instaloader # https://instaloader.github.io/as-module.html
-from loguru import logger
-
-from .base_archiver import Archiver, ArchiveResult
-from configs import Config
-from storages import Storage
-
-
-class InstagramArchiver(Archiver):
- """
- Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, )
- """
- name = "instagram"
- DOWNLOAD_FOLDER = "instaloader"
- # NB: post should be tested before profile
- # https://regex101.com/r/MGPquX/1
- post_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(?:p|reel)\/(\w+)")
- # https://regex101.com/r/6Wbsxa/1
- profile_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(\w+)")
-
- def __init__(self, storage: Storage, config: Config):
- super().__init__(storage, config)
- self.insta = instaloader.Instaloader(download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.DOWNLOAD_FOLDER, filename_pattern="{date_utc}_UTC_{target}__{typename}")
- if config.instagram_config:
- try:
- self.insta.load_session_from_file(config.instagram_config.username, config.instagram_config.session_file)
- except Exception as e:
- logger.error(f"Unable to login from session file: {e}\n{traceback.format_exc()}")
- try:
- self.insta.login(config.instagram_config.username, config.instagram_config.
- password)
- #TODO: wait for this issue to be fixed https://github.com/instaloader/instaloader/issues/1758
- self.insta.save_session_to_file(config.instagram_config.session_file)
- except Exception as e2:
- logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
-
-
-
- def download(self, url, check_if_exists=False):
- post_matches = self.post_pattern.findall(url)
- profile_matches = self.profile_pattern.findall(url)
-
- # return if not a valid instagram link
- if not len(post_matches) and not len(profile_matches):
- return
-
- # check if already uploaded
- key = self.get_html_key(url)
- if check_if_exists and self.storage.exists(key):
- # only s3 storage supports storage.exists as not implemented on gd
- cdn_url = self.storage.get_cdn_url(key)
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
- return self.generateArchiveResult(status='already archived', cdn_url=cdn_url, screenshot=screenshot, wacz=wacz)
-
- try:
- # process if post
- if len(post_matches):
- return self.download_post(url, post_matches[0])
-
- # process if profile
- if len(profile_matches):
- return self.download_profile(url, profile_matches[0])
- finally:
- shutil.rmtree(self.DOWNLOAD_FOLDER, ignore_errors=True)
-
- def download_post(self, url, post_id):
- logger.debug(f"Instagram {post_id=} detected in {url=}")
-
- post = instaloader.Post.from_shortcode(self.insta.context, post_id)
- if self.insta.download_post(post, target=post.owner_username):
- return self.upload_downloaded_content(url, post.title, post._asdict(), post.date)
-
- def download_profile(self, url, username):
- # gets posts, posts where username is tagged, igtv postss, stories, and highlights
- logger.debug(f"Instagram {username=} detected in {url=}")
-
- profile = instaloader.Profile.from_username(self.insta.context, username)
- try:
- for post in profile.get_posts():
- try: self.insta.download_post(post, target=f"profile_post_{post.owner_username}")
- except Exception as e: logger.error(f"Failed to download post: {post.shortcode}: {e}")
- except Exception as e: logger.error(f"Failed profile.get_posts: {e}")
-
- try:
- for post in profile.get_tagged_posts():
- try: self.insta.download_post(post, target=f"tagged_post_{post.owner_username}")
- except Exception as e: logger.error(f"Failed to download tagged post: {post.shortcode}: {e}")
- except Exception as e: logger.error(f"Failed profile.get_tagged_posts: {e}")
-
- try:
- for post in profile.get_igtv_posts():
- try: self.insta.download_post(post, target=f"igtv_post_{post.owner_username}")
- except Exception as e: logger.error(f"Failed to download igtv post: {post.shortcode}: {e}")
- except Exception as e: logger.error(f"Failed profile.get_igtv_posts: {e}")
-
- try:
- for story in self.insta.get_stories([profile.userid]):
- for item in story.get_items():
- try: self.insta.download_storyitem(item, target=f"story_item_{story.owner_username}")
- except Exception as e: logger.error(f"Failed to download story item: {item}: {e}")
- except Exception as e: logger.error(f"Failed get_stories: {e}")
-
- try:
- for highlight in self.insta.get_highlights(profile.userid):
- for item in highlight.get_items():
- try: self.insta.download_storyitem(item, target=f"highlight_item_{highlight.owner_username}")
- except Exception as e: logger.error(f"Failed to download highlight item: {item}: {e}")
- except Exception as e: logger.error(f"Failed get_highlights: {e}")
-
- return self.upload_downloaded_content(url, f"@{username}", profile._asdict(), None)
-
- def upload_downloaded_content(self, url, title, content, date):
- status = "success"
- try:
- uploaded_media = []
- for f in os.listdir(self.DOWNLOAD_FOLDER):
- if os.path.isfile((filename := os.path.join(self.DOWNLOAD_FOLDER, f))):
- key = self.get_key(filename)
- self.storage.upload(filename, key)
- hash = self.get_hash(filename)
- cdn_url = self.storage.get_cdn_url(key)
- uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
- assert len(uploaded_media) > 1, "No uploaded media found"
-
- uploaded_media.sort(key=lambda m:m["key"], reverse=True)
-
- page_cdn, page_hash, _ = self.generate_media_page_html(url, uploaded_media, html.escape(str(content)))
- except Exception as e:
- logger.error(f"Could not fetch instagram post {url} due to: {e}")
- status = "error"
- finally:
- shutil.rmtree(self.DOWNLOAD_FOLDER, ignore_errors=True)
-
- if status == "success":
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
-
- return self.generateArchiveResult(status=status, cdn_url=page_cdn, title=title, timestamp=date, hash=page_hash, screenshot=screenshot, wacz=wacz)
diff --git a/src/archivers/telegram_archiver.py b/src/archivers/telegram_archiver.py
deleted file mode 100644
index c6d8747..0000000
--- a/src/archivers/telegram_archiver.py
+++ /dev/null
@@ -1,89 +0,0 @@
-import os, requests, re
-
-import html
-from bs4 import BeautifulSoup
-from loguru import logger
-
-from .base_archiver import Archiver, ArchiveResult
-from storages import Storage
-
-
-class TelegramArchiver(Archiver):
- name = "telegram"
-
- def download(self, url, check_if_exists=False):
- # detect URLs that we definitely cannot handle
- if 't.me' != self.get_netloc(url):
- return False
-
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
- }
- status = "success"
-
- original_url = url
-
- # TODO: check if we can do this more resilient to variable URLs
- if url[-8:] != "?embed=1":
- url += "?embed=1"
-
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
-
- t = requests.get(url, headers=headers)
- s = BeautifulSoup(t.content, 'html.parser')
- video = s.find("video")
-
- if video is None:
- logger.warning("could not find video")
- image_tags = s.find_all(class_="js-message_photo")
-
- images = []
- for im in image_tags:
- urls = [u.replace("'", "") for u in re.findall(r'url\((.*?)\)', im['style'])]
- images += urls
-
- page_cdn, page_hash, thumbnail = self.generate_media_page(images, url, html.escape(str(t.content)))
- time_elements = s.find_all('time')
- timestamp = time_elements[0].get('datetime') if len(time_elements) else None
-
- return self.generateArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, wacz=wacz)
-
- video_url = video.get('src')
- video_id = video_url.split('/')[-1].split('?')[0]
- key = self.get_key(video_id)
-
- filename = os.path.join(Storage.TMP_FOLDER, key)
-
- if check_if_exists and self.storage.exists(key):
- status = 'already archived'
-
- v = requests.get(video_url, headers=headers)
-
- with open(filename, 'wb') as f:
- f.write(v.content)
-
- if status != 'already archived':
- self.storage.upload(filename, key)
-
- hash = self.get_hash(filename)
-
- # extract duration from HTML
- try:
- duration = s.find_all('time')[0].contents[0]
- if ':' in duration:
- duration = float(duration.split(
- ':')[0]) * 60 + float(duration.split(':')[1])
- else:
- duration = float(duration)
- except:
- duration = ""
-
- # process thumbnails
- key_thumb, thumb_index = self.get_thumbnails(
- filename, key, duration=duration)
- os.remove(filename)
-
- cdn_url = self.storage.get_cdn_url(key)
- return self.generateArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb, thumbnail_index=thumb_index,
- duration=duration, title=original_url, timestamp=s.find_all('time')[1].get('datetime'), hash=hash, screenshot=screenshot, wacz=wacz)
diff --git a/src/archivers/telethon_archiver.py b/src/archivers/telethon_archiver.py
deleted file mode 100644
index a2cbf0a..0000000
--- a/src/archivers/telethon_archiver.py
+++ /dev/null
@@ -1,125 +0,0 @@
-import os, re, html
-from loguru import logger
-from telethon.sync import TelegramClient
-from telethon.errors import ChannelInvalidError
-
-from storages import Storage
-from .base_archiver import Archiver, ArchiveResult
-from configs import Config
-from utils import getattr_or
-
-
-class TelethonArchiver(Archiver):
- name = "telethon"
- link_pattern = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
-
- def __init__(self, storage: Storage, config: Config):
- super().__init__(storage, config)
- if config.telegram_config:
- c = config.telegram_config
- self.client = TelegramClient("./anon.session", c.api_id, c.api_hash)
- self.bot_token = c.bot_token
-
- def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
- """
- Searches for Telegram posts that are part of the same group of uploads
- The search is conducted around the id of the original post with an amplitude
- of `max_amp` both ways
- Returns a list of [post] where each post has media and is in the same grouped_id
- """
- if getattr_or(original_post, "grouped_id") is None:
- return [original_post] if getattr_or(original_post, "media") else []
-
- search_ids = [i for i in range(original_post.id - max_amp, original_post.id + max_amp + 1)]
- posts = self.client.get_messages(chat, ids=search_ids)
- media = []
- for post in posts:
- if post is not None and post.grouped_id == original_post.grouped_id and post.media is not None:
- media.append(post)
- return media
-
- def download(self, url, check_if_exists=False):
- if not hasattr(self, "client"):
- logger.warning('Missing Telethon config')
- return False
-
- # detect URLs that we definitely cannot handle
- matches = self.link_pattern.findall(url)
- if not len(matches):
- return False
-
- status = "success"
-
- # app will ask (stall for user input!) for phone number and auth code if anon.session not found
- with self.client.start(bot_token=self.bot_token):
- matches = list(matches[0])
- chat, post_id = matches[1], matches[2]
-
- post_id = int(post_id)
-
- try:
- post = self.client.get_messages(chat, ids=post_id)
- except ValueError as e:
- logger.error(f"Could not fetch telegram {url} possibly it's private: {e}")
- return False
- except ChannelInvalidError as e:
- logger.error(f"Could not fetch telegram {url}. This error can be fixed if you setup a bot_token in addition to api_id and api_hash: {e}")
- return False
-
- if post is None: return False
-
- media_posts = self._get_media_posts_in_group(chat, post)
- logger.debug(f'got {len(media_posts)=} for {url=}')
-
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
-
- if len(media_posts) > 0:
- key = self.get_html_key(url)
-
- if check_if_exists and self.storage.exists(key):
- # only s3 storage supports storage.exists as not implemented on gd
- cdn_url = self.storage.get_cdn_url(key)
- return self.generateArchiveResult(status='already archived', cdn_url=cdn_url, title=post.message, timestamp=post.date, screenshot=screenshot, wacz=wacz)
-
- key_thumb, thumb_index = None, None
- group_id = post.grouped_id if post.grouped_id is not None else post.id
- uploaded_media = []
- message = post.message
- for mp in media_posts:
- if len(mp.message) > len(message): message = mp.message
-
- # media can also be in entities
- if mp.entities:
- other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image"]]
- logger.debug(f"Got {len(other_media_urls)} other medial urls from {mp.id=}: {other_media_urls}")
- for om_url in other_media_urls:
- filename = os.path.join(Storage.TMP_FOLDER, f'{chat}_{group_id}_{self._get_key_from_url(om_url)}')
- self.download_from_url(om_url, filename)
- key = filename.split(Storage.TMP_FOLDER)[1]
- self.storage.upload(filename, key)
- hash = self.get_hash(filename)
- cdn_url = self.storage.get_cdn_url(key)
- uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
-
- filename_dest = os.path.join(Storage.TMP_FOLDER, f'{chat}_{group_id}', str(mp.id))
- filename = self.client.download_media(mp.media, filename_dest)
- if not filename:
- logger.debug(f"Empty media found, skipping {str(mp)=}")
- continue
-
- key = filename.split(Storage.TMP_FOLDER)[1]
- self.storage.upload(filename, key)
- hash = self.get_hash(filename)
- cdn_url = self.storage.get_cdn_url(key)
- uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
- if key_thumb is None:
- key_thumb, thumb_index = self.get_thumbnails(filename, key)
- os.remove(filename)
-
- page_cdn, page_hash, _ = self.generate_media_page_html(url, uploaded_media, html.escape(str(post)))
-
- return self.generateArchiveResult(status=status, cdn_url=page_cdn, title=message, timestamp=post.date, hash=page_hash, screenshot=screenshot, thumbnail=key_thumb, thumbnail_index=thumb_index, wacz=wacz)
-
- page_cdn, page_hash, _ = self.generate_media_page_html(url, [], html.escape(str(post)))
- return self.generateArchiveResult(status=status, cdn_url=page_cdn, title=post.message, timestamp=getattr_or(post, "date"), hash=page_hash, screenshot=screenshot, wacz=wacz)
diff --git a/src/archivers/tiktok_archiver.py b/src/archivers/tiktok_archiver.py
deleted file mode 100644
index 55cb97e..0000000
--- a/src/archivers/tiktok_archiver.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import os, traceback
-import tiktok_downloader
-from loguru import logger
-
-from .base_archiver import Archiver, ArchiveResult
-from storages import Storage
-
-
-class TiktokArchiver(Archiver):
- name = "tiktok"
-
- def download(self, url, check_if_exists=False):
- if 'tiktok.com' not in url:
- return False
-
- status = 'success'
-
- try:
- info = tiktok_downloader.info_post(url)
- key = self.get_key(f'{info.id}.mp4')
- filename = os.path.join(Storage.TMP_FOLDER, key)
- logger.info(f'found video {key=}')
-
- if check_if_exists and self.storage.exists(key):
- status = 'already archived'
-
- media = tiktok_downloader.snaptik(url).get_media()
-
- if len(media) <= 0:
- if status == 'already archived':
- return self.generateArchiveResult(status='Could not download media, but already archived', cdn_url=self.storage.get_cdn_url(key))
- else:
- return self.generateArchiveResult(status='Could not download media')
-
- logger.info(f'downloading video {key=}')
- media[0].download(filename)
-
- if status != 'already archived':
- logger.info(f'uploading video {key=}')
- self.storage.upload(filename, key)
-
- try:
- key_thumb, thumb_index = self.get_thumbnails(filename, key, duration=info.duration)
- except Exception as e:
- logger.error(e)
- key_thumb = ''
- thumb_index = 'error creating thumbnails'
-
- hash = self.get_hash(filename)
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
-
- try: os.remove(filename)
- except FileNotFoundError:
- logger.info(f'tmp file not found thus not deleted {filename}')
- cdn_url = self.storage.get_cdn_url(key)
- timestamp = info.create.isoformat() if hasattr(info, "create") else None
-
- return self.generateArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb,
- thumbnail_index=thumb_index, duration=getattr(info, "duration", 0), title=getattr(info, "caption", ""),
- timestamp=timestamp, hash=hash, screenshot=screenshot, wacz=wacz)
-
- except tiktok_downloader.Except.InvalidUrl as e:
- status = 'Invalid URL'
- logger.warning(f'Invalid URL on {url} {e}\n{traceback.format_exc()}')
- return self.generateArchiveResult(status=status)
-
- except:
- error = traceback.format_exc()
- status = 'Other Tiktok error: ' + str(error)
- logger.warning(f'Other Tiktok error' + str(error))
- return self.generateArchiveResult(status=status)
diff --git a/src/archivers/twitter_api_archiver.py b/src/archivers/twitter_api_archiver.py
deleted file mode 100644
index da56d31..0000000
--- a/src/archivers/twitter_api_archiver.py
+++ /dev/null
@@ -1,75 +0,0 @@
-
-import json
-from datetime import datetime
-from loguru import logger
-from pytwitter import Api
-
-from storages.base_storage import Storage
-from configs import Config
-from .base_archiver import ArchiveResult
-from .twitter_archiver import TwitterArchiver
-
-
-class TwitterApiArchiver(TwitterArchiver):
- name = "twitter_api"
-
- def __init__(self, storage: Storage, config: Config):
- super().__init__(storage, config)
- c = config.twitter_config
-
- if c.bearer_token:
- self.api = Api(bearer_token=c.bearer_token)
- elif c.consumer_key and c.consumer_secret and c.access_token and c.access_secret:
- self.api = Api(
- consumer_key=c.consumer_key, consumer_secret=c.consumer_secret, access_token=c.access_token, access_secret=c.access_secret)
-
- def download(self, url, check_if_exists=False):
- if not hasattr(self, "api"):
- logger.warning('Missing Twitter API config')
- return False
-
- username, tweet_id = self.get_username_tweet_id(url)
- if not username: return False
-
- tweet = self.api.get_tweet(tweet_id, expansions=["attachments.media_keys"], media_fields=["type", "duration_ms", "url", "variants"], tweet_fields=["attachments", "author_id", "created_at", "entities", "id", "text", "possibly_sensitive"])
- timestamp = datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ")
-
- # check if exists
- key = self.get_html_key(url)
- if check_if_exists and self.storage.exists(key):
- # only s3 storage supports storage.exists as not implemented on gd
- cdn_url = self.storage.get_cdn_url(key)
- screenshot = self.get_screenshot(url)
- return self.generateArchiveResult(status='already archived', cdn_url=cdn_url, title=tweet.data.text, timestamp=timestamp, screenshot=screenshot)
-
- urls = []
- if tweet.includes:
- for m in tweet.includes.media:
- if m.url:
- urls.append(m.url)
- elif hasattr(m, "variants"):
- var_url = self.choose_variant(m.variants)
- urls.append(var_url)
- else:
- urls.append(None) # will trigger error
-
- for u in urls:
- if u is None:
- logger.debug(f"Should not have gotten None url for {tweet.includes.media=} so going to download_alternative in twitter_archiver")
- return self.download_alternative(url, tweet_id)
- logger.debug(f"found {urls=}")
-
- output = json.dumps({
- "id": tweet.data.id,
- "text": tweet.data.text,
- "created_at": tweet.data.created_at,
- "author_id": tweet.data.author_id,
- "geo": tweet.data.geo,
- "lang": tweet.data.lang,
- "media": urls
- }, ensure_ascii=False, indent=4)
-
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
- page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, output)
- return self.generateArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, title=tweet.data.text, wacz=wacz)
diff --git a/src/archivers/twitter_archiver.py b/src/archivers/twitter_archiver.py
deleted file mode 100644
index f1f22c0..0000000
--- a/src/archivers/twitter_archiver.py
+++ /dev/null
@@ -1,105 +0,0 @@
-import html, re, requests
-from datetime import datetime
-from loguru import logger
-from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
-
-from .base_archiver import Archiver, ArchiveResult
-
-class TwitterArchiver(Archiver):
- """
- This Twitter Archiver uses unofficial scraping methods, and it works as
- an alternative to TwitterApiArchiver when no API credentials are provided.
- """
-
- name = "twitter"
- link_pattern = re.compile(r"twitter.com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
-
- def get_username_tweet_id(self, url):
- # detect URLs that we definitely cannot handle
- matches = self.link_pattern.findall(url)
- if not len(matches): return False, False
-
- username, tweet_id = matches[0] # only one URL supported
- logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
-
- return username, tweet_id
-
- def download(self, url, check_if_exists=False):
- username, tweet_id = self.get_username_tweet_id(url)
- if not username: return False
-
- scr = TwitterTweetScraper(tweet_id)
-
- try:
- tweet = next(scr.get_items())
- except Exception as ex:
- logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
- return self.download_alternative(url, tweet_id)
-
- if tweet.media is None:
- logger.debug(f'No media found, archiving tweet text only')
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
- page_cdn, page_hash, _ = self.generate_media_page_html(url, [], html.escape(tweet.json()))
- return self.generateArchiveResult(status="success", cdn_url=page_cdn, title=tweet.content, timestamp=tweet.date, hash=page_hash, screenshot=screenshot, wacz=wacz)
-
- urls = []
-
- for media in tweet.media:
- if type(media) == Video:
- variant = max(
- [v for v in media.variants if v.bitrate], key=lambda v: v.bitrate)
- urls.append(variant.url)
- elif type(media) == Gif:
- urls.append(media.variants[0].url)
- elif type(media) == Photo:
- urls.append(media.fullUrl.replace('name=large', 'name=orig'))
- else:
- logger.warning(f"Could not get media URL of {media}")
-
- page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, tweet.json())
-
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
-
- return self.generateArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=tweet.date, title=tweet.content, wacz=wacz)
-
- def download_alternative(self, url, tweet_id):
- # https://stackoverflow.com/a/71867055/6196010
- logger.debug(f"Trying twitter hack for {url=}")
- hack_url = f"https://cdn.syndication.twimg.com/tweet?id={tweet_id}"
- r = requests.get(hack_url)
- if r.status_code != 200: return False
- tweet = r.json()
-
- urls = []
- for p in tweet["photos"]:
- urls.append(p["url"])
-
- # 1 tweet has 1 video max
- if "video" in tweet:
- v = tweet["video"]
- urls.append(self.choose_variant(v.get("variants", [])))
-
- logger.debug(f"Twitter hack got {urls=}")
-
- timestamp = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
- screenshot = self.get_screenshot(url)
- wacz = self.get_wacz(url)
- page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, r.text)
- return self.generateArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, title=tweet["text"], wacz=wacz)
-
- def choose_variant(self, variants):
- # choosing the highest quality possible
- variant, width, height = None, 0, 0
- for var in variants:
- if var["type"] == "video/mp4":
- width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
- if width_height:
- w, h = int(width_height[1]), int(width_height[2])
- if w > width or h > height:
- width, height = w, h
- variant = var.get("src", variant)
- else:
- variant = var.get("src") if not variant else variant
- return variant
diff --git a/src/archivers/vk_archiver.py b/src/archivers/vk_archiver.py
deleted file mode 100644
index 1d38fa9..0000000
--- a/src/archivers/vk_archiver.py
+++ /dev/null
@@ -1,74 +0,0 @@
-import re, json, mimetypes, os
-
-from loguru import logger
-from vk_url_scraper import VkScraper, DateTimeEncoder
-
-from storages import Storage
-from .base_archiver import Archiver, ArchiveResult
-from configs import Config
-
-
-class VkArchiver(Archiver):
- """"
- VK videos are handled by YTDownloader, this archiver gets posts text and images.
- Currently only works for /wall posts
- """
- name = "vk"
- wall_pattern = re.compile(r"(wall.{0,1}\d+_\d+)")
- photo_pattern = re.compile(r"(photo.{0,1}\d+_\d+)")
-
- def __init__(self, storage: Storage, config: Config):
- super().__init__(storage, config)
- if config.vk_config != None:
- self.vks = VkScraper(config.vk_config.username, config.vk_config.password)
-
- def download(self, url, check_if_exists=False):
- if not hasattr(self, "vks") or self.vks is None:
- logger.debug("VK archiver was not supplied with credentials.")
- return False
-
- key = self.get_html_key(url)
- # if check_if_exists and self.storage.exists(key):
- # screenshot = self.get_screenshot(url)
- # cdn_url = self.storage.get_cdn_url(key)
- # return self.generateArchiveResult(status="already archived", cdn_url=cdn_url, screenshot=screenshot)
-
- results = self.vks.scrape(url) # some urls can contain multiple wall/photo/... parts and all will be fetched
- if len(results) == 0:
- return False
-
- def dump_payload(p): return json.dumps(p, ensure_ascii=False, indent=4, cls=DateTimeEncoder)
- textual_output = ""
- title, datetime = results[0]["text"], results[0]["datetime"]
- urls_found = []
- for res in results:
- textual_output += f"id: {res['id']}
time utc: {res['datetime']}
text: {res['text']}
payload: {dump_payload(res['payload'])}
Made with bellingcat/auto-archiver