mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-08 02:28:29 +03:00
@@ -9,7 +9,6 @@ import re
|
|||||||
import snscrape.base
|
import snscrape.base
|
||||||
import typing
|
import typing
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
import base64
|
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
|
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
|
||||||
@@ -57,7 +56,7 @@ class TelegramPost(snscrape.base.Item):
|
|||||||
forwarded: typing.Optional['Channel'] = None
|
forwarded: typing.Optional['Channel'] = None
|
||||||
forwardedUrl: typing.Optional[str] = None
|
forwardedUrl: typing.Optional[str] = None
|
||||||
media: typing.Optional[typing.List['Medium']] = None
|
media: typing.Optional[typing.List['Medium']] = None
|
||||||
views: typing.Optional[int] = None
|
views: typing.Optional[snscrape.base.IntWithGranularity] = None
|
||||||
linkPreview: typing.Optional[LinkPreview] = None
|
linkPreview: typing.Optional[LinkPreview] = None
|
||||||
|
|
||||||
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
||||||
@@ -176,7 +175,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
|
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
|
||||||
audioUrl = voicePlayer.find('audio')['src']
|
audioUrl = voicePlayer.find('audio')['src']
|
||||||
durationStr = voicePlayer.find('time').text
|
durationStr = voicePlayer.find('time').text
|
||||||
duration = durationStrToSeconds(durationStr)
|
duration = _durationStrToSeconds(durationStr)
|
||||||
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
|
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
|
||||||
|
|
||||||
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
|
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
|
||||||
@@ -201,7 +200,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
else:
|
else:
|
||||||
cls = Video
|
cls = Video
|
||||||
durationStr = videoPlayer.find('time').text
|
durationStr = videoPlayer.find('time').text
|
||||||
mKwargs['duration'] = durationStrToSeconds(durationStr)
|
mKwargs['duration'] = _durationStrToSeconds(durationStr)
|
||||||
media.append(cls(**mKwargs))
|
media.append(cls(**mKwargs))
|
||||||
|
|
||||||
linkPreview = None
|
linkPreview = None
|
||||||
@@ -224,7 +223,12 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
outlinks.remove(kwargs['href'])
|
outlinks.remove(kwargs['href'])
|
||||||
|
|
||||||
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
|
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
|
||||||
views = None if viewsSpan is None else parse_num(viewsSpan.text)
|
views = None if viewsSpan is None else _parse_num(viewsSpan.text)
|
||||||
|
|
||||||
|
outlinks = outlinks if outlinks else None
|
||||||
|
media = media if media else None
|
||||||
|
mentions = mentions if mentions else None
|
||||||
|
hashtags = hashtags if hashtags else None
|
||||||
|
|
||||||
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
|
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
|
||||||
|
|
||||||
@@ -253,7 +257,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
|
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
|
||||||
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
|
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = _telegramResponseOkCallback)
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||||
@@ -266,8 +270,12 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||||
membersDiv = soup.find('div', class_ = 'tgme_page_extra')
|
membersDiv = soup.find('div', class_ = 'tgme_page_extra')
|
||||||
if membersDiv.text.endswith((' members', ' subscribers')):
|
if membersDiv.text.split(',')[0].endswith((' members', ' subscribers')):
|
||||||
kwargs['members'] = int(''.join(membersDiv.text.split(' ')[:-1]))
|
membersStr = ''.join(membersDiv.text.split(',')[0].split(' ')[:-1])
|
||||||
|
if membersStr == 'no':
|
||||||
|
kwargs['members'] = 0
|
||||||
|
else:
|
||||||
|
kwargs['members'] = int(membersStr)
|
||||||
photoImg = soup.find('img', class_ = 'tgme_page_photo_image')
|
photoImg = soup.find('img', class_ = 'tgme_page_photo_image')
|
||||||
if photoImg is not None:
|
if photoImg is not None:
|
||||||
kwargs['photo'] = photoImg.attrs['src']
|
kwargs['photo'] = photoImg.attrs['src']
|
||||||
@@ -294,7 +302,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
kwargs['description'] = descriptionDiv.text
|
kwargs['description'] = descriptionDiv.text
|
||||||
|
|
||||||
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
||||||
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
|
value, granularity = _parse_num(div.find('span', class_ = 'counter_value').text)
|
||||||
type_ = div.find('span', class_ = 'counter_type').text
|
type_ = div.find('span', class_ = 'counter_type').text
|
||||||
if type_ == 'members':
|
if type_ == 'members':
|
||||||
# Already extracted more accurately from /channel, skip
|
# Already extracted more accurately from /channel, skip
|
||||||
@@ -312,7 +320,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
def _cli_from_args(cls, args):
|
def _cli_from_args(cls, args):
|
||||||
return cls._cli_construct(args, args.channel)
|
return cls._cli_construct(args, args.channel)
|
||||||
|
|
||||||
def parse_num(s):
|
def _parse_num(s):
|
||||||
s = s.replace(' ', '')
|
s = s.replace(' ', '')
|
||||||
if s.endswith('M'):
|
if s.endswith('M'):
|
||||||
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
||||||
@@ -320,11 +328,11 @@ def parse_num(s):
|
|||||||
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
||||||
return int(s), 1
|
return int(s), 1
|
||||||
|
|
||||||
def durationStrToSeconds(durationStr):
|
def _durationStrToSeconds(durationStr):
|
||||||
durationList = durationStr.split(':')
|
durationList = durationStr.split(':')
|
||||||
return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))])
|
return sum([int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(durationList))])
|
||||||
|
|
||||||
def telegramResponseOkCallback(r):
|
def _telegramResponseOkCallback(r):
|
||||||
if r.status_code == 200:
|
if r.status_code == 200:
|
||||||
return (True, None)
|
return (True, None)
|
||||||
return (False, f'{r.status_code=}')
|
return (False, f'{r.status_code=}')
|
||||||
|
|||||||
@@ -178,15 +178,11 @@ class VKontakteUserScraper(snscrape.base.Scraper):
|
|||||||
continue
|
continue
|
||||||
if 'data-video' in a.attrs:
|
if 'data-video' in a.attrs:
|
||||||
# Video
|
# Video
|
||||||
if 'data-link-attr' in a.attrs:
|
|
||||||
hrefUrl = urllib.parse.unquote(a.attrs['data-link-attr'].split('to=')[1].split('&')[0])
|
|
||||||
else:
|
|
||||||
hrefUrl = f'https://vk.com{a["href"]}'
|
|
||||||
video = Video(
|
video = Video(
|
||||||
id = a['data-video'],
|
id = a['data-video'],
|
||||||
list = a['data-list'],
|
list = a['data-list'],
|
||||||
duration = int(a['data-duration']),
|
duration = int(a['data-duration']),
|
||||||
url = hrefUrl,
|
url = f'https://vk.com{a["href"]}',
|
||||||
thumbUrl = a['style'][(begin := a['style'].find('background-image: url(') + 22) : a['style'].find(')', begin)],
|
thumbUrl = a['style'][(begin := a['style'].find('background-image: url(') + 22) : a['style'].find(')', begin)],
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
@@ -407,4 +403,4 @@ class VKontakteUserScraper(snscrape.base.Scraper):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _cli_from_args(cls, args):
|
def _cli_from_args(cls, args):
|
||||||
return cls._cli_construct(args, args.username)
|
return cls._cli_construct(args, args.username)
|
||||||
Reference in New Issue
Block a user