mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-08 03:18:34 +03:00
Implement media archiving after the initial scrape for Twitter and Telethon
This commit is contained in:
1
Pipfile
1
Pipfile
@@ -21,6 +21,7 @@ pytesseract = "*"
|
||||
pyexiftool = {git = "https://github.com/smarnach/pyexiftool.git"}
|
||||
instaloader = "*"
|
||||
gspread = "*"
|
||||
cryptg = "*"
|
||||
|
||||
[dev-packages]
|
||||
pytest = "*"
|
||||
|
||||
@@ -234,6 +234,16 @@ class Scraper:
|
||||
|
||||
return archived_url
|
||||
|
||||
def archive_files(self, result: ScraperResult) -> ScraperResult:
|
||||
for url in result.archived_urls:
|
||||
if result.archived_urls[url] is None:
|
||||
media_blob, content_type, key = self.url_to_blob(url)
|
||||
archived_url = self.archive_blob(media_blob, content_type, key)
|
||||
result.archived_urls[url] = archived_url
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def can_handle(self, channel: Channel) -> bool:
|
||||
"""Whether or not the scraper can scrape the specified channel.
|
||||
|
||||
@@ -353,6 +363,36 @@ class ScraperController:
|
||||
if not handled:
|
||||
logger.warning(f"No handler found for Channel {channel}")
|
||||
|
||||
@logger.catch(reraise = True)
|
||||
def archive_unarchived_media(self):
|
||||
if self.session is None:
|
||||
logger.error("No DB session")
|
||||
return
|
||||
|
||||
session = self.session()
|
||||
|
||||
posts = session.query(ScraperResult).filter(ScraperResult.archived_urls.like("%null%")).all()
|
||||
|
||||
logger.info(f"Found {len(posts)} posts without media. Archiving now")
|
||||
|
||||
for post in posts:
|
||||
handled = False
|
||||
|
||||
for scraper in self.scrapers:
|
||||
if scraper.__version__ == post.scraper:
|
||||
handled = True
|
||||
logger.info(f"{scraper} is archiving media for {post}")
|
||||
post = scraper.archive_files(post)
|
||||
|
||||
session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls})
|
||||
session.commit()
|
||||
break
|
||||
|
||||
if not handled:
|
||||
logger.warning(f"No handler found for post scraped with {post.scraper}")
|
||||
|
||||
session.commit()
|
||||
|
||||
def connect_to_db(self, engine):
|
||||
"""Connect the specified SQLAlchemy engine to the controller.
|
||||
"""
|
||||
|
||||
@@ -4,9 +4,11 @@ import os
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
from loguru import logger
|
||||
from telethon.sync import TelegramClient
|
||||
from telethon.tl import types
|
||||
|
||||
from cisticola.base import Channel, ScraperResult
|
||||
from cisticola.scraper.base import Scraper
|
||||
@@ -23,12 +25,79 @@ class TelegramTelethonScraper(Scraper):
|
||||
username = username.split('s/')[1]
|
||||
return username
|
||||
|
||||
def archive_files(self, result: ScraperResult, client : TelegramClient = None) -> ScraperResult:
|
||||
if len(result.archived_urls.keys()) == 0:
|
||||
return result
|
||||
|
||||
if client is None:
|
||||
api_id = os.environ['TELEGRAM_API_ID']
|
||||
api_hash = os.environ['TELEGRAM_API_HASH']
|
||||
phone = os.environ['TELEGRAM_PHONE']
|
||||
|
||||
with TelegramClient(phone, api_id, api_hash) as client:
|
||||
return self.archive_files(result, client)
|
||||
|
||||
if len(list(result.archived_urls.keys())) != 1:
|
||||
logger.warning(f"Expected 1 key in archived_urls, found {result.archived_keys}")
|
||||
else:
|
||||
key = list(result.archived_urls.keys())[0]
|
||||
|
||||
if result.archived_urls[key] is None:
|
||||
raw = json.loads(result.raw_data)
|
||||
|
||||
message = client.get_messages(raw['peer_id']['channel_id'], ids=[raw['id']])
|
||||
|
||||
blob = None
|
||||
if len(message) > 0:
|
||||
blob, output_file_with_ext = self.archive_post_media(message[0], client)
|
||||
else:
|
||||
logger.warning("No message retrieved")
|
||||
|
||||
if blob is not None:
|
||||
# TODO specify Content-Type
|
||||
archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext)
|
||||
result.archived_urls[key] = archived_url
|
||||
return result
|
||||
else:
|
||||
logger.warning("Downloaded blob was None")
|
||||
|
||||
return result
|
||||
|
||||
def archive_post_media(self, post : types.Message, client : TelegramClient = None):
|
||||
logger.debug(f"Archiving post {post}")
|
||||
|
||||
if post.media is None:
|
||||
return None, None
|
||||
|
||||
logger.debug(f"Archiving media {post.media}")
|
||||
|
||||
if client is None:
|
||||
api_id = os.environ['TELEGRAM_API_ID']
|
||||
api_hash = os.environ['TELEGRAM_API_HASH']
|
||||
phone = os.environ['TELEGRAM_PHONE']
|
||||
|
||||
with TelegramClient(phone, api_id, api_hash) as client:
|
||||
return self.archive_post_media(post, client=client)
|
||||
|
||||
key = f'{post.peer_id.channel_id}_{post.id}'
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
output_file = Path(temp_dir, key)
|
||||
|
||||
client.download_media(post.media, output_file)
|
||||
|
||||
output_file_with_ext = os.listdir(temp_dir)[0]
|
||||
filename = Path(temp_dir, output_file_with_ext)
|
||||
|
||||
with open(filename, 'rb') as f:
|
||||
blob = f.read()
|
||||
return (blob, output_file_with_ext)
|
||||
|
||||
def can_handle(self, channel):
|
||||
if channel.platform == "Telegram" and channel.public and not channel.chat:
|
||||
return True
|
||||
|
||||
def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
|
||||
|
||||
username = self.get_username_from_url(channel.url)
|
||||
|
||||
api_id = os.environ['TELEGRAM_API_ID']
|
||||
@@ -36,34 +105,27 @@ class TelegramTelethonScraper(Scraper):
|
||||
phone = os.environ['TELEGRAM_PHONE']
|
||||
|
||||
with TelegramClient(phone, api_id, api_hash) as client:
|
||||
|
||||
for post in client.iter_messages(username):
|
||||
post_url = f'{channel.url}/{post.id}'
|
||||
|
||||
logger.info(f"Archiving post {post_url} from {post.date}")
|
||||
|
||||
if since is not None and post.date.replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc):
|
||||
logger.info(f'Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}')
|
||||
break
|
||||
|
||||
post_url = f'{channel.url}/{post.id}'
|
||||
key = f'{username}_{post.id}'
|
||||
|
||||
archived_urls = {}
|
||||
logger.info(f"Archiving post {post_url}")
|
||||
|
||||
if archive_media:
|
||||
if post.media is not None:
|
||||
archived_urls[post_url] = None
|
||||
|
||||
if post.media is not None:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
output_file = Path(temp_dir, key)
|
||||
client.download_media(post.media, output_file)
|
||||
|
||||
output_file_with_ext = os.listdir(temp_dir)[0]
|
||||
filename = Path(temp_dir, output_file_with_ext)
|
||||
|
||||
with open(filename, 'rb') as f:
|
||||
blob = f.read()
|
||||
|
||||
# TODO specify Content-Type
|
||||
archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext)
|
||||
archived_urls[post_url] = archived_url
|
||||
if archive_media:
|
||||
blob, output_file_with_ext = self.archive_post_media(post, client)
|
||||
if blob is not None:
|
||||
# TODO specify Content-Type
|
||||
archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext)
|
||||
archived_urls[post_url] = archived_url
|
||||
|
||||
yield ScraperResult(
|
||||
scraper=self.__version__,
|
||||
|
||||
@@ -28,31 +28,33 @@ class TwitterScraper(Scraper):
|
||||
|
||||
archived_urls = {}
|
||||
|
||||
if archive_media:
|
||||
media_list = []
|
||||
if tweet.media:
|
||||
media_list += tweet.media
|
||||
media_list = []
|
||||
if tweet.media:
|
||||
media_list += tweet.media
|
||||
|
||||
if tweet.retweetedTweet and tweet.retweetedTweet.media:
|
||||
media_list += tweet.retweetedTweet.media
|
||||
if tweet.retweetedTweet and tweet.retweetedTweet.media:
|
||||
media_list += tweet.retweetedTweet.media
|
||||
|
||||
if tweet.quotedTweet and tweet.quotedTweet.media:
|
||||
media_list += tweet.quotedTweet.media
|
||||
if tweet.quotedTweet and tweet.quotedTweet.media:
|
||||
media_list += tweet.quotedTweet.media
|
||||
|
||||
for media in media_list:
|
||||
if type(media) == Video:
|
||||
variant = max(
|
||||
[v for v in media.variants if v.bitrate], key=lambda v: v.bitrate)
|
||||
url = variant.url
|
||||
elif type(media) == Gif:
|
||||
url = media.variants[0].url
|
||||
elif type(media) == Photo:
|
||||
url = media.fullUrl
|
||||
else:
|
||||
logger.warning(f"Could not get media URL of {media}")
|
||||
url = None
|
||||
for media in media_list:
|
||||
if type(media) == Video:
|
||||
variant = max(
|
||||
[v for v in media.variants if v.bitrate], key=lambda v: v.bitrate)
|
||||
url = variant.url
|
||||
elif type(media) == Gif:
|
||||
url = media.variants[0].url
|
||||
elif type(media) == Photo:
|
||||
url = media.fullUrl
|
||||
else:
|
||||
logger.warning(f"Could not get media URL of {media}")
|
||||
url = None
|
||||
|
||||
if url is not None and url not in archived_urls:
|
||||
if url is not None and url not in archived_urls:
|
||||
archived_urls[url] = None
|
||||
|
||||
if archive_media:
|
||||
media_blob, content_type, key = self.url_to_blob(url)
|
||||
archived_url = self.archive_blob(media_blob, content_type, key)
|
||||
archived_urls[url] = archived_url
|
||||
|
||||
21
test.py
21
test.py
@@ -28,7 +28,6 @@ scrapers = [
|
||||
GettrScraper(),
|
||||
OdyseeScraper(),
|
||||
RumbleScraper(),
|
||||
TelegramSnscrapeScraper(),
|
||||
TelegramTelethonScraper(),
|
||||
TwitterScraper()]
|
||||
|
||||
@@ -43,15 +42,15 @@ session = session_generator()
|
||||
gc = gspread.service_account(filename='service_account.json')
|
||||
|
||||
# Open a sheet from a spreadsheet in one go
|
||||
wks = gc.open_by_url("https://docs.google.com/spreadsheets/d/1yxd6-2Mp0jZ8r9XJklb39WE-iIMrKRyA2kymJcIfGis/edit#gid=0")
|
||||
wks = gc.open_by_url("https://docs.google.com/spreadsheets/d/1k5VgqREoA3v1r7bkVq7TOTRDtdYqTMWkQnsZpRbntpw/edit#gid=0")
|
||||
channels = wks.worksheet("channels").get_all_records()
|
||||
|
||||
for c in channels:
|
||||
del c['followers']
|
||||
|
||||
for k in c.keys():
|
||||
if c[k] == 'TRUE': c[k] = True
|
||||
if c[k] == 'FALSE': c[k] = False
|
||||
if c[k] == 'TRUE' or c[k] == 'yes': c[k] = True
|
||||
if c[k] == 'FALSE' or c[k] == 'no': c[k] = False
|
||||
|
||||
# check to see if this already exists,
|
||||
channel = session.query(Channel).filter_by(platform_id=c['platform_id'], platform=c['platform']).first()
|
||||
@@ -63,11 +62,13 @@ for c in channels:
|
||||
session.commit()
|
||||
|
||||
controller.connect_to_db(engine)
|
||||
controller.scrape_all_channels(archive_media = True)
|
||||
controller.scrape_all_channels(archive_media = False)
|
||||
|
||||
transformer = TwitterTransformer()
|
||||
controller.archive_unarchived_media()
|
||||
|
||||
etl_controller = ETLController()
|
||||
etl_controller.register_transformer(transformer)
|
||||
etl_controller.connect_to_db(engine)
|
||||
etl_controller.transform_all_untransformed()
|
||||
# transformer = TwitterTransformer()
|
||||
|
||||
# etl_controller = ETLController()
|
||||
# etl_controller.register_transformer(transformer)
|
||||
# etl_controller.connect_to_db(engine)
|
||||
# etl_controller.transform_all_untransformed()
|
||||
|
||||
Reference in New Issue
Block a user