improved consistency of code formatting and added _STYLE_MEDIA_URL_PATTERN as variable

This commit is contained in:
Tristan Lee
2022-04-27 16:41:24 -05:00
parent 21f7b620ec
commit 5648e957d0

View File

@@ -13,7 +13,7 @@ import base64
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)')
@dataclasses.dataclass @dataclasses.dataclass
class LinkPreview: class LinkPreview:
@@ -45,6 +45,7 @@ class Channel(snscrape.base.Entity):
def __str__(self): def __str__(self):
return f'https://t.me/s/{self.username}' return f'https://t.me/s/{self.username}'
@dataclasses.dataclass @dataclasses.dataclass
class TelegramPost(snscrape.base.Item): class TelegramPost(snscrape.base.Item):
url: str url: str
@@ -64,30 +65,36 @@ class TelegramPost(snscrape.base.Item):
def __str__(self): def __str__(self):
return self.url return self.url
class Medium: class Medium:
pass pass
@dataclasses.dataclass @dataclasses.dataclass
class Photo(Medium): class Photo(Medium):
url: str url: str
@dataclasses.dataclass @dataclasses.dataclass
class Video(Medium): class Video(Medium):
thumbnailUrl: str thumbnailUrl: str
duration: float duration: float
url: typing.Optional[str] = None url: typing.Optional[str] = None
@dataclasses.dataclass @dataclasses.dataclass
class VoiceMessage(Medium): class VoiceMessage(Medium):
url: str url: str
duration: str duration: str
bars:typing.List[float] bars:typing.List[float]
@dataclasses.dataclass @dataclasses.dataclass
class Gif(Medium): class Gif(Medium):
thumbnailUrl: str thumbnailUrl: str
url: typing.Optional[str] = None url: typing.Optional[str] = None
class TelegramChannelScraper(snscrape.base.Scraper): class TelegramChannelScraper(snscrape.base.Scraper):
name = 'telegram-channel' name = 'telegram-channel'
@@ -120,11 +127,13 @@ class TelegramChannelScraper(snscrape.base.Scraper):
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
media = [] media = []
outlinks = [] outlinks = []
mentions = []
hashtags = []
forwarded = None forwarded = None
forwardedUrl = None forwardedUrl = None
if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
forwardedUrl = forward_tag['href'] forwardedUrl = forwardTag['href']
forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0] forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
forwarded = Channel(username = forwardedName) forwarded = Channel(username = forwardedName)
@@ -133,9 +142,6 @@ class TelegramChannelScraper(snscrape.base.Scraper):
else: else:
content = None content = None
outlinks = []
mentions = []
hashtags = []
for link in post.find_all('a'): for link in post.find_all('a'):
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
# Author links at the top (avatar and name) # Author links at the top (avatar and name)
@@ -144,13 +150,13 @@ class TelegramChannelScraper(snscrape.base.Scraper):
style = link.attrs.get('style', '') style = link.attrs.get('style', '')
# Generic filter of links to the post itself, catches videos, photos, and the date link # Generic filter of links to the post itself, catches videos, photos, and the date link
if style != '': if style != '':
imageUrls = re.findall('url\(\'(.*?)\'\)', style) imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1: if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0])) media.append(Photo(url = imageUrls[0]))
continue continue
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
style = link.attrs.get('style', '') style = link.attrs.get('style', '')
imageUrls = re.findall('url\(\'(.*?)\'\)', style) imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
if len(imageUrls) == 1: if len(imageUrls) == 1:
media.append(Photo(url = imageUrls[0])) media.append(Photo(url = imageUrls[0]))
# resp = self._get(image[0]) # resp = self._get(image[0])
@@ -167,37 +173,34 @@ class TelegramChannelScraper(snscrape.base.Scraper):
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl): if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
outlinks.append(href) outlinks.append(href)
for voice_player in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
audioUrl = voice_player.find('audio')['src'] audioUrl = voicePlayer.find('audio')['src']
durationStr = voice_player.find('time').text.split(':') durationStr = voicePlayer.find('time').text
duration = durationStrToSeconds(durationStr) duration = durationStrToSeconds(durationStr)
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voice_player.find('div', {'class': 'bar'}).find_all('s')] barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
iTag = video_player.find('i') iTag = videoPlayer.find('i')
if iTag is None: if iTag is None:
videoUrl = None videoUrl = None
videoThumbnailUrl = None videoThumbnailUrl = None
else: else:
style = iTag['style'] style = iTag['style']
videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0] videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
videoTag = video_player.find('video') videoTag = videoPlayer.find('video')
if videoTag is None: videoUrl = None if videoTag is None else videoTag['src']
videoUrl = None
else:
videoUrl = videoTag['src']
mKwargs = { mKwargs = {
'thumbnailUrl': videoThumbnailUrl, 'thumbnailUrl': videoThumbnailUrl,
'url': videoUrl, 'url': videoUrl,
} }
timeTag = video_player.find('time') timeTag = videoPlayer.find('time')
if timeTag is None: if timeTag is None:
cls = Gif cls = Gif
else: else:
cls = Video cls = Video
durationStr = video_player.find('time').text.split(':') durationStr = videoPlayer.find('time').text
mKwargs['duration'] = durationStrToSeconds(durationStr) mKwargs['duration'] = durationStrToSeconds(durationStr)
media.append(cls(**mKwargs)) media.append(cls(**mKwargs))
@@ -221,10 +224,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
outlinks.remove(kwargs['href']) outlinks.remove(kwargs['href'])
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
if viewsSpan is None: views = None if viewsSpan is None else parse_num(viewsSpan.text)
views = None
else:
views = parse_num(viewsSpan.text)
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views) yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
@@ -318,16 +318,14 @@ def parse_num(s):
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
elif s.endswith('K'): elif s.endswith('K'):
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
else: return int(s), 1
return int(s), 1
def durationStrToSeconds(durationStr): def durationStrToSeconds(durationStr):
return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))]) durationList = durationStr.split(':')
return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))])
def telegramResponseOkCallback(r): def telegramResponseOkCallback(r):
if r.status_code == 200: if r.status_code == 200:
return (True, None) return (True, None)
elif r.status_code // 100 == 5: return (False, f'{r.status_code=}')
return (False, f'status code: {r.status_code}')
else:
return (False, None)