From 5648e957d0d1f759f68a2dfe21a137bf7c0ca608 Mon Sep 17 00:00:00 2001 From: Tristan Lee Date: Wed, 27 Apr 2022 16:41:24 -0500 Subject: [PATCH] improved consistency of code formatting and added _STYLE_MEDIA_URL_PATTERN as variable --- snscrape/modules/telegram.py | 64 +++++++++++++++++------------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 9cd7573..ab44561 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -13,7 +13,7 @@ import base64 _logger = logging.getLogger(__name__) _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') - +_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)') @dataclasses.dataclass class LinkPreview: @@ -45,6 +45,7 @@ class Channel(snscrape.base.Entity): def __str__(self): return f'https://t.me/s/{self.username}' + @dataclasses.dataclass class TelegramPost(snscrape.base.Item): url: str @@ -64,30 +65,36 @@ class TelegramPost(snscrape.base.Item): def __str__(self): return self.url + class Medium: pass + @dataclasses.dataclass class Photo(Medium): url: str + @dataclasses.dataclass class Video(Medium): thumbnailUrl: str duration: float url: typing.Optional[str] = None + @dataclasses.dataclass class VoiceMessage(Medium): url: str duration: str bars:typing.List[float] + @dataclasses.dataclass class Gif(Medium): thumbnailUrl: str url: typing.Optional[str] = None + class TelegramChannelScraper(snscrape.base.Scraper): name = 'telegram-channel' @@ -120,11 +127,13 @@ class TelegramChannelScraper(snscrape.base.Scraper): date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') media = [] outlinks = [] + mentions = [] + hashtags = [] forwarded = None forwardedUrl = None - if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): - forwardedUrl = forward_tag['href'] + if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): + forwardedUrl = forwardTag['href'] forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0] forwarded = Channel(username = forwardedName) @@ -133,9 +142,6 @@ class TelegramChannelScraper(snscrape.base.Scraper): else: content = None - outlinks = [] - mentions = [] - hashtags = [] for link in post.find_all('a'): if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): # Author links at the top (avatar and name) @@ -144,13 +150,13 @@ class TelegramChannelScraper(snscrape.base.Scraper): style = link.attrs.get('style', '') # Generic filter of links to the post itself, catches videos, photos, and the date link if style != '': - imageUrls = re.findall('url\(\'(.*?)\'\)', style) + imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) if len(imageUrls) == 1: media.append(Photo(url = imageUrls[0])) continue if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): style = link.attrs.get('style', '') - imageUrls = re.findall('url\(\'(.*?)\'\)', style) + imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) if len(imageUrls) == 1: media.append(Photo(url = imageUrls[0])) # resp = self._get(image[0]) @@ -167,37 +173,34 @@ class TelegramChannelScraper(snscrape.base.Scraper): if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl): outlinks.append(href) - for voice_player in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): - audioUrl = voice_player.find('audio')['src'] - durationStr = voice_player.find('time').text.split(':') + for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): + audioUrl = voicePlayer.find('audio')['src'] + durationStr = voicePlayer.find('time').text duration = durationStrToSeconds(durationStr) - barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voice_player.find('div', {'class': 'bar'}).find_all('s')] + barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')] media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) - for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): - iTag = video_player.find('i') + for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): + iTag = videoPlayer.find('i') if iTag is None: videoUrl = None videoThumbnailUrl = None else: style = iTag['style'] - videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0] - videoTag = video_player.find('video') - if videoTag is None: - videoUrl = None - else: - videoUrl = videoTag['src'] + videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0] + videoTag = videoPlayer.find('video') + videoUrl = None if videoTag is None else videoTag['src'] mKwargs = { 'thumbnailUrl': videoThumbnailUrl, 'url': videoUrl, } - timeTag = video_player.find('time') + timeTag = videoPlayer.find('time') if timeTag is None: cls = Gif else: cls = Video - durationStr = video_player.find('time').text.split(':') + durationStr = videoPlayer.find('time').text mKwargs['duration'] = durationStrToSeconds(durationStr) media.append(cls(**mKwargs)) @@ -221,10 +224,7 @@ class TelegramChannelScraper(snscrape.base.Scraper): outlinks.remove(kwargs['href']) viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') - if viewsSpan is None: - views = None - else: - views = parse_num(viewsSpan.text) + views = None if viewsSpan is None else parse_num(viewsSpan.text) yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views) @@ -318,16 +318,14 @@ def parse_num(s): return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) elif s.endswith('K'): return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) - else: - return int(s), 1 + return int(s), 1 def durationStrToSeconds(durationStr): - return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))]) + durationList = durationStr.split(':') + return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))]) def telegramResponseOkCallback(r): if r.status_code == 200: return (True, None) - elif r.status_code // 100 == 5: - return (False, f'status code: {r.status_code}') - else: - return (False, None) \ No newline at end of file + return (False, f'{r.status_code=}') + \ No newline at end of file