Merge pull request #2 from bellingcat/telegram-media

Implemented JustAnotherArchivist's requested changes to Telegram scraper from PR
This commit is contained in:
Tristan Lee
2022-05-09 07:23:39 -07:00
committed by GitHub

View File

@@ -13,7 +13,7 @@ import base64
_logger = logging.getLogger(__name__)
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)')
@dataclasses.dataclass
class LinkPreview:
@@ -24,29 +24,12 @@ class LinkPreview:
image: typing.Optional[str] = None
@dataclasses.dataclass
class TelegramPost(snscrape.base.Item):
url: str
date: datetime.datetime
content: str
outlinks: list
images: list
videos: list
forwarded: str
linkPreview: typing.Optional[LinkPreview] = None
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
def __str__(self):
return self.url
@dataclasses.dataclass
class Channel(snscrape.base.Entity):
username: str
title: str
verified: bool
photo: str
title: typing.Optional[str] = None
verified: typing.Optional[bool] = None
photo: typing.Optional[str] = None
description: typing.Optional[str] = None
members: typing.Optional[int] = None
photos: typing.Optional[snscrape.base.IntWithGranularity] = None
@@ -63,6 +46,55 @@ class Channel(snscrape.base.Entity):
return f'https://t.me/s/{self.username}'
@dataclasses.dataclass
class TelegramPost(snscrape.base.Item):
url: str
date: datetime.datetime
content: str
outlinks: typing.List[str] = None
mentions: typing.List[str] = None
hashtags: typing.List[str] = None
forwarded: typing.Optional['Channel'] = None
forwardedUrl: typing.Optional[str] = None
media: typing.Optional[typing.List['Medium']] = None
views: typing.Optional[int] = None
linkPreview: typing.Optional[LinkPreview] = None
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
def __str__(self):
return self.url
class Medium:
pass
@dataclasses.dataclass
class Photo(Medium):
url: str
@dataclasses.dataclass
class Video(Medium):
thumbnailUrl: str
duration: float
url: typing.Optional[str] = None
@dataclasses.dataclass
class VoiceMessage(Medium):
url: str
duration: str
bars:typing.List[float]
@dataclasses.dataclass
class Gif(Medium):
thumbnailUrl: str
url: typing.Optional[str] = None
class TelegramChannelScraper(snscrape.base.Scraper):
name = 'telegram-channel'
@@ -93,48 +125,85 @@ class TelegramChannelScraper(snscrape.base.Scraper):
_logger.warning(f'Possibly incorrect URL: {rawUrl!r}')
url = rawUrl.replace('//t.me/', '//t.me/s/')
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
images = []
videos = []
media = []
outlinks = []
mentions = []
hashtags = []
forwarded = None
forwardedUrl = None
if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
forwardedUrl = forwardTag['href']
forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
forwarded = Channel(username = forwardedName)
if (message := post.find('div', class_ = 'tgme_widget_message_text')):
content = message.get_text(separator="\n")
for video_tag in post.find_all('video'):
videos.append(video_tag['src'])
if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0]
outlinks = []
for link in post.find_all('a'):
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
# Author links at the top (avatar and name)
continue
if link['href'] == rawUrl or link['href'] == url:
style = link.attrs.get('style', '')
# Generic filter of links to the post itself, catches videos, photos, and the date link
if style != '':
image = re.findall('url\(\'(.*?)\'\)', style)
if len(image) == 1:
images.append(image[0])
continue
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
image = re.findall('url\(\'(.*?)\'\)', style)
if len(image) == 1:
images.append(image[0])
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
href = urllib.parse.urljoin(pageUrl, link['href'])
if href not in outlinks:
outlinks.append(href)
else:
content = None
outlinks = []
images = []
videos = []
for link in post.find_all('a'):
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
# Author links at the top (avatar and name)
continue
if link['href'] == rawUrl or link['href'] == url:
style = link.attrs.get('style', '')
# Generic filter of links to the post itself, catches videos, photos, and the date link
if style != '':
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
continue
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
if link.text.startswith('@'):
mentions.append(link.text.strip('@'))
continue
if link.text.startswith('#'):
hashtags.append(link.text.strip('#'))
continue
href = urllib.parse.urljoin(pageUrl, link['href'])
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
outlinks.append(href)
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
audioUrl = voicePlayer.find('audio')['src']
durationStr = voicePlayer.find('time').text
duration = durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = videoPlayer.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
videoTag = videoPlayer.find('video')
videoUrl = None if videoTag is None else videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = videoPlayer.find('time')
if timeTag is None:
cls = Gif
else:
cls = Video
durationStr = videoPlayer.find('time').text
mKwargs['duration'] = durationStrToSeconds(durationStr)
media.append(cls(**mKwargs))
linkPreview = None
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
kwargs = {}
@@ -151,20 +220,40 @@ class TelegramChannelScraper(snscrape.base.Scraper):
else:
_logger.warning(f'Could not process link preview image on {url}')
linkPreview = LinkPreview(**kwargs)
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, images = images, videos = videos, forwarded = forwarded)
if kwargs['href'] in outlinks:
outlinks.remove(kwargs['href'])
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
views = None if viewsSpan is None else parse_num(viewsSpan.text)
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
def get_items(self):
r, soup = self._initial_page()
if '/s/' not in r.url:
_logger.warning('No public post list for this user')
return
nextPageUrl = ''
while True:
yield from self._soup_to_items(soup, r.url)
try:
if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1':
# if message 1 is the first message in the page, terminate scraping
break
except:
pass
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
if not pageLink:
break
# some pages are missing a "tme_messages_more" tag, causing early termination
if '=' not in nextPageUrl:
nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href']
nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20
if nextPostIndex > 20:
pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'}
else:
break
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
r = self._get(nextPageUrl, headers = self._headers)
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
soup = bs4.BeautifulSoup(r.text, 'lxml')
@@ -204,15 +293,6 @@ class TelegramChannelScraper(snscrape.base.Scraper):
if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')):
kwargs['description'] = descriptionDiv.text
def parse_num(s):
s = s.replace(' ', '')
if s.endswith('M'):
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
elif s.endswith('K'):
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
else:
return int(s), 1
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
type_ = div.find('span', class_ = 'counter_type').text
@@ -231,3 +311,21 @@ class TelegramChannelScraper(snscrape.base.Scraper):
@classmethod
def _cli_from_args(cls, args):
return cls._cli_construct(args, args.channel)
def parse_num(s):
s = s.replace(' ', '')
if s.endswith('M'):
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
elif s.endswith('K'):
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
return int(s), 1
def durationStrToSeconds(durationStr):
durationList = durationStr.split(':')
return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))])
def telegramResponseOkCallback(r):
if r.status_code == 200:
return (True, None)
return (False, f'{r.status_code=}')