mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-11 20:58:29 +03:00
telegram archiver
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
# we need to explicitly expose the available imports here
|
||||
from .base_archiver import Archiver, ArchiveResult
|
||||
from .archiver import Archiverv2
|
||||
from .telegram_archiver import TelegramArchiver
|
||||
# from .telegram_archiver import TelegramArchiver
|
||||
# from .telethon_archiver import TelethonArchiver
|
||||
# from .tiktok_archiver import TiktokArchiver
|
||||
from .wayback_archiver import WaybackArchiver
|
||||
@@ -15,4 +15,5 @@ from .telethon_archiverv2 import TelethonArchiver
|
||||
from .twitter_archiverv2 import TwitterArchiver
|
||||
from .twitter_api_archiverv2 import TwitterApiArchiver
|
||||
from .instagram_archiverv2 import InstagramArchiver
|
||||
from .tiktok_archiverv2 import TiktokArchiver
|
||||
from .tiktok_archiverv2 import TiktokArchiver
|
||||
from .telegram_archiverv2 import TelegramArchiver
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
from abc import abstractmethod
|
||||
from dataclasses import dataclass
|
||||
import os
|
||||
from metadata import Metadata
|
||||
from steps.step import Step
|
||||
import mimetypes, requests
|
||||
@@ -23,7 +24,7 @@ class Archiverv2(Step):
|
||||
# used when archivers need to login or do other one-time setup
|
||||
pass
|
||||
|
||||
def clean_url(self, url:str) -> str:
|
||||
def clean_url(self, url: str) -> str:
|
||||
# used to clean unnecessary URL parameters
|
||||
return url
|
||||
|
||||
@@ -37,13 +38,23 @@ class Archiverv2(Step):
|
||||
return mime.split("/")[0]
|
||||
return ""
|
||||
|
||||
def download_from_url(self, url:str, to_filename:str) -> None:
|
||||
def download_from_url(self, url: str, to_filename: str = None, item: Metadata = None) -> str:
    """
    Downloads a URL to a local file and returns the local filename.

    url: address fetched with a single GET request using a browser-like User-Agent.
    to_filename: target filename; when empty/None it is inferred from the last
        path segment of `url` and limited to its trailing 64 characters.
    item: when present, the file is written inside `item.get_tmp_dir()`.
    """
    if not to_filename:
        # strip the query string and fragment BEFORE picking the last path segment,
        # so a '/' inside the query or a trailing '/' cannot produce a wrong/empty name
        url_path = url.split('?')[0].split('#')[0].rstrip('/')
        to_filename = url_path.split('/')[-1]
        if len(to_filename) > 64:
            # keep the tail of the name (that is where a file extension would be)
            to_filename = to_filename[-64:]
    if item:
        to_filename = os.path.join(item.get_tmp_dir(), to_filename)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
    }
    d = requests.get(url, headers=headers)
    with open(to_filename, 'wb') as f:
        f.write(d.content)
    return to_filename
|
||||
|
||||
@abstractmethod
|
||||
def download(self, item: Metadata) -> Metadata: pass
|
||||
|
||||
75
src/archivers/telegram_archiverv2.py
Normal file
75
src/archivers/telegram_archiverv2.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import requests, re
|
||||
|
||||
import html
|
||||
from bs4 import BeautifulSoup
|
||||
from loguru import logger
|
||||
|
||||
from metadata import Metadata
|
||||
from media import Media
|
||||
from .archiver import Archiverv2
|
||||
|
||||
|
||||
class TelegramArchiver(Archiverv2):
    """
    Archiver for telegram that does not require login, but the telethon_archiver is much more advised.
    It scrapes the public embed page of a t.me post and will only return a result if at least
    one image or one video is found.
    """
    name = "telegram_archiver"

    def __init__(self, config: dict) -> None:
        super().__init__(config)

    @staticmethod
    def configs() -> dict:
        # this archiver exposes no configuration options
        return {}

    def download(self, item: Metadata) -> Metadata:
        """
        Archives the media of a single t.me post.

        item: the Metadata being archived; its URL is read via `item.get_url()` and
            downloaded files are placed in its tmp dir.
        Returns False when the URL is not on t.me or when neither a video nor any
        image is found; otherwise a new Metadata marked as success("telegram").
        """
        url = item.get_url()
        # detect URLs that we definitely cannot handle
        if 't.me' != item.netloc:
            return False

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
        }

        # request the embed view; join with '&' when the URL already carries a query
        # string, since a second '?' would not be parsed as a separate parameter
        if "embed=1" not in url:
            url += ("&" if "?" in url else "?") + "embed=1"

        t = requests.get(url, headers=headers)
        s = BeautifulSoup(t.content, 'html.parser')

        result = Metadata()
        # NOTE(review): str() of a bytes object yields its "b'...'" repr, not decoded
        # text - confirm whether t.text was intended before changing stored content
        result.set_content(html.escape(str(t.content)))
        # first <time> tag, if any, carries the post timestamp in its datetime attribute
        if (timestamp := (s.find_all('time') or [{}])[0].get('datetime')):
            result.set_timestamp(timestamp)

        video = s.find("video")
        if video is None:
            logger.warning("could not find video")
            image_tags = s.find_all(class_="js-message_photo")

            # photo URLs are embedded in the inline style as url('...')
            image_urls = []
            for im in image_tags:
                # .get avoids a KeyError on a photo tag without a style attribute
                style = im.get('style', '')
                image_urls += [u.replace("'", "") for u in re.findall(r'url\((.*?)\)', style)]

            if not image_urls: return False
            for img_url in image_urls:
                # pass item so files land in its tmp dir, like the other archivers do
                result.add_media(Media(self.download_from_url(img_url, item=item)))
        else:
            video_url = video.get('src')
            m_video = Media(self.download_from_url(video_url, item=item))
            # extract duration from HTML (best effort: a failure must not abort archiving)
            try:
                duration = s.find_all('time')[0].contents[0]
                if ':' in duration:
                    minutes, seconds = duration.split(':')[:2]
                    duration = float(minutes) * 60 + float(seconds)
                else:
                    duration = float(duration)
                m_video.set("duration", duration)
            except Exception as e:
                # Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate
                logger.debug(f"could not extract video duration: {e}")
            result.add_media(m_video)

        return result.success("telegram")
