fixed issue where some videos and photos weren't being scraped (because they weren't in a post containing a 'tgme_widget_message_text' div

This commit is contained in:
Tristan Lee
2022-04-17 06:50:43 -05:00
parent 1e4e0c278d
commit b276c3cc27

View File

@@ -65,14 +65,8 @@ class TelegramPost(snscrape.base.Item):
class Medium:
pass
@dataclasses.dataclass
class Photo(Medium):
previewUrl: str
fullUrl: str
@dataclasses.dataclass
class Image(Medium):
url: str
@dataclasses.dataclass
@@ -81,6 +75,12 @@ class Video(Medium):
duration: float
url: typing.Optional[str] = None
@dataclasses.dataclass
class VoiceMessage(Medium):
url: str
duration: str
bars:typing.List[float]
@dataclasses.dataclass
class Gif(Medium):
thumbnailUrl: str
@@ -117,70 +117,81 @@ class TelegramChannelScraper(snscrape.base.Scraper):
url = rawUrl.replace('//t.me/', '//t.me/s/')
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
media = []
outlinks = []
forwarded = None
forwardedUrl = None
if (message := post.find('div', class_ = 'tgme_widget_message_text')):
content = message.get_text(separator="\n")
for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = video_player.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0]
videoTag = video_player.find('video')
if videoTag is None:
videoUrl = None
else:
videoUrl = videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = video_player.find('time')
if timeTag is None:
cls = Gif
else:
cls = Video
durationStr = video_player.find('time').text.split(':')
mKwargs['duration'] = sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))])
media.append(cls(**mKwargs))
if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
forwardedUrl = forward_tag['href']
forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
forwarded = Channel(username = forwardedName)
outlinks = []
for link in post.find_all('a'):
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
# Author links at the top (avatar and name)
continue
if link['href'] == rawUrl or link['href'] == url:
style = link.attrs.get('style', '')
# Generic filter of links to the post itself, catches videos, photos, and the date link
if style != '':
imageUrls = re.findall('url\(\'(.*?)\'\)', style)
if len(imageUrls) == 1:
media.append(Image(url = imageUrls[0]))
continue
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
imageUrls = re.findall('url\(\'(.*?)\'\)', style)
if len(imageUrls) == 1:
media.append(Image(url = imageUrls[0]))
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
href = urllib.parse.urljoin(pageUrl, link['href'])
if href not in outlinks:
outlinks.append(href)
else:
content = None
outlinks = []
media = []
outlinks = []
for link in post.find_all('a'):
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
# Author links at the top (avatar and name)
continue
if link['href'] == rawUrl or link['href'] == url:
style = link.attrs.get('style', '')
# Generic filter of links to the post itself, catches videos, photos, and the date link
if style != '':
imageUrls = re.findall('url\(\'(.*?)\'\)', style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
continue
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '')
imageUrls = re.findall('url\(\'(.*?)\'\)', style)
if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0]))
# resp = self._get(image[0])
# encoded_string = base64.b64encode(resp.content)
# Individual photo or video link
continue
href = urllib.parse.urljoin(pageUrl, link['href'])
if (href not in outlinks) and (href != rawUrl):
outlinks.append(href)
for voice_player in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
audioUrl = voice_player.find('audio')['src']
durationStr = voice_player.find('time').text.split(':')
duration = durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voice_player.find('div', {'class': 'bar'}).find_all('s')]
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = video_player.find('i')
if iTag is None:
videoUrl = None
videoThumbnailUrl = None
else:
style = iTag['style']
videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0]
videoTag = video_player.find('video')
if videoTag is None:
videoUrl = None
else:
videoUrl = videoTag['src']
mKwargs = {
'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl,
}
timeTag = video_player.find('time')
if timeTag is None:
cls = Gif
else:
cls = Video
durationStr = video_player.find('time').text.split(':')
mKwargs['duration'] = durationStrToSeconds(durationStr)
media.append(cls(**mKwargs))
linkPreview = None
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
kwargs = {}
@@ -197,6 +208,9 @@ class TelegramChannelScraper(snscrape.base.Scraper):
else:
_logger.warning(f'Could not process link preview image on {url}')
linkPreview = LinkPreview(**kwargs)
if kwargs['href'] in outlinks:
outlinks.remove(kwargs['href'])
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
if viewsSpan is None:
views = None
@@ -220,7 +234,6 @@ class TelegramChannelScraper(snscrape.base.Scraper):
else:
break
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
print(f'nextPageUrl: {nextPageUrl}')
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
@@ -289,6 +302,9 @@ def parse_num(s):
else:
return int(s), 1
def durationStrToSeconds(durationStr):
return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))])
def telegramResponseOkCallback(r):
if r.status_code == 200:
return (True, None)