mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-08 02:28:29 +03:00
merged master into more-tg-info to update upstream PR
This commit is contained in:
@@ -133,12 +133,22 @@ def _dump_stack_and_locals(trace, exc = None):
|
|||||||
fp.write('Stack:\n')
|
fp.write('Stack:\n')
|
||||||
for frameRecord in trace:
|
for frameRecord in trace:
|
||||||
fp.write(f' File "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}\n')
|
fp.write(f' File "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}\n')
|
||||||
for line in frameRecord.code_context:
|
if frameRecord.code_context is not None:
|
||||||
fp.write(f' {line.strip()}\n')
|
for line in frameRecord.code_context:
|
||||||
|
fp.write(f' {line.strip()}\n')
|
||||||
fp.write('\n')
|
fp.write('\n')
|
||||||
|
|
||||||
for frameRecord in trace:
|
modules = [inspect.getmodule(frameRecord[0]) for frameRecord in trace]
|
||||||
module = inspect.getmodule(frameRecord[0])
|
for i, (module, frameRecord) in enumerate(zip(modules, trace)):
|
||||||
|
if module is None:
|
||||||
|
# Module-less frame, e.g. dataclass.__init__
|
||||||
|
for j in reversed(range(i)):
|
||||||
|
if modules[j] is not None:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# No previous module scope
|
||||||
|
continue
|
||||||
|
module = modules[j]
|
||||||
if not module.__name__.startswith('snscrape.') and module.__name__ != 'snscrape':
|
if not module.__name__.startswith('snscrape.') and module.__name__ != 'snscrape':
|
||||||
continue
|
continue
|
||||||
locals_ = frameRecord[0].f_locals
|
locals_ = frameRecord[0].f_locals
|
||||||
|
|||||||
@@ -163,16 +163,19 @@ class Scraper:
|
|||||||
return self._get_entity()
|
return self._get_entity()
|
||||||
|
|
||||||
def _request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None, allowRedirects = True, proxies = None):
|
def _request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None, allowRedirects = True, proxies = None):
|
||||||
proxies = proxies or self._proxies
|
proxies = proxies or self._proxies or {}
|
||||||
for attempt in range(self._retries + 1):
|
for attempt in range(self._retries + 1):
|
||||||
# The request is newly prepared on each retry because of potential cookie updates.
|
# The request is newly prepared on each retry because of potential cookie updates.
|
||||||
req = self._session.prepare_request(requests.Request(method, url, params = params, data = data, headers = headers))
|
req = self._session.prepare_request(requests.Request(method, url, params = params, data = data, headers = headers))
|
||||||
|
environmentSettings = self._session.merge_environment_settings(req.url, proxies, None, None, None)
|
||||||
logger.info(f'Retrieving {req.url}')
|
logger.info(f'Retrieving {req.url}')
|
||||||
logger.debug(f'... with headers: {headers!r}')
|
logger.debug(f'... with headers: {headers!r}')
|
||||||
if data:
|
if data:
|
||||||
logger.debug(f'... with data: {data!r}')
|
logger.debug(f'... with data: {data!r}')
|
||||||
|
if environmentSettings:
|
||||||
|
logger.debug(f'... with environmentSettings: {environmentSettings!r}')
|
||||||
try:
|
try:
|
||||||
r = self._session.send(req, allow_redirects = allowRedirects, timeout = timeout, proxies = proxies)
|
r = self._session.send(req, allow_redirects = allowRedirects, timeout = timeout, **environmentSettings)
|
||||||
except requests.exceptions.RequestException as exc:
|
except requests.exceptions.RequestException as exc:
|
||||||
if attempt < self._retries:
|
if attempt < self._retries:
|
||||||
retrying = ', retrying'
|
retrying = ', retrying'
|
||||||
|
|||||||
@@ -96,6 +96,8 @@ class _InstagramCommonScraper(snscrape.base.Scraper):
|
|||||||
def _check_json_callback(self, r):
|
def _check_json_callback(self, r):
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
return False, f'status code {r.status_code}'
|
return False, f'status code {r.status_code}'
|
||||||
|
if r.url.startswith('https://www.instagram.com/accounts/login/'):
|
||||||
|
raise snscrape.base.ScraperException('Redirected to login page')
|
||||||
try:
|
try:
|
||||||
obj = json.loads(r.text)
|
obj = json.loads(r.text)
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ _logger = logging.getLogger(__name__)
|
|||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class Submission(snscrape.base.Item):
|
class Submission(snscrape.base.Item):
|
||||||
author: typing.Optional[str] # E.g. submission hf7k6
|
author: typing.Optional[str] # E.g. submission hf7k6
|
||||||
created: datetime.datetime
|
date: datetime.datetime
|
||||||
id: str
|
id: str
|
||||||
link: typing.Optional[str]
|
link: typing.Optional[str]
|
||||||
selftext: typing.Optional[str]
|
selftext: typing.Optional[str]
|
||||||
@@ -28,6 +28,8 @@ class Submission(snscrape.base.Item):
|
|||||||
title: str
|
title: str
|
||||||
url: str
|
url: str
|
||||||
|
|
||||||
|
created = snscrape.base._DeprecatedProperty('created', lambda self: self.date, 'date')
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.url
|
return self.url
|
||||||
|
|
||||||
@@ -36,12 +38,14 @@ class Submission(snscrape.base.Item):
|
|||||||
class Comment(snscrape.base.Item):
|
class Comment(snscrape.base.Item):
|
||||||
author: typing.Optional[str]
|
author: typing.Optional[str]
|
||||||
body: str
|
body: str
|
||||||
created: datetime.datetime
|
date: datetime.datetime
|
||||||
id: str
|
id: str
|
||||||
parentId: typing.Optional[str]
|
parentId: typing.Optional[str]
|
||||||
subreddit: typing.Optional[str]
|
subreddit: typing.Optional[str]
|
||||||
url: str
|
url: str
|
||||||
|
|
||||||
|
created = snscrape.base._DeprecatedProperty('created', lambda self: self.date, 'date')
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.url
|
return self.url
|
||||||
|
|
||||||
@@ -111,7 +115,7 @@ class _RedditPushshiftScraper(snscrape.base.Scraper):
|
|||||||
|
|
||||||
kwargs = {
|
kwargs = {
|
||||||
'author': d.get('author'),
|
'author': d.get('author'),
|
||||||
'created': datetime.datetime.fromtimestamp(d['created_utc'], datetime.timezone.utc),
|
'date': datetime.datetime.fromtimestamp(d['created_utc'], datetime.timezone.utc),
|
||||||
'url': f'https://old.reddit.com{permalink}',
|
'url': f'https://old.reddit.com{permalink}',
|
||||||
'subreddit': d.get('subreddit'),
|
'subreddit': d.get('subreddit'),
|
||||||
}
|
}
|
||||||
@@ -192,7 +196,7 @@ class _RedditPushshiftSearchScraper(_RedditPushshiftScraper):
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
# Return newer first; if both have the same creation datetime, return the comment first
|
# Return newer first; if both have the same creation datetime, return the comment first
|
||||||
if tipSubmission.created > tipComment.created:
|
if tipSubmission.date > tipComment.date:
|
||||||
yield tipSubmission
|
yield tipSubmission
|
||||||
try:
|
try:
|
||||||
tipSubmission = next(submissionsIter)
|
tipSubmission = next(submissionsIter)
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ import urllib.parse
|
|||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
|
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
|
||||||
|
_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)')
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class LinkPreview:
|
class LinkPreview:
|
||||||
@@ -23,29 +23,12 @@ class LinkPreview:
|
|||||||
image: typing.Optional[str] = None
|
image: typing.Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class TelegramPost(snscrape.base.Item):
|
|
||||||
url: str
|
|
||||||
date: datetime.datetime
|
|
||||||
content: str
|
|
||||||
outlinks: list
|
|
||||||
images: list
|
|
||||||
video: str
|
|
||||||
forwarded: str
|
|
||||||
linkPreview: typing.Optional[LinkPreview] = None
|
|
||||||
|
|
||||||
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.url
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class Channel(snscrape.base.Entity):
|
class Channel(snscrape.base.Entity):
|
||||||
username: str
|
username: str
|
||||||
title: str
|
title: typing.Optional[str] = None
|
||||||
verified: bool
|
verified: typing.Optional[bool] = None
|
||||||
photo: str
|
photo: typing.Optional[str] = None
|
||||||
description: typing.Optional[str] = None
|
description: typing.Optional[str] = None
|
||||||
members: typing.Optional[int] = None
|
members: typing.Optional[int] = None
|
||||||
photos: typing.Optional[snscrape.base.IntWithGranularity] = None
|
photos: typing.Optional[snscrape.base.IntWithGranularity] = None
|
||||||
@@ -62,6 +45,55 @@ class Channel(snscrape.base.Entity):
|
|||||||
return f'https://t.me/s/{self.username}'
|
return f'https://t.me/s/{self.username}'
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class TelegramPost(snscrape.base.Item):
|
||||||
|
url: str
|
||||||
|
date: datetime.datetime
|
||||||
|
content: str
|
||||||
|
outlinks: typing.List[str] = None
|
||||||
|
mentions: typing.List[str] = None
|
||||||
|
hashtags: typing.List[str] = None
|
||||||
|
forwarded: typing.Optional['Channel'] = None
|
||||||
|
forwardedUrl: typing.Optional[str] = None
|
||||||
|
media: typing.Optional[typing.List['Medium']] = None
|
||||||
|
views: typing.Optional[int] = None
|
||||||
|
linkPreview: typing.Optional[LinkPreview] = None
|
||||||
|
|
||||||
|
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.url
|
||||||
|
|
||||||
|
|
||||||
|
class Medium:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Photo(Medium):
|
||||||
|
url: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Video(Medium):
|
||||||
|
thumbnailUrl: str
|
||||||
|
duration: float
|
||||||
|
url: typing.Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class VoiceMessage(Medium):
|
||||||
|
url: str
|
||||||
|
duration: str
|
||||||
|
bars:typing.List[float]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Gif(Medium):
|
||||||
|
thumbnailUrl: str
|
||||||
|
url: typing.Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
class TelegramChannelScraper(snscrape.base.Scraper):
|
class TelegramChannelScraper(snscrape.base.Scraper):
|
||||||
name = 'telegram-channel'
|
name = 'telegram-channel'
|
||||||
|
|
||||||
@@ -92,18 +124,86 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
_logger.warning(f'Possibly incorrect URL: {rawUrl!r}')
|
_logger.warning(f'Possibly incorrect URL: {rawUrl!r}')
|
||||||
url = rawUrl.replace('//t.me/', '//t.me/s/')
|
url = rawUrl.replace('//t.me/', '//t.me/s/')
|
||||||
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
|
date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
|
||||||
images = []
|
media = []
|
||||||
video = None
|
outlinks = []
|
||||||
|
mentions = []
|
||||||
|
hashtags = []
|
||||||
forwarded = None
|
forwarded = None
|
||||||
|
forwardedUrl = None
|
||||||
|
|
||||||
|
if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
|
||||||
|
forwardedUrl = forwardTag['href']
|
||||||
|
forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
|
||||||
|
forwarded = Channel(username = forwardedName)
|
||||||
|
|
||||||
if (message := post.find('div', class_ = 'tgme_widget_message_text')):
|
if (message := post.find('div', class_ = 'tgme_widget_message_text')):
|
||||||
content = message.get_text(separator="\n")
|
content = message.get_text(separator="\n")
|
||||||
|
else:
|
||||||
|
content = None
|
||||||
|
|
||||||
for video_tag in post.find_all('video'):
|
for link in post.find_all('a'):
|
||||||
video = video_tag['src']
|
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
|
||||||
|
# Author links at the top (avatar and name)
|
||||||
|
continue
|
||||||
|
if link['href'] == rawUrl or link['href'] == url:
|
||||||
|
style = link.attrs.get('style', '')
|
||||||
|
# Generic filter of links to the post itself, catches videos, photos, and the date link
|
||||||
|
if style != '':
|
||||||
|
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
|
||||||
|
if len(imageUrls) == 1:
|
||||||
|
media.append(Photo(url = imageUrls[0]))
|
||||||
|
continue
|
||||||
|
if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
|
||||||
|
style = link.attrs.get('style', '')
|
||||||
|
imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
|
||||||
|
if len(imageUrls) == 1:
|
||||||
|
media.append(Photo(url = imageUrls[0]))
|
||||||
|
# resp = self._get(image[0])
|
||||||
|
# encoded_string = base64.b64encode(resp.content)
|
||||||
|
# Individual photo or video link
|
||||||
|
continue
|
||||||
|
if link.text.startswith('@'):
|
||||||
|
mentions.append(link.text.strip('@'))
|
||||||
|
continue
|
||||||
|
if link.text.startswith('#'):
|
||||||
|
hashtags.append(link.text.strip('#'))
|
||||||
|
continue
|
||||||
|
href = urllib.parse.urljoin(pageUrl, link['href'])
|
||||||
|
if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
|
||||||
|
outlinks.append(href)
|
||||||
|
|
||||||
if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
|
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
|
||||||
forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0]
|
audioUrl = voicePlayer.find('audio')['src']
|
||||||
|
durationStr = voicePlayer.find('time').text
|
||||||
|
duration = durationStrToSeconds(durationStr)
|
||||||
|
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
|
||||||
|
|
||||||
|
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
|
||||||
|
|
||||||
|
for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
|
||||||
|
iTag = videoPlayer.find('i')
|
||||||
|
if iTag is None:
|
||||||
|
videoUrl = None
|
||||||
|
videoThumbnailUrl = None
|
||||||
|
else:
|
||||||
|
style = iTag['style']
|
||||||
|
videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
|
||||||
|
videoTag = videoPlayer.find('video')
|
||||||
|
videoUrl = None if videoTag is None else videoTag['src']
|
||||||
|
mKwargs = {
|
||||||
|
'thumbnailUrl': videoThumbnailUrl,
|
||||||
|
'url': videoUrl,
|
||||||
|
}
|
||||||
|
timeTag = videoPlayer.find('time')
|
||||||
|
if timeTag is None:
|
||||||
|
cls = Gif
|
||||||
|
else:
|
||||||
|
cls = Video
|
||||||
|
durationStr = videoPlayer.find('time').text
|
||||||
|
mKwargs['duration'] = durationStrToSeconds(durationStr)
|
||||||
|
media.append(cls(**mKwargs))
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
outlinks = []
|
outlinks = []
|
||||||
for link in post.find_all('a'):
|
for link in post.find_all('a'):
|
||||||
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
|
if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
|
||||||
@@ -131,6 +231,8 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
outlinks = []
|
outlinks = []
|
||||||
images = []
|
images = []
|
||||||
video = None
|
video = None
|
||||||
|
=======
|
||||||
|
>>>>>>> master
|
||||||
linkPreview = None
|
linkPreview = None
|
||||||
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
|
if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
@@ -147,20 +249,40 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
else:
|
else:
|
||||||
_logger.warning(f'Could not process link preview image on {url}')
|
_logger.warning(f'Could not process link preview image on {url}')
|
||||||
linkPreview = LinkPreview(**kwargs)
|
linkPreview = LinkPreview(**kwargs)
|
||||||
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, images = images, video = video, forwarded = forwarded)
|
if kwargs['href'] in outlinks:
|
||||||
|
outlinks.remove(kwargs['href'])
|
||||||
|
|
||||||
|
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
|
||||||
|
views = None if viewsSpan is None else parse_num(viewsSpan.text)
|
||||||
|
|
||||||
|
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
|
||||||
|
|
||||||
def get_items(self):
|
def get_items(self):
|
||||||
r, soup = self._initial_page()
|
r, soup = self._initial_page()
|
||||||
if '/s/' not in r.url:
|
if '/s/' not in r.url:
|
||||||
_logger.warning('No public post list for this user')
|
_logger.warning('No public post list for this user')
|
||||||
return
|
return
|
||||||
|
nextPageUrl = ''
|
||||||
while True:
|
while True:
|
||||||
yield from self._soup_to_items(soup, r.url)
|
yield from self._soup_to_items(soup, r.url)
|
||||||
|
try:
|
||||||
|
if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1':
|
||||||
|
# if message 1 is the first message in the page, terminate scraping
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
pass
|
||||||
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
|
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
|
||||||
if not pageLink:
|
if not pageLink:
|
||||||
break
|
# some pages are missing a "tme_messages_more" tag, causing early termination
|
||||||
|
if '=' not in nextPageUrl:
|
||||||
|
nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href']
|
||||||
|
nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20
|
||||||
|
if nextPostIndex > 20:
|
||||||
|
pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'}
|
||||||
|
else:
|
||||||
|
break
|
||||||
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
|
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
|
||||||
r = self._get(nextPageUrl, headers = self._headers)
|
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||||
@@ -173,9 +295,13 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||||
membersDiv = soup.find('div', class_ = 'tgme_page_extra')
|
membersDiv = soup.find('div', class_ = 'tgme_page_extra')
|
||||||
if membersDiv.text.endswith(' members'):
|
if membersDiv.text.endswith((' members', ' subscribers')):
|
||||||
kwargs['members'] = int(membersDiv.text[:-8].replace(' ', ''))
|
kwargs['members'] = int(''.join(membersDiv.text.split(' ')[:-1]))
|
||||||
kwargs['photo'] = soup.find('img', class_ = 'tgme_page_photo_image').attrs['src']
|
photoImg = soup.find('img', class_ = 'tgme_page_photo_image')
|
||||||
|
if photoImg is not None:
|
||||||
|
kwargs['photo'] = photoImg.attrs['src']
|
||||||
|
else:
|
||||||
|
kwargs['photo'] = None
|
||||||
|
|
||||||
r, soup = self._initial_page()
|
r, soup = self._initial_page()
|
||||||
if '/s/' not in r.url: # Redirect on channels without public posts
|
if '/s/' not in r.url: # Redirect on channels without public posts
|
||||||
@@ -196,15 +322,6 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')):
|
if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')):
|
||||||
kwargs['description'] = descriptionDiv.text
|
kwargs['description'] = descriptionDiv.text
|
||||||
|
|
||||||
def parse_num(s):
|
|
||||||
s = s.replace(' ', '')
|
|
||||||
if s.endswith('M'):
|
|
||||||
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
|
||||||
elif s.endswith('K'):
|
|
||||||
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
|
||||||
else:
|
|
||||||
return int(s), 1
|
|
||||||
|
|
||||||
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
||||||
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
|
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
|
||||||
type_ = div.find('span', class_ = 'counter_type').text
|
type_ = div.find('span', class_ = 'counter_type').text
|
||||||
@@ -223,3 +340,21 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def _cli_from_args(cls, args):
|
def _cli_from_args(cls, args):
|
||||||
return cls._cli_construct(args, args.channel)
|
return cls._cli_construct(args, args.channel)
|
||||||
|
|
||||||
|
def parse_num(s):
|
||||||
|
s = s.replace(' ', '')
|
||||||
|
if s.endswith('M'):
|
||||||
|
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
||||||
|
elif s.endswith('K'):
|
||||||
|
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
||||||
|
return int(s), 1
|
||||||
|
|
||||||
|
def durationStrToSeconds(durationStr):
|
||||||
|
durationList = durationStr.split(':')
|
||||||
|
return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))])
|
||||||
|
|
||||||
|
def telegramResponseOkCallback(r):
|
||||||
|
if r.status_code == 200:
|
||||||
|
return (True, None)
|
||||||
|
return (False, f'{r.status_code=}')
|
||||||
|
|
||||||
@@ -99,7 +99,7 @@ class VideoVariant:
|
|||||||
class Video(Medium):
|
class Video(Medium):
|
||||||
thumbnailUrl: str
|
thumbnailUrl: str
|
||||||
variants: typing.List[VideoVariant]
|
variants: typing.List[VideoVariant]
|
||||||
duration: float
|
duration: typing.Optional[float] = None
|
||||||
views: typing.Optional[int] = None
|
views: typing.Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
@@ -132,12 +132,300 @@ class Place:
|
|||||||
countryCode: str
|
countryCode: str
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class Card:
|
class Card:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class SummaryCard(Card):
|
||||||
title: str
|
title: str
|
||||||
url: str
|
url: str
|
||||||
description: typing.Optional[str] = None
|
description: typing.Optional[str] = None
|
||||||
thumbnailUrl: typing.Optional[str] = None
|
thumbnailUrl: typing.Optional[str] = None
|
||||||
|
siteUser: typing.Optional['User'] = None
|
||||||
|
creatorUser: typing.Optional['User'] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class AppCard(SummaryCard):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class PollCard(Card):
|
||||||
|
options: typing.List['PollOption']
|
||||||
|
endDate: datetime.datetime
|
||||||
|
duration: int
|
||||||
|
finalResults: bool
|
||||||
|
lastUpdateDate: typing.Optional[datetime.datetime] = None
|
||||||
|
medium: typing.Optional[Medium] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class PollOption:
|
||||||
|
label: str
|
||||||
|
count: typing.Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class PlayerCard(Card):
|
||||||
|
title: str
|
||||||
|
url: str
|
||||||
|
description: typing.Optional[str] = None
|
||||||
|
imageUrl: typing.Optional[str] = None
|
||||||
|
siteUser: typing.Optional['User'] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class PromoConvoCard(Card):
|
||||||
|
actions: typing.List['PromoConvoAction']
|
||||||
|
thankYouText: str
|
||||||
|
medium: Medium
|
||||||
|
thankYouUrl: typing.Optional[str] = None
|
||||||
|
thankYouTcoUrl: typing.Optional[str] = None
|
||||||
|
cover: typing.Optional['Photo'] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class PromoConvoAction:
|
||||||
|
label: str
|
||||||
|
tweet: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class BroadcastCard(Card):
|
||||||
|
id: str
|
||||||
|
url: str
|
||||||
|
title: str
|
||||||
|
state: typing.Optional[str] = None
|
||||||
|
broadcaster: typing.Optional['User'] = None
|
||||||
|
thumbnailUrl: typing.Optional[str] = None
|
||||||
|
source: typing.Optional[str] = None
|
||||||
|
siteUser: typing.Optional['User'] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class PeriscopeBroadcastCard(Card):
|
||||||
|
id: str
|
||||||
|
url: str
|
||||||
|
title: str
|
||||||
|
description: str
|
||||||
|
state: str
|
||||||
|
totalParticipants: int
|
||||||
|
thumbnailUrl: str
|
||||||
|
source: typing.Optional[str] = None
|
||||||
|
broadcaster: typing.Optional['User'] = None
|
||||||
|
siteUser: typing.Optional['User'] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class EventCard(Card):
|
||||||
|
event: 'Event'
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Event:
|
||||||
|
id: int
|
||||||
|
category: str
|
||||||
|
photo: Photo
|
||||||
|
title: typing.Optional[str] = None
|
||||||
|
description: typing.Optional[str] = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def url(self):
|
||||||
|
return f'https://twitter.com/i/events/{self.id}'
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class NewsletterCard(Card):
|
||||||
|
title: str
|
||||||
|
description: str
|
||||||
|
imageUrl: str
|
||||||
|
url: str
|
||||||
|
revueAccountId: int
|
||||||
|
issueCount: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class NewsletterIssueCard(Card):
|
||||||
|
newsletterTitle: str
|
||||||
|
newsletterDescription: str
|
||||||
|
issueTitle: str
|
||||||
|
issueNumber: int
|
||||||
|
url: str
|
||||||
|
revueAccountId: int
|
||||||
|
issueDescription: typing.Optional[str] = None
|
||||||
|
imageUrl: typing.Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class AmplifyCard(Card):
|
||||||
|
id: str
|
||||||
|
video: Video
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class AppPlayerCard(Card):
|
||||||
|
title: str
|
||||||
|
video: Video
|
||||||
|
appCategory: str
|
||||||
|
playerOwnerId: int
|
||||||
|
siteUser: typing.Optional['User'] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class SpacesCard(Card):
|
||||||
|
url: str
|
||||||
|
id: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class MessageMeCard(Card):
|
||||||
|
recipient: 'User'
|
||||||
|
url: str
|
||||||
|
buttonText: str
|
||||||
|
|
||||||
|
|
||||||
|
UnifiedCardComponentKey = str
|
||||||
|
UnifiedCardDestinationKey = str
|
||||||
|
UnifiedCardMediumKey = str
|
||||||
|
UnifiedCardAppKey = str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCard(Card):
|
||||||
|
componentObjects: typing.Dict[UnifiedCardComponentKey, 'UnifiedCardComponentObject']
|
||||||
|
destinations: typing.Dict[UnifiedCardDestinationKey, 'UnifiedCardDestination']
|
||||||
|
media: typing.Dict[UnifiedCardMediumKey, Medium]
|
||||||
|
apps: typing.Optional[typing.Dict[UnifiedCardAppKey, typing.List['UnifiedCardApp']]] = None
|
||||||
|
components: typing.Optional[typing.List[UnifiedCardComponentKey]] = None
|
||||||
|
swipeableLayoutSlides: typing.Optional[typing.List['UnifiedCardSwipeableLayoutSlide']] = None
|
||||||
|
type: typing.Optional[str] = None
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if (self.components is None) == (self.swipeableLayoutSlides is None):
|
||||||
|
raise ValueError('did not get exactly one of components or swipeableLayoutSlides')
|
||||||
|
if self.components and not all(k in self.componentObjects for k in self.components):
|
||||||
|
raise ValueError('missing components')
|
||||||
|
if self.swipeableLayoutSlides and not all(s.mediumComponentKey in self.componentObjects and s.componentKey in self.componentObjects for s in self.swipeableLayoutSlides):
|
||||||
|
raise ValueError('missing components')
|
||||||
|
if any(c.destinationKey not in self.destinations for c in self.componentObjects.values() if hasattr(c, 'destinationKey')):
|
||||||
|
raise ValueError('missing destinations')
|
||||||
|
if any(b.destinationKey not in self.destinations for c in self.componentObjects.values() if isinstance(c, UnifiedCardButtonGroupComponentObject) for b in c.buttons):
|
||||||
|
raise ValueError('missing destinations')
|
||||||
|
mediaKeys = []
|
||||||
|
for c in self.componentObjects.values():
|
||||||
|
if isinstance(c, UnifiedCardMediumComponentObject):
|
||||||
|
mediaKeys.append(c.mediumKey)
|
||||||
|
elif isinstance(c, UnifiedCardSwipeableMediaComponentObject):
|
||||||
|
mediaKeys.extend(x.mediumKey for x in c.media)
|
||||||
|
mediaKeys.extend(d.mediumKey for d in self.destinations.values() if d.mediumKey is not None)
|
||||||
|
mediaKeys.extend(a.iconMediumKey for l in (self.apps.values() if self.apps is not None else []) for a in l if a.iconMediumKey is not None)
|
||||||
|
if any(k not in self.media for k in mediaKeys):
|
||||||
|
raise ValueError('missing media')
|
||||||
|
if any(c.appKey not in self.apps for c in self.componentObjects.values() if hasattr(c, 'appKey')):
|
||||||
|
raise ValueError('missing apps')
|
||||||
|
if any(d.appKey not in self.apps for d in self.destinations.values() if d.appKey is not None):
|
||||||
|
raise ValueError('missing apps')
|
||||||
|
|
||||||
|
|
||||||
|
class UnifiedCardComponentObject:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardDetailComponentObject(UnifiedCardComponentObject):
|
||||||
|
content: str
|
||||||
|
destinationKey: UnifiedCardDestinationKey
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardMediumComponentObject(UnifiedCardComponentObject):
|
||||||
|
mediumKey: UnifiedCardMediumKey
|
||||||
|
destinationKey: UnifiedCardDestinationKey
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardButtonGroupComponentObject(UnifiedCardComponentObject):
|
||||||
|
buttons: typing.List['UnifiedCardButton']
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardButton:
|
||||||
|
text: str
|
||||||
|
destinationKey: UnifiedCardDestinationKey
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardSwipeableMediaComponentObject(UnifiedCardComponentObject):
|
||||||
|
media: typing.List['UnifiedCardSwipeableMediaMedium']
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardSwipeableMediaMedium:
|
||||||
|
mediumKey: UnifiedCardMediumKey
|
||||||
|
destinationKey: UnifiedCardDestinationKey
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardAppStoreComponentObject(UnifiedCardComponentObject):
|
||||||
|
appKey: UnifiedCardAppKey
|
||||||
|
destinationKey: UnifiedCardDestinationKey
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardTwitterListDetailsComponentObject(UnifiedCardComponentObject):
|
||||||
|
name: str
|
||||||
|
memberCount: int
|
||||||
|
subscriberCount: int
|
||||||
|
user: 'User'
|
||||||
|
destinationKey: UnifiedCardDestinationKey
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardTwitterCommunityDetailsComponentObject(UnifiedCardComponentObject):
|
||||||
|
name: str
|
||||||
|
theme: str
|
||||||
|
membersCount: int
|
||||||
|
destinationKey: UnifiedCardDestinationKey
|
||||||
|
membersFacepile: typing.Optional[typing.List['User']] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardDestination:
|
||||||
|
url: typing.Optional[str] = None
|
||||||
|
appKey: typing.Optional[UnifiedCardAppKey] = None
|
||||||
|
mediumKey: typing.Optional[UnifiedCardMediumKey] = None
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if (self.url is None) == (self.appKey is None):
|
||||||
|
raise ValueError('did not get exactly one of url and appKey')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardApp:
|
||||||
|
type: str
|
||||||
|
id: str
|
||||||
|
title: str
|
||||||
|
category: str
|
||||||
|
countryCode: str
|
||||||
|
url: str
|
||||||
|
description: typing.Optional[str] = None
|
||||||
|
iconMediumKey: typing.Optional[UnifiedCardMediumKey] = None
|
||||||
|
size: typing.Optional[int] = None
|
||||||
|
installs: typing.Optional[int] = None
|
||||||
|
ratingAverage: typing.Optional[float] = None
|
||||||
|
ratingCount: typing.Optional[int] = None
|
||||||
|
isFree: typing.Optional[bool] = None
|
||||||
|
isEditorsChoice: typing.Optional[bool] = None
|
||||||
|
hasInAppPurchases: typing.Optional[bool] = None
|
||||||
|
hasInAppAds: typing.Optional[bool] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UnifiedCardSwipeableLayoutSlide:
|
||||||
|
mediumComponentKey: UnifiedCardComponentKey
|
||||||
|
componentKey: UnifiedCardComponentKey
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
@@ -192,6 +480,11 @@ class UserLabel:
|
|||||||
longDescription: typing.Optional[str] = None
|
longDescription: typing.Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class UserRef:
|
||||||
|
id: int
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class Trend(snscrape.base.Item):
|
class Trend(snscrape.base.Item):
|
||||||
name: str
|
name: str
|
||||||
@@ -287,7 +580,12 @@ class _CLIGuestTokenManager(GuestTokenManager):
|
|||||||
def reset(self):
|
def reset(self):
|
||||||
super().reset()
|
super().reset()
|
||||||
with self._lock:
|
with self._lock:
|
||||||
os.remove(self._file)
|
_logger.info(f'Deleting guest token file {self._file}')
|
||||||
|
try:
|
||||||
|
os.remove(self._file)
|
||||||
|
except FileNotFoundError:
|
||||||
|
# Another process likely already removed the file
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class _TwitterAPIType(enum.Enum):
|
class _TwitterAPIType(enum.Enum):
|
||||||
@@ -339,7 +637,7 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
r = self._post('https://api.twitter.com/1.1/guest/activate.json', data = b'', headers = self._apiHeaders, responseOkCallback = self._check_guest_token_response)
|
r = self._post('https://api.twitter.com/1.1/guest/activate.json', data = b'', headers = self._apiHeaders, responseOkCallback = self._check_guest_token_response)
|
||||||
o = r.json()
|
o = r.json()
|
||||||
if not o.get('guest_token'):
|
if not o.get('guest_token'):
|
||||||
raise snscrape.base.ScraperError('Unable to retrieve guest token')
|
raise snscrape.base.ScraperException('Unable to retrieve guest token')
|
||||||
self._guestTokenManager.token = o['guest_token']
|
self._guestTokenManager.token = o['guest_token']
|
||||||
assert self._guestTokenManager.token
|
assert self._guestTokenManager.token
|
||||||
_logger.debug(f'Using guest token {self._guestTokenManager.token}')
|
_logger.debug(f'Using guest token {self._guestTokenManager.token}')
|
||||||
@@ -505,9 +803,13 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
raise snscrape.base.ScraperException(f'Unable to handle entry {entryId!r}')
|
raise snscrape.base.ScraperException(f'Unable to handle entry {entryId!r}')
|
||||||
yield self._tweet_to_tweet(tweet, obj)
|
yield self._tweet_to_tweet(tweet, obj)
|
||||||
|
|
||||||
|
def _get_tweet_id(self, tweet):
|
||||||
|
return tweet['id'] if 'id' in tweet else int(tweet['id_str'])
|
||||||
|
|
||||||
def _make_tweet(self, tweet, user, retweetedTweet = None, quotedTweet = None, card = None):
|
def _make_tweet(self, tweet, user, retweetedTweet = None, quotedTweet = None, card = None):
|
||||||
|
tweetId = self._get_tweet_id(tweet)
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
kwargs['id'] = tweet['id'] if 'id' in tweet else int(tweet['id_str'])
|
kwargs['id'] = tweetId
|
||||||
kwargs['content'] = tweet['full_text']
|
kwargs['content'] = tweet['full_text']
|
||||||
kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities'].get('urls'))
|
kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities'].get('urls'))
|
||||||
kwargs['user'] = user
|
kwargs['user'] = user
|
||||||
@@ -515,7 +817,7 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
if tweet['entities'].get('urls'):
|
if tweet['entities'].get('urls'):
|
||||||
kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']]
|
kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']]
|
||||||
kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']]
|
kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']]
|
||||||
kwargs['url'] = f'https://twitter.com/{user.username}/status/{kwargs["id"]}'
|
kwargs['url'] = f'https://twitter.com/{user.username}/status/{tweetId}'
|
||||||
kwargs['replyCount'] = tweet['reply_count']
|
kwargs['replyCount'] = tweet['reply_count']
|
||||||
kwargs['retweetCount'] = tweet['retweet_count']
|
kwargs['retweetCount'] = tweet['retweet_count']
|
||||||
kwargs['likeCount'] = tweet['favorite_count']
|
kwargs['likeCount'] = tweet['favorite_count']
|
||||||
@@ -530,36 +832,8 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
if 'extended_entities' in tweet and 'media' in tweet['extended_entities']:
|
if 'extended_entities' in tweet and 'media' in tweet['extended_entities']:
|
||||||
media = []
|
media = []
|
||||||
for medium in tweet['extended_entities']['media']:
|
for medium in tweet['extended_entities']['media']:
|
||||||
if medium['type'] == 'photo':
|
if (mediumO := self._make_medium(medium, tweetId)):
|
||||||
if '.' not in medium['media_url_https']:
|
media.append(mediumO)
|
||||||
_logger.warning(f'Skipping malformed medium URL on tweet {kwargs["id"]}: {medium["media_url_https"]!r} contains no dot')
|
|
||||||
continue
|
|
||||||
baseUrl, format = medium['media_url_https'].rsplit('.', 1)
|
|
||||||
if format not in ('jpg', 'png'):
|
|
||||||
_logger.warning(f'Skipping photo with unknown format on tweet {kwargs["id"]}: {format!r}')
|
|
||||||
continue
|
|
||||||
media.append(Photo(
|
|
||||||
previewUrl = f'{baseUrl}?format={format}&name=small',
|
|
||||||
fullUrl = f'{baseUrl}?format={format}&name=large',
|
|
||||||
))
|
|
||||||
elif medium['type'] == 'video' or medium['type'] == 'animated_gif':
|
|
||||||
variants = []
|
|
||||||
for variant in medium['video_info']['variants']:
|
|
||||||
variants.append(VideoVariant(contentType = variant['content_type'], url = variant['url'], bitrate = variant.get('bitrate')))
|
|
||||||
mKwargs = {
|
|
||||||
'thumbnailUrl': medium['media_url_https'],
|
|
||||||
'variants': variants,
|
|
||||||
}
|
|
||||||
if medium['type'] == 'video':
|
|
||||||
mKwargs['duration'] = medium['video_info']['duration_millis'] / 1000
|
|
||||||
if (ext := medium.get('ext')) and (mediaStats := ext['mediaStats']) and isinstance(r := mediaStats['r'], dict) and 'ok' in r and isinstance(r['ok'], dict):
|
|
||||||
mKwargs['views'] = int(r['ok']['viewCount'])
|
|
||||||
elif (mediaStats := medium.get('mediaStats')):
|
|
||||||
mKwargs['views'] = mediaStats['viewCount']
|
|
||||||
cls = Video
|
|
||||||
elif medium['type'] == 'animated_gif':
|
|
||||||
cls = Gif
|
|
||||||
media.append(cls(**mKwargs))
|
|
||||||
if media:
|
if media:
|
||||||
kwargs['media'] = media
|
kwargs['media'] = media
|
||||||
if retweetedTweet:
|
if retweetedTweet:
|
||||||
@@ -600,31 +874,357 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
kwargs['cashtags'] = [o['text'] for o in tweet['entities']['symbols']]
|
kwargs['cashtags'] = [o['text'] for o in tweet['entities']['symbols']]
|
||||||
if card:
|
if card:
|
||||||
kwargs['card'] = card
|
kwargs['card'] = card
|
||||||
# Try to convert the URL to the non-shortened/t.co one
|
if hasattr(card, 'url') and '//t.co/' in card.url:
|
||||||
try:
|
# Try to convert the URL to the non-shortened/t.co one
|
||||||
i = kwargs['tcooutlinks'].index(card.url)
|
# Retweets inherit the card but not the outlinks; try to get them from the retweeted tweet instead in that case.
|
||||||
except ValueError:
|
if 'tcooutlinks' in kwargs and card.url in kwargs['tcooutlinks']:
|
||||||
_logger.warning('Could not find card URL in tcooutlinks')
|
card.url = kwargs['outlinks'][kwargs['tcooutlinks'].index(card.url)]
|
||||||
else:
|
elif retweetedTweet and retweetedTweet.tcooutlinks and card.url in retweetedTweet.tcooutlinks:
|
||||||
card.url = kwargs['outlinks'][i]
|
card.url = retweetedTweet.outlinks[retweetedTweet.tcooutlinks.index(card.url)]
|
||||||
|
else:
|
||||||
|
_logger.warning(f'Could not translate t.co card URL on tweet {tweetId}')
|
||||||
return Tweet(**kwargs)
|
return Tweet(**kwargs)
|
||||||
|
|
||||||
def _make_card(self, card, apiType):
|
def _make_medium(self, medium, tweetId):
|
||||||
cardKwargs = {}
|
if medium['type'] == 'photo':
|
||||||
for key, kwarg in [('title', 'title'), ('description', 'description'), ('card_url', 'url'), ('thumbnail_image_original', 'thumbnailUrl')]:
|
if '?format=' in medium['media_url_https'] or '&format=' in medium['media_url_https']:
|
||||||
if apiType is _TwitterAPIType.V2:
|
return Photo(previewUrl = medium['media_url_https'], fullUrl = medium['media_url_https'])
|
||||||
value = card['binding_values'].get(key)
|
if '.' not in medium['media_url_https']:
|
||||||
elif apiType is _TwitterAPIType.GRAPHQL:
|
_logger.warning(f'Skipping malformed medium URL on tweet {tweetId}: {medium["media_url_https"]!r} contains no dot')
|
||||||
value = next((o['value'] for o in card['legacy']['binding_values'] if o['key'] == key), None)
|
return
|
||||||
if not value:
|
baseUrl, format = medium['media_url_https'].rsplit('.', 1)
|
||||||
|
if format not in ('jpg', 'png'):
|
||||||
|
_logger.warning(f'Skipping photo with unknown format on tweet {tweetId}: {format!r}')
|
||||||
|
return
|
||||||
|
return Photo(
|
||||||
|
previewUrl = f'{baseUrl}?format={format}&name=small',
|
||||||
|
fullUrl = f'{baseUrl}?format={format}&name=large',
|
||||||
|
)
|
||||||
|
elif medium['type'] == 'video' or medium['type'] == 'animated_gif':
|
||||||
|
variants = []
|
||||||
|
for variant in medium['video_info']['variants']:
|
||||||
|
variants.append(VideoVariant(contentType = variant['content_type'], url = variant['url'], bitrate = variant.get('bitrate')))
|
||||||
|
mKwargs = {
|
||||||
|
'thumbnailUrl': medium['media_url_https'],
|
||||||
|
'variants': variants,
|
||||||
|
}
|
||||||
|
if medium['type'] == 'video':
|
||||||
|
mKwargs['duration'] = medium['video_info']['duration_millis'] / 1000
|
||||||
|
if (ext := medium.get('ext')) and (mediaStats := ext.get('mediaStats')) and isinstance(r := mediaStats['r'], dict) and 'ok' in r and isinstance(r['ok'], dict):
|
||||||
|
mKwargs['views'] = int(r['ok']['viewCount'])
|
||||||
|
elif (mediaStats := medium.get('mediaStats')):
|
||||||
|
mKwargs['views'] = mediaStats['viewCount']
|
||||||
|
cls = Video
|
||||||
|
elif medium['type'] == 'animated_gif':
|
||||||
|
cls = Gif
|
||||||
|
return cls(**mKwargs)
|
||||||
|
else:
|
||||||
|
_logger.warning(f'Unsupported medium type on tweet {tweetId}: {medium["type"]!r}')
|
||||||
|
|
||||||
|
def _make_card(self, card, apiType, tweetId):
|
||||||
|
bindingValues = {}
|
||||||
|
|
||||||
|
def _kwargs_from_map(keyKwargMap):
|
||||||
|
nonlocal bindingValues
|
||||||
|
return {kwarg: bindingValues[key] for key, kwarg in keyKwargMap.items() if key in bindingValues}
|
||||||
|
|
||||||
|
userRefs = {}
|
||||||
|
if apiType is _TwitterAPIType.V2:
|
||||||
|
for o in card.get('users', {}).values():
|
||||||
|
userId = o['id']
|
||||||
|
assert userId not in userRefs
|
||||||
|
userRefs[userId] = self._user_to_user(o)
|
||||||
|
elif apiType is _TwitterAPIType.GRAPHQL:
|
||||||
|
for o in card['legacy'].get('user_refs', {}):
|
||||||
|
userId = int(o['rest_id'])
|
||||||
|
if userId in userRefs:
|
||||||
|
_logger.warning(f'Duplicate user {userId} in card on tweet {tweetId}')
|
||||||
|
continue
|
||||||
|
if 'legacy' in o:
|
||||||
|
userRefs[userId] = self._user_to_user(o['legacy'], id_ = userId)
|
||||||
|
else:
|
||||||
|
userRefs[userId] = UserRef(id = userId)
|
||||||
|
|
||||||
|
if apiType is _TwitterAPIType.V2:
|
||||||
|
messyBindingValues = card['binding_values'].items()
|
||||||
|
elif apiType is _TwitterAPIType.GRAPHQL:
|
||||||
|
messyBindingValues = ((x['key'], x['value']) for x in card['legacy']['binding_values'])
|
||||||
|
for key, value in messyBindingValues:
|
||||||
|
if 'type' not in value:
|
||||||
|
# Silently ignore creator/site entries since they frequently appear like this.
|
||||||
|
if key not in ('creator', 'site'):
|
||||||
|
_logger.warning(f'Skipping type-less card value {key!r} on tweet {tweetId}')
|
||||||
continue
|
continue
|
||||||
if value['type'] == 'STRING':
|
if value['type'] == 'STRING':
|
||||||
cardKwargs[kwarg] = value['string_value']
|
bindingValues[key] = value['string_value']
|
||||||
|
if key.endswith('_datetime_utc'):
|
||||||
|
bindingValues[key] = datetime.datetime.strptime(bindingValues[key], '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo = datetime.timezone.utc)
|
||||||
elif value['type'] == 'IMAGE':
|
elif value['type'] == 'IMAGE':
|
||||||
cardKwargs[kwarg] = value['image_value']['url']
|
bindingValues[key] = value['image_value']['url']
|
||||||
|
elif value['type'] == 'IMAGE_COLOR':
|
||||||
|
# Silently discard this.
|
||||||
|
pass
|
||||||
|
elif value['type'] == 'BOOLEAN':
|
||||||
|
bindingValues[key] = value['boolean_value']
|
||||||
|
elif value['type'] == 'USER':
|
||||||
|
bindingValues[key] = userRefs[int(value['user_value']['id_str'])]
|
||||||
else:
|
else:
|
||||||
raise snscrape.base.ScraperError(f'Unknown card value type: {value["type"]!r}')
|
_logger.warning(f'Unsupported card value type on {key!r} on tweet {tweetId}: {value["type"]!r}')
|
||||||
return Card(**cardKwargs)
|
|
||||||
|
if apiType is _TwitterAPIType.V2:
|
||||||
|
cardName = card['name']
|
||||||
|
elif apiType is _TwitterAPIType.GRAPHQL:
|
||||||
|
cardName = card['legacy']['name']
|
||||||
|
|
||||||
|
if cardName in ('summary', 'summary_large_image', 'app', 'direct_store_link_app'):
|
||||||
|
keyKwargMap = {
|
||||||
|
'title': 'title',
|
||||||
|
'description': 'description',
|
||||||
|
'card_url': 'url',
|
||||||
|
'site': 'siteUser',
|
||||||
|
'creator': 'creatorUser',
|
||||||
|
}
|
||||||
|
if cardName in ('app', 'direct_store_link_app'):
|
||||||
|
keyKwargMap['thumbnail_original'] = 'thumbnailUrl'
|
||||||
|
return AppCard(**_kwargs_from_map(keyKwargMap))
|
||||||
|
else:
|
||||||
|
keyKwargMap['thumbnail_image_original'] = 'thumbnailUrl'
|
||||||
|
return SummaryCard(**_kwargs_from_map(keyKwargMap))
|
||||||
|
elif any(cardName.startswith(x) for x in ('poll2choice_', 'poll3choice_', 'poll4choice_')) and cardName.split('_', 1)[1] in ('text_only', 'image', 'video'):
|
||||||
|
kwargs = _kwargs_from_map({'end_datetime_utc': 'endDate', 'last_updated_datetime_utc': 'lastUpdateDate', 'duration_minutes': 'duration', 'counts_are_final': 'finalResults'})
|
||||||
|
|
||||||
|
options = []
|
||||||
|
for key in sorted(bindingValues):
|
||||||
|
if key.startswith('choice') and key.endswith('_label'):
|
||||||
|
optKwargs = {'label': bindingValues[key]}
|
||||||
|
if (count := bindingValues.get(f'{key[:-5]}count')):
|
||||||
|
optKwargs['count'] = int(count)
|
||||||
|
options.append(PollOption(**optKwargs))
|
||||||
|
kwargs['options'] = options
|
||||||
|
kwargs['duration'] = int(kwargs['duration'])
|
||||||
|
|
||||||
|
if cardName.endswith('_image'):
|
||||||
|
kwargs['medium'] = Photo(previewUrl = bindingValues['image_small'], fullUrl = bindingValues['image_original'])
|
||||||
|
elif cardName.endswith('_video'):
|
||||||
|
variants = []
|
||||||
|
variants.append(VideoVariant(contentType = 'application/x-mpegurl', url = bindingValues['player_hls_url'], bitrate = None))
|
||||||
|
if 'vmap' not in bindingValues['player_stream_url']:
|
||||||
|
_logger.warning(f'Non-VMAP URL in {cardName} player_stream_url on tweet {tweetId}')
|
||||||
|
variants.append(VideoVariant(contentType = 'text/xml', url = bindingValues['player_stream_url'], bitrate = None))
|
||||||
|
kwargs['medium'] = Video(thumbnailUrl = bindingValues['player_image_original'], variants = variants, duration = int(bindingValues['content_duration_seconds']))
|
||||||
|
|
||||||
|
return PollCard(**kwargs)
|
||||||
|
elif cardName == 'player':
|
||||||
|
return PlayerCard(**_kwargs_from_map({'title': 'title', 'description': 'description', 'card_url': 'url', 'player_image_original': 'imageUrl', 'site': 'siteUser'}))
|
||||||
|
elif cardName in ('promo_image_convo', 'promo_video_convo'):
|
||||||
|
kwargs = _kwargs_from_map({'thank_you_text': 'thankYouText', 'thank_you_url': 'thankYouUrl', 'thank_you_shortened_url': 'thankYouTcoUrl'})
|
||||||
|
kwargs['actions'] = []
|
||||||
|
for l in ('one', 'two', 'three', 'four'):
|
||||||
|
if f'cta_{l}' in bindingValues:
|
||||||
|
kwargs['actions'].append(PromoConvoAction(label = bindingValues[f'cta_{l}'], tweet = bindingValues[f'cta_{l}_tweet']))
|
||||||
|
if 'image' in cardName:
|
||||||
|
kwargs['medium'] = Photo(previewUrl = bindingValues['promo_image_small'], fullUrl = bindingValues['promo_image_original'])
|
||||||
|
if 'cover_promo_image' in bindingValues:
|
||||||
|
kwargs['cover'] = Photo(previewUrl = bindingValues['cover_promo_image_small'], fullUrl = bindingValues['cover_promo_image_original'])
|
||||||
|
elif 'video' in cardName:
|
||||||
|
variants = []
|
||||||
|
variants.append(VideoVariant(contentType = bindingValues['player_stream_content_type'], url = bindingValues['player_stream_url'], bitrate = None))
|
||||||
|
if bindingValues['player_stream_url'] != bindingValues['player_url']:
|
||||||
|
if 'vmap' not in bindingValues['player_url']:
|
||||||
|
_logger.warning(f'Non-VMAP URL in {cardName} player_url on tweet {tweetId}')
|
||||||
|
variants.append(VideoVariant(contentType = 'text/xml', url = bindingValues['player_url'], bitrate = None))
|
||||||
|
kwargs['medium'] = Video(thumbnailUrl = bindingValues['player_image_original'], variants = variants, duration = int(bindingValues['content_duration_seconds']))
|
||||||
|
return PromoConvoCard(**kwargs)
|
||||||
|
elif cardName in ('745291183405076480:broadcast', '3691233323:periscope_broadcast'):
|
||||||
|
keyKwargMap = {'broadcast_state': 'state', 'broadcast_source': 'source', 'site': 'siteUser'}
|
||||||
|
if cardName == '745291183405076480:broadcast':
|
||||||
|
keyKwargMap = {**keyKwargMap, 'broadcast_id': 'id', 'broadcast_url': 'url', 'broadcast_title': 'title', 'broadcast_thumbnail_original': 'thumbnailUrl'}
|
||||||
|
else:
|
||||||
|
keyKwargMap = {**keyKwargMap, 'id': 'id', 'url': 'url', 'title': 'title', 'description': 'description', 'total_participants': 'totalParticipants', 'full_size_thumbnail_url': 'thumbnailUrl'}
|
||||||
|
kwargs = _kwargs_from_map(keyKwargMap)
|
||||||
|
if 'broadcaster_twitter_id' in bindingValues:
|
||||||
|
kwargs['broadcaster'] = User(id = int(bindingValues['broadcaster_twitter_id']), username = bindingValues['broadcaster_username'], displayname = bindingValues['broadcaster_display_name'])
|
||||||
|
if 'siteUser' not in kwargs:
|
||||||
|
kwargs['siteUser'] = None
|
||||||
|
if cardName == '745291183405076480:broadcast':
|
||||||
|
return BroadcastCard(**kwargs)
|
||||||
|
else:
|
||||||
|
kwargs['totalParticipants'] = int(kwargs['totalParticipants'])
|
||||||
|
return PeriscopeBroadcastCard(**kwargs)
|
||||||
|
elif cardName == '745291183405076480:live_event':
|
||||||
|
kwargs = _kwargs_from_map({'event_id': 'id', 'event_title': 'title', 'event_category': 'category', 'event_subtitle': 'description'})
|
||||||
|
kwargs['id'] = int(kwargs['id'])
|
||||||
|
kwargs['photo'] = Photo(previewUrl = bindingValues['event_thumbnail_small'], fullUrl = bindingValues.get('event_thumbnail_original') or bindingValues['event_thumbnail'])
|
||||||
|
return EventCard(event = Event(**kwargs))
|
||||||
|
elif cardName == '3337203208:newsletter_publication':
|
||||||
|
kwargs = _kwargs_from_map({'newsletter_title': 'title', 'newsletter_description': 'description', 'newsletter_image_original': 'imageUrl', 'card_url': 'url', 'revue_account_id': 'revueAccountId', 'issue_count': 'issueCount'})
|
||||||
|
kwargs['revueAccountId'] = int(kwargs['revueAccountId'])
|
||||||
|
kwargs['issueCount'] = int(kwargs['issueCount'])
|
||||||
|
return NewsletterCard(**kwargs)
|
||||||
|
elif cardName == '3337203208:newsletter_issue':
|
||||||
|
kwargs = _kwargs_from_map({
|
||||||
|
'newsletter_title': 'newsletterTitle',
|
||||||
|
'newsletter_description': 'newsletterDescription',
|
||||||
|
'issue_title': 'issueTitle',
|
||||||
|
'issue_description': 'issueDescription',
|
||||||
|
'issue_number': 'issueNumber',
|
||||||
|
'issue_image_original': 'imageUrl',
|
||||||
|
'card_url': 'url',
|
||||||
|
'revue_account_id': 'revueAccountId'
|
||||||
|
})
|
||||||
|
kwargs['issueNumber'] = int(kwargs['issueNumber'])
|
||||||
|
kwargs['revueAccountId'] = int(kwargs['revueAccountId'])
|
||||||
|
return NewsletterIssueCard(**kwargs)
|
||||||
|
elif cardName == 'amplify':
|
||||||
|
return AmplifyCard(
|
||||||
|
id = bindingValues['amplify_content_id'],
|
||||||
|
video = Video(
|
||||||
|
thumbnailUrl = bindingValues['player_image'],
|
||||||
|
variants = [VideoVariant(contentType = bindingValues['player_stream_content_type'], url = bindingValues['amplify_url_vmap'], bitrate = None)],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
elif cardName == 'appplayer':
|
||||||
|
kwargs = _kwargs_from_map({'title': 'title', 'app_category': 'appCategory', 'player_owner_id': 'playerOwnerId', 'site': 'siteUser'})
|
||||||
|
kwargs['playerOwnerId'] = int(kwargs['playerOwnerId'])
|
||||||
|
variants = []
|
||||||
|
variants.append(VideoVariant(contentType = 'application/x-mpegurl', url = bindingValues['player_hls_url'], bitrate = None))
|
||||||
|
if 'vmap' not in bindingValues['player_url']:
|
||||||
|
_logger.warning(f'Non-VMAP URL in {cardName} player_url on tweet {tweetId}')
|
||||||
|
variants.append(VideoVariant(contentType = 'text/xml', url = bindingValues['player_url'], bitrate = None))
|
||||||
|
kwargs['video'] = Video(thumbnailUrl = bindingValues['player_image_original'], variants = variants, duration = int(bindingValues['content_duration_seconds']))
|
||||||
|
return AppPlayerCard(**kwargs)
|
||||||
|
elif cardName == '3691233323:audiospace':
|
||||||
|
return SpacesCard(**_kwargs_from_map({'card_url': 'url', 'id': 'id'}))
|
||||||
|
elif cardName == '2586390716:message_me':
|
||||||
|
# Note that the strings in Twitter's JS appear to have an incorrect mapping that then gets changed somewhere in the 1.8 MiB of JS!
|
||||||
|
# cta_1, 3, and 4 should mean 'Message us', 'Send a private message', and 'Send me a private message', but the correct mapping is currently unknown.
|
||||||
|
ctas = {'message_me_card_cta_2': 'Send us a private message'}
|
||||||
|
if bindingValues['cta'] not in ctas:
|
||||||
|
_logger.warning(f'Unsupported message_me card cta on tweet {tweetId}: {bindingValues["cta"]!r}')
|
||||||
|
return
|
||||||
|
return MessageMeCard(**_kwargs_from_map({'recipient': 'recipient', 'card_url': 'url'}), buttonText = ctas[bindingValues['cta']])
|
||||||
|
elif cardName == 'unified_card':
|
||||||
|
o = json.loads(bindingValues['unified_card'])
|
||||||
|
kwargs = {}
|
||||||
|
if 'type' in o:
|
||||||
|
unifiedCardType = o.get('type')
|
||||||
|
if unifiedCardType not in (
|
||||||
|
'image_app',
|
||||||
|
'image_carousel_app',
|
||||||
|
'image_carousel_website',
|
||||||
|
'image_multi_dest_carousel_website',
|
||||||
|
'image_website',
|
||||||
|
'mixed_media_multi_dest_carousel_website',
|
||||||
|
'mixed_media_single_dest_carousel_app',
|
||||||
|
'mixed_media_single_dest_carousel_website',
|
||||||
|
'video_app',
|
||||||
|
'video_carousel_app',
|
||||||
|
'video_carousel_website',
|
||||||
|
'video_multi_dest_carousel_website',
|
||||||
|
'video_website',
|
||||||
|
):
|
||||||
|
_logger.warning(f'Unsupported unified_card type on tweet {tweetId}: {unifiedCardType!r}')
|
||||||
|
return
|
||||||
|
kwargs['type'] = unifiedCardType
|
||||||
|
elif set(c['type'] for c in o['component_objects'].values()) not in ({'media', 'twitter_list_details'}, {'media', 'community_details'}):
|
||||||
|
_logger.warning(f'Unsupported unified_card type on tweet {tweetId}')
|
||||||
|
return
|
||||||
|
|
||||||
|
kwargs['componentObjects'] = {}
|
||||||
|
for k, v in o['component_objects'].items():
|
||||||
|
if v['type'] == 'details':
|
||||||
|
co = UnifiedCardDetailComponentObject(content = v['data']['title']['content'], destinationKey = v['data']['destination'])
|
||||||
|
elif v['type'] == 'media':
|
||||||
|
co = UnifiedCardMediumComponentObject(mediumKey = v['data']['id'], destinationKey = v['data']['destination'])
|
||||||
|
elif v['type'] == 'button_group':
|
||||||
|
if not all(b['type'] == 'cta' for b in v['data']['buttons']):
|
||||||
|
_logger.warning(f'Unsupported unified_card button_group button type on tweet {tweetId}')
|
||||||
|
return
|
||||||
|
buttons = [UnifiedCardButton(text = b['action'][0].upper() + re.sub('[A-Z]', lambda x: f' {x[0]}', b['action'][1:]), destinationKey = b['destination']) for b in v['data']['buttons']]
|
||||||
|
co = UnifiedCardButtonGroupComponentObject(buttons = buttons)
|
||||||
|
elif v['type'] == 'swipeable_media':
|
||||||
|
media = [UnifiedCardSwipeableMediaMedium(mediumKey = m['id'], destinationKey = m['destination']) for m in v['data']['media_list']]
|
||||||
|
co = UnifiedCardSwipeableMediaComponentObject(media = media)
|
||||||
|
elif v['type'] == 'app_store_details':
|
||||||
|
co = UnifiedCardAppStoreComponentObject(appKey = v['data']['app_id'], destinationKey = v['data']['destination'])
|
||||||
|
elif v['type'] == 'twitter_list_details':
|
||||||
|
co = UnifiedCardTwitterListDetailsComponentObject(
|
||||||
|
name = v['data']['name']['content'],
|
||||||
|
memberCount = v['data']['member_count'],
|
||||||
|
subscriberCount = v['data']['subscriber_count'],
|
||||||
|
user = self._user_to_user(o['users'][v['data']['user_id']]),
|
||||||
|
destinationKey = v['data']['destination'],
|
||||||
|
)
|
||||||
|
elif v['type'] == 'community_details':
|
||||||
|
co = UnifiedCardTwitterCommunityDetailsComponentObject(
|
||||||
|
name = v['data']['name']['content'],
|
||||||
|
theme = v['data']['theme'],
|
||||||
|
membersCount = v['data']['member_count'],
|
||||||
|
destinationKey = v['data']['destination'],
|
||||||
|
membersFacepile = [self._user_to_user(u) for u in map(o['users'].get, v['data']['members_facepile']) if u],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_logger.warning(f'Unsupported unified_card component type on tweet {tweetId}: {v["type"]!r}')
|
||||||
|
return
|
||||||
|
kwargs['componentObjects'][k] = co
|
||||||
|
|
||||||
|
kwargs['destinations'] = {}
|
||||||
|
for k, v in o['destination_objects'].items():
|
||||||
|
dKwargs = {}
|
||||||
|
if 'url_data' in v['data']:
|
||||||
|
dKwargs['url'] = v['data']['url_data']['url']
|
||||||
|
if 'app_id' in v['data']:
|
||||||
|
dKwargs['appKey'] = v['data']['app_id']
|
||||||
|
if 'media_id' in v['data']:
|
||||||
|
dKwargs['mediumKey'] = v['data']['media_id']
|
||||||
|
kwargs['destinations'][k] = UnifiedCardDestination(**dKwargs)
|
||||||
|
|
||||||
|
kwargs['media'] = {}
|
||||||
|
for k, v in o['media_entities'].items():
|
||||||
|
if (medium := self._make_medium(v, tweetId)):
|
||||||
|
kwargs['media'][k] = medium
|
||||||
|
|
||||||
|
if 'app_store_data' in o:
|
||||||
|
kwargs['apps'] = {}
|
||||||
|
for k, v in o['app_store_data'].items():
|
||||||
|
variants = []
|
||||||
|
for var in v:
|
||||||
|
vKwargsMap = {
|
||||||
|
'type': 'type',
|
||||||
|
'id': 'id',
|
||||||
|
'icon_media_key': 'iconMediumKey',
|
||||||
|
'country_code': 'countryCode',
|
||||||
|
'num_installs': 'installs',
|
||||||
|
'size_bytes': 'size',
|
||||||
|
'is_free': 'isFree',
|
||||||
|
'is_editors_choice': 'isEditorsChoice',
|
||||||
|
'has_in_app_purchases': 'hasInAppPurchases',
|
||||||
|
'has_in_app_ads': 'hasInAppAds',
|
||||||
|
}
|
||||||
|
vKwargs = {kwarg: var[key] for key, kwarg in vKwargsMap.items() if key in var}
|
||||||
|
vKwargs['title'] = var['title']['content']
|
||||||
|
if 'description' in var:
|
||||||
|
vKwargs['description'] = var['description']['content']
|
||||||
|
vKwargs['category'] = var['category']['content']
|
||||||
|
if (ratings := var['ratings']):
|
||||||
|
vKwargs['ratingAverage'] = var['ratings']['star']
|
||||||
|
vKwargs['ratingCount'] = var['ratings']['count']
|
||||||
|
vKwargs['url'] = f'https://play.google.com/store/apps/details?id={var["id"]}' if var['type'] == 'android_app' else f'https://itunes.apple.com/app/id{var["id"]}'
|
||||||
|
variants.append(UnifiedCardApp(**vKwargs))
|
||||||
|
kwargs['apps'][k] = variants
|
||||||
|
|
||||||
|
if o['components']:
|
||||||
|
kwargs['components'] = o['components']
|
||||||
|
|
||||||
|
if 'layout' in o:
|
||||||
|
if o['layout']['type'] != 'swipeable':
|
||||||
|
_logger.warning(f'Unsupported unified_card layout type on tweet {tweetId}: {o["layout"]["type"]!r}')
|
||||||
|
return
|
||||||
|
kwargs['swipeableLayoutSlides'] = [UnifiedCardSwipeableLayoutSlide(mediumComponentKey = v[0], componentKey = v[1]) for v in o['layout']['data']['slides']]
|
||||||
|
|
||||||
|
return UnifiedCard(**kwargs)
|
||||||
|
|
||||||
|
_logger.warning(f'Unsupported card type on tweet {tweetId}: {cardName!r}')
|
||||||
|
|
||||||
def _tweet_to_tweet(self, tweet, obj):
|
def _tweet_to_tweet(self, tweet, obj):
|
||||||
user = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']])
|
user = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']])
|
||||||
@@ -634,7 +1234,7 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
if 'quoted_status_id_str' in tweet and tweet['quoted_status_id_str'] in obj['globalObjects']['tweets']:
|
if 'quoted_status_id_str' in tweet and tweet['quoted_status_id_str'] in obj['globalObjects']['tweets']:
|
||||||
kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj)
|
kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj)
|
||||||
if 'card' in tweet:
|
if 'card' in tweet:
|
||||||
kwargs['card'] = self._make_card(tweet['card'], _TwitterAPIType.V2)
|
kwargs['card'] = self._make_card(tweet['card'], _TwitterAPIType.V2, self._get_tweet_id(tweet))
|
||||||
return self._make_tweet(tweet, user, **kwargs)
|
return self._make_tweet(tweet, user, **kwargs)
|
||||||
|
|
||||||
def _graphql_timeline_tweet_item_result_to_tweet(self, result):
|
def _graphql_timeline_tweet_item_result_to_tweet(self, result):
|
||||||
@@ -644,7 +1244,7 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
#TODO Include result['softInterventionPivot'] in the Tweet object
|
#TODO Include result['softInterventionPivot'] in the Tweet object
|
||||||
result = result['tweet']
|
result = result['tweet']
|
||||||
else:
|
else:
|
||||||
raise snscrape.base.ScraperError(f'Unknown result type {result["__typename"]!r}')
|
raise snscrape.base.ScraperException(f'Unknown result type {result["__typename"]!r}')
|
||||||
tweet = result['legacy']
|
tweet = result['legacy']
|
||||||
userId = int(result['core']['user_results']['result']['rest_id'])
|
userId = int(result['core']['user_results']['result']['rest_id'])
|
||||||
user = self._user_to_user(result['core']['user_results']['result']['legacy'], id_ = userId)
|
user = self._user_to_user(result['core']['user_results']['result']['legacy'], id_ = userId)
|
||||||
@@ -661,8 +1261,10 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str']))
|
kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str']))
|
||||||
else:
|
else:
|
||||||
kwargs['quotedTweet'] = TweetRef(id = int(result['quotedRefResult']['result']['rest_id']))
|
kwargs['quotedTweet'] = TweetRef(id = int(result['quotedRefResult']['result']['rest_id']))
|
||||||
|
elif 'quoted_status_id_str' in tweet:
|
||||||
|
kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str']))
|
||||||
if 'card' in result:
|
if 'card' in result:
|
||||||
kwargs['card'] = self._make_card(result['card'], _TwitterAPIType.GRAPHQL)
|
kwargs['card'] = self._make_card(result['card'], _TwitterAPIType.GRAPHQL, self._get_tweet_id(tweet))
|
||||||
return self._make_tweet(tweet, user, **kwargs)
|
return self._make_tweet(tweet, user, **kwargs)
|
||||||
|
|
||||||
def _graphql_timeline_instructions_to_tweets(self, instructions, includeConversationThreads = False):
|
def _graphql_timeline_instructions_to_tweets(self, instructions, includeConversationThreads = False):
|
||||||
|
|||||||
@@ -177,11 +177,15 @@ class VKontakteUserScraper(snscrape.base.Scraper):
|
|||||||
continue
|
continue
|
||||||
if 'data-video' in a.attrs:
|
if 'data-video' in a.attrs:
|
||||||
# Video
|
# Video
|
||||||
|
if 'data-link-attr' in a.attrs:
|
||||||
|
hrefUrl = urllib.parse.unquote(a.attrs['data-link-attr'].split('to=')[1].split('&')[0])
|
||||||
|
else:
|
||||||
|
hrefUrl = f'https://vk.com{a["href"]}'
|
||||||
video = Video(
|
video = Video(
|
||||||
id = a['data-video'],
|
id = a['data-video'],
|
||||||
list = a['data-list'],
|
list = a['data-list'],
|
||||||
duration = int(a['data-duration']),
|
duration = int(a['data-duration']),
|
||||||
url = f'https://vk.com{a["href"]}',
|
url = hrefUrl,
|
||||||
thumbUrl = a['style'][(begin := a['style'].find('background-image: url(') + 22) : a['style'].find(')', begin)],
|
thumbUrl = a['style'][(begin := a['style'].find('background-image: url(') + 22) : a['style'].find(')', begin)],
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ class WeiboUserScraper(snscrape.base.Scraper):
|
|||||||
_logger.warning('User does not exist')
|
_logger.warning('User does not exist')
|
||||||
self._user = _userDoesNotExist
|
self._user = _userDoesNotExist
|
||||||
else:
|
else:
|
||||||
raise snscrape.base.ScraperError(f'Got unexpected response on resolving username ({r.status_code})')
|
raise snscrape.base.ScraperException(f'Got unexpected response on resolving username ({r.status_code})')
|
||||||
|
|
||||||
def _check_timeline_response(self, r):
|
def _check_timeline_response(self, r):
|
||||||
if r.status_code == 200 and r.content == b'{"ok":0,"msg":"\\u8fd9\\u91cc\\u8fd8\\u6ca1\\u6709\\u5185\\u5bb9","data":{"cards":[]}}':
|
if r.status_code == 200 and r.content == b'{"ok":0,"msg":"\\u8fd9\\u91cc\\u8fd8\\u6ca1\\u6709\\u5185\\u5bb9","data":{"cards":[]}}':
|
||||||
|
|||||||
Reference in New Issue
Block a user