|
||||
@@ -117,7 +117,7 @@ class TelethonArchiver(Archiverv2):
|
||||
|
||||
if post is None: return False
|
||||
logger.info(f"fetched telegram {post.id=}")
|
||||
|
||||
|
||||
media_posts = self._get_media_posts_in_group(chat, post)
|
||||
logger.debug(f'got {len(media_posts)=} for {url=}')
|
||||
|
||||
@@ -126,7 +126,7 @@ class TelethonArchiver(Archiverv2):
|
||||
group_id = post.grouped_id if post.grouped_id is not None else post.id
|
||||
title = post.message
|
||||
for mp in media_posts:
|
||||
if len(mp.message) > len(title): title = mp.message # save the longest text found (usually only 1)
|
||||
if len(mp.message) > len(title): title = mp.message # save the longest text found (usually only 1)
|
||||
|
||||
# media can also be in entities
|
||||
if mp.entities:
|
||||
@@ -134,8 +134,7 @@ class TelethonArchiver(Archiverv2):
|
||||
if len(other_media_urls):
|
||||
logger.debug(f"Got {len(other_media_urls)} other media urls from {mp.id=}: {other_media_urls}")
|
||||
for i, om_url in enumerate(other_media_urls):
|
||||
filename = os.path.join(tmp_dir, f'{chat}_{group_id}_{i}')
|
||||
self.download_from_url(om_url, filename)
|
||||
filename = self.download_from_url(om_url, f'{chat}_{group_id}_{i}', item)
|
||||
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
|
||||
|
||||
filename_dest = os.path.join(tmp_dir, f'{chat}_{group_id}', str(mp.id))
|
||||
|
||||
@@ -74,8 +74,7 @@ class TwitterApiArchiver(TwitterArchiver, Archiverv2):
|
||||
continue
|
||||
logger.info(f"Found media {media}")
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
media.filename = os.path.join(item.get_tmp_dir(), f'{slugify(url)}_{i}{ext}')
|
||||
self.download_from_url(media.get("src"), media.filename)
|
||||
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
|
||||
result.add_media(media)
|
||||
|
||||
result.set_content(json.dumps({
|
||||
|
||||
@@ -69,8 +69,7 @@ class TwitterArchiver(Archiverv2):
|
||||
logger.warning(f"Could not get media URL of {tweet_media}")
|
||||
continue
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
media.filename = os.path.join(item.get_tmp_dir(), f'{slugify(url)}_{i}{ext}')
|
||||
self.download_from_url(media.get("src"), media.filename)
|
||||
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
|
||||
result.add_media(media)
|
||||
|
||||
return result.success("twitter")
|
||||
@@ -103,8 +102,7 @@ class TwitterArchiver(Archiverv2):
|
||||
for u in urls:
|
||||
media = Media()
|
||||
media.set("src", u)
|
||||
media.filename = os.path.join(item.get_tmp_dir(), f'{slugify(url)}_{i}')
|
||||
self.download_from_url(u, media.filename)
|
||||
media.filename = self.download_from_url(u, f'{slugify(url)}_{i}', item)
|
||||
result.add_media(media)
|
||||
|
||||
# .set_title(tweet["TODO"])
|
||||
|
||||
@@ -4,9 +4,9 @@ from ast import List, Set
|
||||
from typing import Any, Union, Dict
|
||||
from dataclasses import dataclass, field
|
||||
import datetime, mimetypes
|
||||
from urllib.parse import urlparse
|
||||
from loguru import logger
|
||||
# import json
|
||||
|
||||
from dateutil.parser import parse as parse_dt
|
||||
from media import Media
|
||||
|
||||
|
||||
@@ -66,6 +66,10 @@ class Metadata:
|
||||
def is_success(self) -> bool:
|
||||
return "success" in self.status
|
||||
|
||||
@property  # read-only accessor: .netloc
def netloc(self) -> str:
    """The `netloc` component of this item's URL, as parsed by urlparse."""
    parsed_url = urlparse(self.get_url())
    return parsed_url.netloc
|
||||
|
||||
|
||||
# custom getter/setters
|
||||
|
||||
@@ -96,6 +100,8 @@ class Metadata:
|
||||
return self.get("tmp_dir")
|
||||
|
||||
def set_timestamp(self, timestamp: datetime.datetime) -> Metadata:
    """
    Stores `timestamp` under the "timestamp" key and returns the result of `self.set`.

    timestamp: a datetime instance, or a string, which is first parsed with `parse_dt`.
    Raises AssertionError when the value is neither.
    """
    if isinstance(timestamp, str):
        timestamp = parse_dt(timestamp)
    # isinstance (rather than type ==) so that datetime subclasses are accepted as well
    assert isinstance(timestamp, datetime.datetime), "set_timestamp expects a datetime instance"
    return self.set("timestamp", timestamp)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user