From a7eb54d226b5397a7a3c1cd502f22a8a31f719fd Mon Sep 17 00:00:00 2001 From: Tristan Lee Date: Wed, 30 Mar 2022 21:07:17 -0500 Subject: [PATCH] implemented Media dataclasses for Telegram, and added variable for extracting a post's view count --- snscrape/modules/telegram.py | 94 ++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 0f093ee..0484c9c 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -30,9 +30,9 @@ class TelegramPost(snscrape.base.Item): date: datetime.datetime content: str outlinks: list - images: list - videos: list + media: typing.Optional[typing.List['Medium']] forwarded: str + views: int = None linkPreview: typing.Optional[LinkPreview] = None outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') @@ -62,6 +62,29 @@ class Channel(snscrape.base.Entity): def __str__(self): return f'https://t.me/s/{self.username}' +class Medium: + pass + + +@dataclasses.dataclass +class Photo(Medium): + previewUrl: str + fullUrl: str + +@dataclasses.dataclass +class Image(Medium): + url: str + +@dataclasses.dataclass +class Video(Medium): + thumbnailUrl: str + duration: float + url: typing.Optional[str] = None + +@dataclasses.dataclass +class Gif(Medium): + thumbnailUrl: str + url: typing.Optional[str] = None class TelegramChannelScraper(snscrape.base.Scraper): name = 'telegram-channel' @@ -93,18 +116,34 @@ class TelegramChannelScraper(snscrape.base.Scraper): _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') url = rawUrl.replace('//t.me/', '//t.me/s/') date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') - images = [] - videos = [] + media = [] forwarded = None if (message := post.find('div', class_ = 'tgme_widget_message_text')): content = message.get_text(separator="\n") - for video_tag in post.find_all('video'): - videos.append(video_tag['src']) + for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): + style = video_player.find('i')['style'] + videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style) + videoTag = video_player.find('video') + if videoTag is None: + videoUrl = None + else: + videoUrl = videoTag['src'] + mKwargs = { + 'thumbnailUrl': videoThumbnailUrl, + 'url': videoUrl, + } + timeTag = video_player.find('time') + if timeTag is None: + cls = Gif + else: + cls = Video + durationStr = video_player.find('time').text.split(':') + mKwargs['duration'] = sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))]) + media.append(cls(**mKwargs)) if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0] - outlinks = [] for link in post.find_all('a'): if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): @@ -114,15 +153,15 @@ class TelegramChannelScraper(snscrape.base.Scraper): style = link.attrs.get('style', '') # Generic filter of links to the post itself, catches videos, photos, and the date link if style != '': - image = re.findall('url\(\'(.*?)\'\)', style) - if len(image) == 1: - images.append(image[0]) + imageUrls = re.findall('url\(\'(.*?)\'\)', style) + if len(imageUrls) == 1: + media.append(Image(url = imageUrls[0])) continue if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): style = link.attrs.get('style', '') - image = re.findall('url\(\'(.*?)\'\)', style) - if len(image) == 1: - images.append(image[0]) + imageUrls = re.findall('url\(\'(.*?)\'\)', style) + if len(imageUrls) == 1: + media.append(Image(url = imageUrls[0])) # resp = self._get(image[0]) # encoded_string = base64.b64encode(resp.content) # Individual photo or video link @@ -133,8 +172,7 @@ class TelegramChannelScraper(snscrape.base.Scraper): else: content = None outlinks = [] - images = [] - videos = [] + media = [] linkPreview = None if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): kwargs = {} @@ -151,7 +189,13 @@ class TelegramChannelScraper(snscrape.base.Scraper): else: _logger.warning(f'Could not process link preview image on {url}') linkPreview = LinkPreview(**kwargs) - yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, images = images, videos = videos, forwarded = forwarded) + viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') + if viewsSpan is None: + views = None + else: + views = parse_num(viewsSpan.text) + + yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, media = media, forwarded = forwarded, views = views) def get_items(self): r, soup = self._initial_page() @@ -204,15 +248,6 @@ class TelegramChannelScraper(snscrape.base.Scraper): if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): kwargs['description'] = descriptionDiv.text - def parse_num(s): - s = s.replace(' ', '') - if s.endswith('M'): - return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) - elif s.endswith('K'): - return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) - else: - return int(s), 1 - for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) type_ = div.find('span', class_ = 'counter_type').text @@ -231,3 +266,12 @@ class TelegramChannelScraper(snscrape.base.Scraper): @classmethod def _cli_from_args(cls, args): return cls._cli_construct(args, args.channel) + +def parse_num(s): + s = s.replace(' ', '') + if s.endswith('M'): + return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) + elif s.endswith('K'): + return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) + else: + return int(s), 1 \ No newline at end of file