From b276c3cc27f35e57f287fb0b815e19b94b31487f Mon Sep 17 00:00:00 2001 From: Tristan Lee Date: Sun, 17 Apr 2022 06:50:43 -0500 Subject: [PATCH] fixed issue where some videos and photos weren't being scraped (because they weren't in a post containing a 'tgme_widget_message_text' div --- snscrape/modules/telegram.py | 134 ++++++++++++++++++++--------------- 1 file changed, 75 insertions(+), 59 deletions(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index ad49066..c3d055d 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -65,14 +65,8 @@ class TelegramPost(snscrape.base.Item): class Medium: pass - @dataclasses.dataclass class Photo(Medium): - previewUrl: str - fullUrl: str - -@dataclasses.dataclass -class Image(Medium): url: str @dataclasses.dataclass @@ -81,6 +75,12 @@ class Video(Medium): duration: float url: typing.Optional[str] = None +@dataclasses.dataclass +class VoiceMessage(Medium): + url: str + duration: str + bars:typing.List[float] + @dataclasses.dataclass class Gif(Medium): thumbnailUrl: str @@ -117,70 +117,81 @@ class TelegramChannelScraper(snscrape.base.Scraper): url = rawUrl.replace('//t.me/', '//t.me/s/') date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') media = [] + outlinks = [] forwarded = None forwardedUrl = None + if (message := post.find('div', class_ = 'tgme_widget_message_text')): content = message.get_text(separator="\n") - for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): - iTag = video_player.find('i') - if iTag is None: - videoUrl = None - videoThumbnailUrl = None - else: - style = iTag['style'] - videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0] - videoTag = video_player.find('video') - if videoTag is None: - videoUrl = None - else: - videoUrl = videoTag['src'] - mKwargs = { - 'thumbnailUrl': videoThumbnailUrl, - 'url': videoUrl, - } - timeTag = video_player.find('time') - if timeTag is None: - cls = Gif - else: - cls = Video - durationStr = video_player.find('time').text.split(':') - mKwargs['duration'] = sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))]) - media.append(cls(**mKwargs)) if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): forwardedUrl = forward_tag['href'] forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0] forwarded = Channel(username = forwardedName) - outlinks = [] - for link in post.find_all('a'): - if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): - # Author links at the top (avatar and name) - continue - if link['href'] == rawUrl or link['href'] == url: - style = link.attrs.get('style', '') - # Generic filter of links to the post itself, catches videos, photos, and the date link - if style != '': - imageUrls = re.findall('url\(\'(.*?)\'\)', style) - if len(imageUrls) == 1: - media.append(Image(url = imageUrls[0])) - continue - if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): - style = link.attrs.get('style', '') - imageUrls = re.findall('url\(\'(.*?)\'\)', style) - if len(imageUrls) == 1: - media.append(Image(url = imageUrls[0])) - # resp = self._get(image[0]) - # encoded_string = base64.b64encode(resp.content) - # Individual photo or video link - continue - href = urllib.parse.urljoin(pageUrl, link['href']) - if href not in outlinks: - outlinks.append(href) else: content = None - outlinks = [] - media = [] + + outlinks = [] + for link in post.find_all('a'): + if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): + # Author links at the top (avatar and name) + continue + if link['href'] == rawUrl or link['href'] == url: + style = link.attrs.get('style', '') + # Generic filter of links to the post itself, catches videos, photos, and the date link + if style != '': + imageUrls = re.findall('url\(\'(.*?)\'\)', style) + if len(imageUrls) == 1: + media.append(Photo(url = imageUrls[0])) + continue + if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): + style = link.attrs.get('style', '') + imageUrls = re.findall('url\(\'(.*?)\'\)', style) + if len(imageUrls) == 1: + media.append(Photo(url = imageUrls[0])) + # resp = self._get(image[0]) + # encoded_string = base64.b64encode(resp.content) + # Individual photo or video link + continue + href = urllib.parse.urljoin(pageUrl, link['href']) + if (href not in outlinks) and (href != rawUrl): + outlinks.append(href) + + for voice_player in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): + audioUrl = voice_player.find('audio')['src'] + durationStr = voice_player.find('time').text.split(':') + duration = durationStrToSeconds(durationStr) + barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voice_player.find('div', {'class': 'bar'}).find_all('s')] + + media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) + + for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): + iTag = video_player.find('i') + if iTag is None: + videoUrl = None + videoThumbnailUrl = None + else: + style = iTag['style'] + videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0] + videoTag = video_player.find('video') + if videoTag is None: + videoUrl = None + else: + videoUrl = videoTag['src'] + mKwargs = { + 'thumbnailUrl': videoThumbnailUrl, + 'url': videoUrl, + } + timeTag = video_player.find('time') + if timeTag is None: + cls = Gif + else: + cls = Video + durationStr = video_player.find('time').text.split(':') + mKwargs['duration'] = durationStrToSeconds(durationStr) + media.append(cls(**mKwargs)) + linkPreview = None if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): kwargs = {} @@ -197,6 +208,9 @@ class TelegramChannelScraper(snscrape.base.Scraper): else: _logger.warning(f'Could not process link preview image on {url}') linkPreview = LinkPreview(**kwargs) + if kwargs['href'] in outlinks: + outlinks.remove(kwargs['href']) + viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') if viewsSpan is None: views = None @@ -220,7 +234,6 @@ class TelegramChannelScraper(snscrape.base.Scraper): else: break nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) - print(f'nextPageUrl: {nextPageUrl}') r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback) if r.status_code != 200: raise snscrape.base.ScraperException(f'Got status code {r.status_code}') @@ -289,6 +302,9 @@ def parse_num(s): else: return int(s), 1 +def durationStrToSeconds(durationStr): + return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))]) + def telegramResponseOkCallback(r): if r.status_code == 200: return (True, None)