mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-12 20:38:29 +03:00
implemented Media dataclasses for Telegram, and added variable for extracting a post's view count
This commit is contained in:
@@ -30,9 +30,9 @@ class TelegramPost(snscrape.base.Item):
|
|||||||
date: datetime.datetime
|
date: datetime.datetime
|
||||||
content: str
|
content: str
|
||||||
outlinks: list
|
outlinks: list
|
||||||
images: list
|
media: typing.Optional[typing.List['Medium']]
|
||||||
videos: list
|
|
||||||
forwarded: str
|
forwarded: str
|
||||||
|
views: int = None
|
||||||
linkPreview: typing.Optional[LinkPreview] = None
|
linkPreview: typing.Optional[LinkPreview] = None
|
||||||
|
|
||||||
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
||||||
@@ -62,6 +62,29 @@ class Channel(snscrape.base.Entity):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f'https://t.me/s/{self.username}'
|
return f'https://t.me/s/{self.username}'
|
||||||
|
|
||||||
|
class Medium:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Photo(Medium):
|
||||||
|
previewUrl: str
|
||||||
|
fullUrl: str
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Image(Medium):
|
||||||
|
url: str
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Video(Medium):
|
||||||
|
thumbnailUrl: str
|
||||||
|
duration: float
|
||||||
|
url: typing.Optional[str] = None
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Gif(Medium):
|
||||||
|
thumbnailUrl: str
|
||||||
|
url: typing.Optional[str] = None
|
||||||
|
|
||||||
class TelegramChannelScraper(snscrape.base.Scraper):
|
class TelegramChannelScraper(snscrape.base.Scraper):
|
||||||
name = 'telegram-channel'
|
name = 'telegram-channel'
|
||||||
@@ -93,18 +116,34 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
_logger.warning(f'Possibly incorrect URL: {rawUrl!r}')
|
_logger.warning(f'Possibly incorrect URL: {rawUrl!r}')
|
||||||
url = rawUrl.replace('//t.me/', '//t.me/s/')
|
url = rawUrl.replace('//t.me/', '//t.me/s/')
|
||||||
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
|
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
|
||||||
images = []
|
media = []
|
||||||
videos = []
|
|
||||||
forwarded = None
|
forwarded = None
|
||||||
if (message := post.find('div', class_ = 'tgme_widget_message_text')):
|
if (message := post.find('div', class_ = 'tgme_widget_message_text')):
|
||||||
content = message.get_text(separator="\n")
|
content = message.get_text(separator="\n")
|
||||||
|
|
||||||
for video_tag in post.find_all('video'):
|
for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
|
||||||
videos.append(video_tag['src'])
|
|
||||||
|
|
||||||
|
style = video_player.find('i')['style']
|
||||||
|
videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)
|
||||||
|
videoTag = video_player.find('video')
|
||||||
|
if videoTag is None:
|
||||||
|
videoUrl = None
|
||||||
|
else:
|
||||||
|
videoUrl = videoTag['src']
|
||||||
|
mKwargs = {
|
||||||
|
'thumbnailUrl': videoThumbnailUrl,
|
||||||
|
'url': videoUrl,
|
||||||
|
}
|
||||||
|
timeTag = video_player.find('time')
|
||||||
|
if timeTag is None:
|
||||||
|
cls = Gif
|
||||||
|
else:
|
||||||
|
cls = Video
|
||||||
|
durationStr = video_player.find('time').text.split(':')
|
||||||
|
mKwargs['duration'] = sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))])
|
||||||
|
media.append(cls(**mKwargs))
|
||||||
if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
|
if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
|
||||||
forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0]
|
forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0]
|
||||||
|
|
||||||
outlinks = []
|
outlinks = []
|
||||||
for link in post.find_all('a'):
|
for link in post.find_all('a'):
|
||||||
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
|
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
|
||||||
@@ -114,15 +153,15 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
style = link.attrs.get('style', '')
|
style = link.attrs.get('style', '')
|
||||||
# Generic filter of links to the post itself, catches videos, photos, and the date link
|
# Generic filter of links to the post itself, catches videos, photos, and the date link
|
||||||
if style != '':
|
if style != '':
|
||||||
image = re.findall('url\(\'(.*?)\'\)', style)
|
imageUrls = re.findall('url\(\'(.*?)\'\)', style)
|
||||||
if len(image) == 1:
|
if len(imageUrls) == 1:
|
||||||
images.append(image[0])
|
media.append(Image(url = imageUrls[0]))
|
||||||
continue
|
continue
|
||||||
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
|
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
|
||||||
style = link.attrs.get('style', '')
|
style = link.attrs.get('style', '')
|
||||||
image = re.findall('url\(\'(.*?)\'\)', style)
|
imageUrls = re.findall('url\(\'(.*?)\'\)', style)
|
||||||
if len(image) == 1:
|
if len(imageUrls) == 1:
|
||||||
images.append(image[0])
|
media.append(Image(url = imageUrls[0]))
|
||||||
# resp = self._get(image[0])
|
# resp = self._get(image[0])
|
||||||
# encoded_string = base64.b64encode(resp.content)
|
# encoded_string = base64.b64encode(resp.content)
|
||||||
# Individual photo or video link
|
# Individual photo or video link
|
||||||
@@ -133,8 +172,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
else:
|
else:
|
||||||
content = None
|
content = None
|
||||||
outlinks = []
|
outlinks = []
|
||||||
images = []
|
media = []
|
||||||
videos = []
|
|
||||||
linkPreview = None
|
linkPreview = None
|
||||||
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
|
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
@@ -151,7 +189,13 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
else:
|
else:
|
||||||
_logger.warning(f'Could not process link preview image on {url}')
|
_logger.warning(f'Could not process link preview image on {url}')
|
||||||
linkPreview = LinkPreview(**kwargs)
|
linkPreview = LinkPreview(**kwargs)
|
||||||
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, images = images, videos = videos, forwarded = forwarded)
|
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
|
||||||
|
if viewsSpan is None:
|
||||||
|
views = None
|
||||||
|
else:
|
||||||
|
views = parse_num(viewsSpan.text)
|
||||||
|
|
||||||
|
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, media = media, forwarded = forwarded, views = views)
|
||||||
|
|
||||||
def get_items(self):
|
def get_items(self):
|
||||||
r, soup = self._initial_page()
|
r, soup = self._initial_page()
|
||||||
@@ -204,15 +248,6 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')):
|
if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')):
|
||||||
kwargs['description'] = descriptionDiv.text
|
kwargs['description'] = descriptionDiv.text
|
||||||
|
|
||||||
def parse_num(s):
|
|
||||||
s = s.replace(' ', '')
|
|
||||||
if s.endswith('M'):
|
|
||||||
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
|
||||||
elif s.endswith('K'):
|
|
||||||
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
|
||||||
else:
|
|
||||||
return int(s), 1
|
|
||||||
|
|
||||||
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
||||||
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
|
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
|
||||||
type_ = div.find('span', class_ = 'counter_type').text
|
type_ = div.find('span', class_ = 'counter_type').text
|
||||||
@@ -231,3 +266,12 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def _cli_from_args(cls, args):
|
def _cli_from_args(cls, args):
|
||||||
return cls._cli_construct(args, args.channel)
|
return cls._cli_construct(args, args.channel)
|
||||||
|
|
||||||
|
def parse_num(s):
|
||||||
|
s = s.replace(' ', '')
|
||||||
|
if s.endswith('M'):
|
||||||
|
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
||||||
|
elif s.endswith('K'):
|
||||||
|
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
||||||
|
else:
|
||||||
|
return int(s), 1
|
||||||
Reference in New Issue
Block a user