Generate archive pages for Telegram posts with images; move media-page generation into a shared function in base_archiver

This commit is contained in:
Logan Williams
2022-02-28 08:41:45 +01:00
parent 63a2847ac9
commit 2d50703489
3 changed files with 79 additions and 58 deletions

View File

@@ -9,6 +9,7 @@ import hashlib
from selenium.common.exceptions import TimeoutException from selenium.common.exceptions import TimeoutException
from loguru import logger from loguru import logger
import time import time
import requests
from storages import Storage from storages import Storage
from utils import mkdir_if_not_exists from utils import mkdir_if_not_exists
@@ -43,6 +44,55 @@ class Archiver(ABC):
def get_netloc(self, url): def get_netloc(self, url):
return urlparse(url).netloc return urlparse(url).netloc
def generate_media_page(self, urls, url, object):
    """Archive a list of media files and publish an HTML index page for them.

    Each entry of ``urls`` is downloaded to ``tmp/``, uploaded through
    ``self.storage`` and hashed with ``self.get_hash``.  An HTML page that
    links every archived file (with its hash) and embeds ``object`` is then
    written to ``tmp/`` and uploaded as a public ``text/html`` object.

    Args:
        urls: iterable of media URLs to download and archive.
        url: the original post URL; used for the page title, the heading
            link, and (via its path) the storage key of the page.
        object: data describing the post, inserted verbatim inside a
            ``<code>`` element.  It is NOT escaped here, so callers must
            pass text that is already safe to embed in HTML.

    Returns:
        A tuple ``(page_cdn, page_hash, thumbnail)`` where ``page_cdn`` is
        the CDN URL of the generated page, ``page_hash`` is the hash of the
        page file, and ``thumbnail`` is the CDN URL of the first archived
        media file, or ``None`` when ``urls`` is empty.
    """
    # Function-scope import so this method does not depend on a module-level
    # import that may not exist in the file.
    from html import escape

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
    }

    # URLs come from remote pages; escape them before placing them in
    # attribute values or text nodes so they cannot break out of the markup.
    safe_url = escape(url)
    parts = [f'''<html><head><title>{safe_url}</title></head>
<body>
<h2>Archived media from {self.name}</h2>
<h3><a href="{safe_url}">{safe_url}</a></h3><ul>''']

    thumbnail = None
    for media_url in urls:
        path = urlparse(media_url).path
        key = self.get_key(path.replace("/", "_"))
        # URLs without a file extension are stored as JPEGs.
        if '.' not in path:
            key += '.jpg'
        filename = 'tmp/' + key

        # A timeout keeps one unresponsive media host from hanging the run.
        response = requests.get(media_url, headers=headers, timeout=30)
        with open(filename, 'wb') as f:
            f.write(response.content)

        self.storage.upload(filename, key)
        media_hash = self.get_hash(filename)
        cdn_url = self.storage.get_cdn_url(key)

        # The first archived file doubles as the thumbnail.
        if thumbnail is None:
            thumbnail = cdn_url

        parts.append(
            f'''<li><a href="{escape(cdn_url)}">{escape(media_url)}</a>: {media_hash}</li>''')

    parts.append(
        f"</ul><h2>{self.name} object data:</h2><code>{object}</code>")
    parts.append("</body></html>")
    page = "".join(parts)

    page_key = self.get_key(urlparse(url).path.replace("/", "_") + ".html")
    page_filename = 'tmp/' + page_key
    page_cdn = self.storage.get_cdn_url(page_key)

    # Explicit UTF-8 so the bytes written (and therefore the hash) do not
    # depend on the platform's default encoding.
    with open(page_filename, "w", encoding="utf-8") as f:
        f.write(page)

    page_hash = self.get_hash(page_filename)

    self.storage.upload(page_filename, page_key, extra_args={
        'ACL': 'public-read', 'ContentType': 'text/html'})

    return (page_cdn, page_hash, thumbnail)
def get_key(self, filename): def get_key(self, filename):
""" """
@@ -52,6 +102,11 @@ class Archiver(ABC):
_id, extension = os.path.splitext(tail) # returns [filename, .ext] _id, extension = os.path.splitext(tail) # returns [filename, .ext]
if 'unknown_video' in _id: if 'unknown_video' in _id:
_id = _id.replace('unknown_video', 'jpg') _id = _id.replace('unknown_video', 'jpg')
# long filenames can cause problems, so trim them if necessary
if len(_id) > 128:
_id = _id[-128:]
return f'{self.name}_{_id}{extension}' return f'{self.name}_{_id}{extension}'
def get_hash(self, filename): def get_hash(self, filename):
@@ -127,7 +182,8 @@ class Archiver(ABC):
thumb_index = key_folder + 'index.html' thumb_index = key_folder + 'index.html'
self.storage.upload(index_fname, thumb_index, extra_args={'ACL': 'public-read', 'ContentType': 'text/html'}) self.storage.upload(index_fname, thumb_index, extra_args={
'ACL': 'public-read', 'ContentType': 'text/html'})
shutil.rmtree(thumbnails_folder) shutil.rmtree(thumbnails_folder)
thumb_index_cdn_url = self.storage.get_cdn_url(thumb_index) thumb_index_cdn_url = self.storage.get_cdn_url(thumb_index)

View File

@@ -1,6 +1,9 @@
import os import os
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from loguru import logger
import re
import html
from .base_archiver import Archiver, ArchiveResult from .base_archiver import Archiver, ArchiveResult
@@ -24,12 +27,24 @@ class TelegramArchiver(Archiver):
if url[-8:] != "?embed=1": if url[-8:] != "?embed=1":
url += "?embed=1" url += "?embed=1"
screenshot = self.get_screenshot(url)
t = requests.get(url, headers=headers) t = requests.get(url, headers=headers)
s = BeautifulSoup(t.content, 'html.parser') s = BeautifulSoup(t.content, 'html.parser')
video = s.find("video") video = s.find("video")
if video is None: if video is None:
return False # could not find video logger.warning("could not find video")
image_tags = s.find_all(class_="js-message_photo")
images = []
for im in image_tags:
urls = [u.replace("'", "") for u in re.findall('url\((.*?)\)', im['style'])]
images += urls
page_cdn, page_hash, thumbnail = self.generate_media_page(images, url, html.escape(str(t.content)))
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=s.find_all('time')[1].get('datetime'))
video_url = video.get('src') video_url = video.get('src')
video_id = video_url.split('/')[-1].split('?')[0] video_id = video_url.split('/')[-1].split('?')[0]
@@ -50,7 +65,6 @@ class TelegramArchiver(Archiver):
self.storage.upload(filename, key) self.storage.upload(filename, key)
hash = self.get_hash(filename) hash = self.get_hash(filename)
screenshot = self.get_screenshot(url)
# extract duration from HTML # extract duration from HTML
duration = s.find_all('time')[0].contents[0] duration = s.find_all('time')[0].contents[0]

View File

@@ -13,10 +13,6 @@ class TwitterArchiver(Archiver):
if 'twitter.com' != self.get_netloc(url): if 'twitter.com' != self.get_netloc(url):
return False return False
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
tweet_id = urlparse(url).path.split('/') tweet_id = urlparse(url).path.split('/')
if 'status' in tweet_id: if 'status' in tweet_id:
i = tweet_id.index('status') i = tweet_id.index('status')
@@ -35,67 +31,22 @@ class TwitterArchiver(Archiver):
if tweet.media is None: if tweet.media is None:
return False return False
archived_media = [] urls = []
for media in tweet.media: for media in tweet.media:
if type(media) == Video: if type(media) == Video:
variant = max( variant = max(
[v for v in media.variants if v.bitrate], key=lambda v: v.bitrate) [v for v in media.variants if v.bitrate], key=lambda v: v.bitrate)
media_url = variant.url urls.append(variant.url)
elif type(media) == Gif: elif type(media) == Gif:
media_url = media.variants[0].url urls.append(media.variants[0].url)
elif type(media) == Photo: elif type(media) == Photo:
media_url = media.fullUrl urls.append(media.fullUrl)
else: else:
logger.warning(f"Could not get media URL of {media}") logger.warning(f"Could not get media URL of {media}")
media_url = None
if media_url is not None: page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, tweet.json())
path = urlparse(media_url).path
key = self.get_key(path.replace("/", "_"))
if '.' not in path:
key += '.jpg'
filename = 'tmp/' + key
d = requests.get(media_url, headers=headers)
with open(filename, 'wb') as f:
f.write(d.content)
self.storage.upload(filename, key)
hash = self.get_hash(filename)
archived_media.append((self.storage.get_cdn_url(key), hash))
page = f'''<html><head><title>{url}</title></head>
<body>
<h2>Archived media from tweet</h2>
<h3><a href="{url}">{url}</a></h3><ul>'''
for media in archived_media:
page += f'''<li><a href="{media[0]}">{media[0]}</a>: {media[1]}</li>'''
page += f"<h2>Tweet data:</h2><code>{tweet.json()}</code>"
page += f"</body></html>"
page_key = self.get_key(urlparse(url).path.replace("/", "_") + ".html")
page_filename = 'tmp/' + key
page_cdn = self.storage.get_cdn_url(page_key)
with open(page_filename, "w") as f:
f.write(page)
page_hash = self.get_hash(page_filename)
self.storage.upload(page_filename, page_key, extra_args={
'ACL': 'public-read', 'ContentType': 'text/html'})
screenshot = self.get_screenshot(url) screenshot = self.get_screenshot(url)
if (len(archived_media) > 0): return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=tweet.date)
thumbnail = archived_media[0][0]
else:
thumbnail = None
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail)