mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-11 03:48:29 +03:00
Compare commits
18 Commits
add-vk-use
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40b8d9f267 | ||
|
|
fdc40f7411 | ||
|
|
82351800d6 | ||
|
|
73f10a4f24 | ||
|
|
d72b51953f | ||
|
|
056cd6215c | ||
|
|
d5b406bc1b | ||
|
|
56e4232083 | ||
|
|
50899c01f3 | ||
|
|
bcad6923c2 | ||
|
|
0d361685ff | ||
|
|
530f4fa122 | ||
|
|
dc6bc9bf9d | ||
|
|
01cf6a09b3 | ||
|
|
ef7c4fad3e | ||
|
|
65723f10ff | ||
|
|
07a5f6fd7d | ||
|
|
b8efce2a12 |
@@ -229,7 +229,7 @@ class Scraper:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _cli_from_args(cls, args):
|
def _cli_from_args(cls, args):
|
||||||
return cls._construct(args)
|
return cls._cli_construct(args)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _cli_construct(cls, argparseArgs, *args, **kwargs):
|
def _cli_construct(cls, argparseArgs, *args, **kwargs):
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import re
|
|||||||
import snscrape.base
|
import snscrape.base
|
||||||
import typing
|
import typing
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
import base64
|
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
|
_SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
|
||||||
@@ -57,7 +56,7 @@ class TelegramPost(snscrape.base.Item):
|
|||||||
forwarded: typing.Optional['Channel'] = None
|
forwarded: typing.Optional['Channel'] = None
|
||||||
forwardedUrl: typing.Optional[str] = None
|
forwardedUrl: typing.Optional[str] = None
|
||||||
media: typing.Optional[typing.List['Medium']] = None
|
media: typing.Optional[typing.List['Medium']] = None
|
||||||
views: typing.Optional[int] = None
|
views: typing.Optional[snscrape.base.IntWithGranularity] = None
|
||||||
linkPreview: typing.Optional[LinkPreview] = None
|
linkPreview: typing.Optional[LinkPreview] = None
|
||||||
|
|
||||||
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
|
||||||
@@ -176,7 +175,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
|
for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
|
||||||
audioUrl = voicePlayer.find('audio')['src']
|
audioUrl = voicePlayer.find('audio')['src']
|
||||||
durationStr = voicePlayer.find('time').text
|
durationStr = voicePlayer.find('time').text
|
||||||
duration = durationStrToSeconds(durationStr)
|
duration = _durationStrToSeconds(durationStr)
|
||||||
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
|
barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
|
||||||
|
|
||||||
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
|
media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
|
||||||
@@ -201,7 +200,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
else:
|
else:
|
||||||
cls = Video
|
cls = Video
|
||||||
durationStr = videoPlayer.find('time').text
|
durationStr = videoPlayer.find('time').text
|
||||||
mKwargs['duration'] = durationStrToSeconds(durationStr)
|
mKwargs['duration'] = _durationStrToSeconds(durationStr)
|
||||||
media.append(cls(**mKwargs))
|
media.append(cls(**mKwargs))
|
||||||
|
|
||||||
linkPreview = None
|
linkPreview = None
|
||||||
@@ -224,7 +223,12 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
outlinks.remove(kwargs['href'])
|
outlinks.remove(kwargs['href'])
|
||||||
|
|
||||||
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
|
viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
|
||||||
views = None if viewsSpan is None else parse_num(viewsSpan.text)
|
views = None if viewsSpan is None else _parse_num(viewsSpan.text)
|
||||||
|
|
||||||
|
outlinks = outlinks if outlinks else None
|
||||||
|
media = media if media else None
|
||||||
|
mentions = mentions if mentions else None
|
||||||
|
hashtags = hashtags if hashtags else None
|
||||||
|
|
||||||
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
|
yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
|
||||||
|
|
||||||
@@ -253,7 +257,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
|
nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
|
||||||
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
|
r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = _telegramResponseOkCallback)
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||||
@@ -266,8 +270,12 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||||
membersDiv = soup.find('div', class_ = 'tgme_page_extra')
|
membersDiv = soup.find('div', class_ = 'tgme_page_extra')
|
||||||
if membersDiv.text.endswith((' members', ' subscribers')):
|
if membersDiv.text.split(',')[0].endswith((' members', ' subscribers')):
|
||||||
kwargs['members'] = int(''.join(membersDiv.text.split(' ')[:-1]))
|
membersStr = ''.join(membersDiv.text.split(',')[0].split(' ')[:-1])
|
||||||
|
if membersStr == 'no':
|
||||||
|
kwargs['members'] = 0
|
||||||
|
else:
|
||||||
|
kwargs['members'] = int(membersStr)
|
||||||
photoImg = soup.find('img', class_ = 'tgme_page_photo_image')
|
photoImg = soup.find('img', class_ = 'tgme_page_photo_image')
|
||||||
if photoImg is not None:
|
if photoImg is not None:
|
||||||
kwargs['photo'] = photoImg.attrs['src']
|
kwargs['photo'] = photoImg.attrs['src']
|
||||||
@@ -294,7 +302,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
kwargs['description'] = descriptionDiv.text
|
kwargs['description'] = descriptionDiv.text
|
||||||
|
|
||||||
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
|
||||||
value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
|
value, granularity = _parse_num(div.find('span', class_ = 'counter_value').text)
|
||||||
type_ = div.find('span', class_ = 'counter_type').text
|
type_ = div.find('span', class_ = 'counter_type').text
|
||||||
if type_ == 'members':
|
if type_ == 'members':
|
||||||
# Already extracted more accurately from /channel, skip
|
# Already extracted more accurately from /channel, skip
|
||||||
@@ -312,7 +320,7 @@ class TelegramChannelScraper(snscrape.base.Scraper):
|
|||||||
def _cli_from_args(cls, args):
|
def _cli_from_args(cls, args):
|
||||||
return cls._cli_construct(args, args.channel)
|
return cls._cli_construct(args, args.channel)
|
||||||
|
|
||||||
def parse_num(s):
|
def _parse_num(s):
|
||||||
s = s.replace(' ', '')
|
s = s.replace(' ', '')
|
||||||
if s.endswith('M'):
|
if s.endswith('M'):
|
||||||
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
|
||||||
@@ -320,11 +328,11 @@ def parse_num(s):
|
|||||||
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
|
||||||
return int(s), 1
|
return int(s), 1
|
||||||
|
|
||||||
def durationStrToSeconds(durationStr):
|
def _durationStrToSeconds(durationStr):
|
||||||
durationList = durationStr.split(':')
|
durationList = durationStr.split(':')
|
||||||
return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))])
|
return sum([int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(durationList))])
|
||||||
|
|
||||||
def telegramResponseOkCallback(r):
|
def _telegramResponseOkCallback(r):
|
||||||
if r.status_code == 200:
|
if r.status_code == 200:
|
||||||
return (True, None)
|
return (True, None)
|
||||||
return (False, f'{r.status_code=}')
|
return (False, f'{r.status_code=}')
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
__all__ = [
|
__all__ = [
|
||||||
'Tweet', 'Medium', 'Photo', 'VideoVariant', 'Video', 'Gif', 'DescriptionUrl', 'Coordinates', 'Place',
|
'Tweet', 'Medium', 'Photo', 'VideoVariant', 'Video', 'Gif', 'TextLink', 'Coordinates', 'Place',
|
||||||
'User', 'UserLabel',
|
'User', 'UserLabel',
|
||||||
'Trend',
|
'Trend',
|
||||||
'GuestTokenManager',
|
'GuestTokenManager',
|
||||||
@@ -31,6 +31,18 @@ import string
|
|||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
|
||||||
|
# DescriptionURL deprecation
|
||||||
|
_DEPRECATED_NAMES = {'DescriptionURL': 'TextLink'}
|
||||||
|
def __getattr__(name):
|
||||||
|
if name in _DEPRECATED_NAMES:
|
||||||
|
warnings.warn(f'{name} is deprecated, use {_DEPRECATED_NAMES[name]} instead', FutureWarning, stacklevel = 2)
|
||||||
|
return globals()[_DEPRECATED_NAMES[name]]
|
||||||
|
raise AttributeError(f'module {__name__!r} has no attribute {name!r}')
|
||||||
|
def __dir__():
|
||||||
|
return sorted(__all__ + list(_DEPRECATED_NAMES.keys()))
|
||||||
|
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
@@ -43,7 +55,7 @@ _GUEST_TOKEN_VALIDITY = 10800
|
|||||||
class Tweet(snscrape.base.Item):
|
class Tweet(snscrape.base.Item):
|
||||||
url: str
|
url: str
|
||||||
date: datetime.datetime
|
date: datetime.datetime
|
||||||
content: str
|
rawContent: str
|
||||||
renderedContent: str
|
renderedContent: str
|
||||||
id: int
|
id: int
|
||||||
user: 'User'
|
user: 'User'
|
||||||
@@ -56,8 +68,7 @@ class Tweet(snscrape.base.Item):
|
|||||||
source: str
|
source: str
|
||||||
sourceUrl: typing.Optional[str] = None
|
sourceUrl: typing.Optional[str] = None
|
||||||
sourceLabel: typing.Optional[str] = None
|
sourceLabel: typing.Optional[str] = None
|
||||||
outlinks: typing.Optional[typing.List[str]] = None
|
links: typing.Optional[typing.List['TextLink']] = None
|
||||||
tcooutlinks: typing.Optional[typing.List[str]] = None
|
|
||||||
media: typing.Optional[typing.List['Medium']] = None
|
media: typing.Optional[typing.List['Medium']] = None
|
||||||
retweetedTweet: typing.Optional['Tweet'] = None
|
retweetedTweet: typing.Optional['Tweet'] = None
|
||||||
quotedTweet: typing.Optional['Tweet'] = None
|
quotedTweet: typing.Optional['Tweet'] = None
|
||||||
@@ -71,13 +82,24 @@ class Tweet(snscrape.base.Item):
|
|||||||
card: typing.Optional['Card'] = None
|
card: typing.Optional['Card'] = None
|
||||||
|
|
||||||
username = snscrape.base._DeprecatedProperty('username', lambda self: self.user.username, 'user.username')
|
username = snscrape.base._DeprecatedProperty('username', lambda self: self.user.username, 'user.username')
|
||||||
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks) if self.outlinks else '', 'outlinks')
|
outlinks = snscrape.base._DeprecatedProperty('outlinks', lambda self: [x.url for x in self.links] if self.links else [], 'links (url attribute)')
|
||||||
tcooutlinksss = snscrape.base._DeprecatedProperty('tcooutlinksss', lambda self: ' '.join(self.tcooutlinks) if self.tcooutlinks else '', 'tcooutlinks')
|
outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(x.url for x in self.links) if self.links else '', 'links (url attribute)')
|
||||||
|
tcooutlinks = snscrape.base._DeprecatedProperty('tcooutlinks', lambda self: [x.tcourl for x in self.links] if self.links else [], 'links (tcourl attribute)')
|
||||||
|
tcooutlinksss = snscrape.base._DeprecatedProperty('tcooutlinksss', lambda self: ' '.join(x.tcourl for x in self.links) if self.links else '', 'links (tcourl attribute)')
|
||||||
|
content = snscrape.base._DeprecatedProperty('content', lambda self: self.rawContent, 'rawContent')
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.url
|
return self.url
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class TextLink:
|
||||||
|
text: typing.Optional[str]
|
||||||
|
url: str
|
||||||
|
tcourl: str
|
||||||
|
indices: typing.Tuple[int, int]
|
||||||
|
|
||||||
|
|
||||||
class Medium:
|
class Medium:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -109,14 +131,6 @@ class Gif(Medium):
|
|||||||
variants: typing.List[VideoVariant]
|
variants: typing.List[VideoVariant]
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
|
||||||
class DescriptionURL:
|
|
||||||
text: typing.Optional[str]
|
|
||||||
url: str
|
|
||||||
tcourl: str
|
|
||||||
indices: typing.Tuple[int, int]
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class Coordinates:
|
class Coordinates:
|
||||||
longitude: float
|
longitude: float
|
||||||
@@ -445,9 +459,9 @@ class User(snscrape.base.Entity):
|
|||||||
username: str
|
username: str
|
||||||
id: int
|
id: int
|
||||||
displayname: typing.Optional[str] = None
|
displayname: typing.Optional[str] = None
|
||||||
description: typing.Optional[str] = None # Description as it's displayed on the web interface with URLs replaced
|
|
||||||
rawDescription: typing.Optional[str] = None # Raw description with the URL(s) intact
|
rawDescription: typing.Optional[str] = None # Raw description with the URL(s) intact
|
||||||
descriptionUrls: typing.Optional[typing.List[DescriptionURL]] = None
|
renderedDescription: typing.Optional[str] = None # Description as it's displayed on the web interface with URLs replaced
|
||||||
|
descriptionLinks: typing.Optional[typing.List[TextLink]] = None
|
||||||
verified: typing.Optional[bool] = None
|
verified: typing.Optional[bool] = None
|
||||||
created: typing.Optional[datetime.datetime] = None
|
created: typing.Optional[datetime.datetime] = None
|
||||||
followersCount: typing.Optional[int] = None
|
followersCount: typing.Optional[int] = None
|
||||||
@@ -458,12 +472,16 @@ class User(snscrape.base.Entity):
|
|||||||
mediaCount: typing.Optional[int] = None
|
mediaCount: typing.Optional[int] = None
|
||||||
location: typing.Optional[str] = None
|
location: typing.Optional[str] = None
|
||||||
protected: typing.Optional[bool] = None
|
protected: typing.Optional[bool] = None
|
||||||
linkUrl: typing.Optional[str] = None
|
link: typing.Optional[TextLink] = None
|
||||||
linkTcourl: typing.Optional[str] = None
|
|
||||||
profileImageUrl: typing.Optional[str] = None
|
profileImageUrl: typing.Optional[str] = None
|
||||||
profileBannerUrl: typing.Optional[str] = None
|
profileBannerUrl: typing.Optional[str] = None
|
||||||
label: typing.Optional['UserLabel'] = None
|
label: typing.Optional['UserLabel'] = None
|
||||||
|
|
||||||
|
descriptionUrls = snscrape.base._DeprecatedProperty('descriptionUrls', lambda self: self.descriptionLinks, 'descriptionLinks')
|
||||||
|
linkUrl = snscrape.base._DeprecatedProperty('linkUrl', lambda self: self.link.url if self.link else None, 'link.url')
|
||||||
|
linkTcourl = snscrape.base._DeprecatedProperty('linkTcourl', lambda self: self.link.tcourl if self.link else None, 'link.tcourl')
|
||||||
|
description = snscrape.base._DeprecatedProperty('description', lambda self: self.renderedDescription, 'renderedDescription')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def url(self):
|
def url(self):
|
||||||
return f'https://twitter.com/{self.username}'
|
return f'https://twitter.com/{self.username}'
|
||||||
@@ -548,7 +566,12 @@ class _CLIGuestTokenManager(GuestTokenManager):
|
|||||||
return None
|
return None
|
||||||
_logger.info(f'Reading guest token from {self._file}')
|
_logger.info(f'Reading guest token from {self._file}')
|
||||||
with open(self._file, 'r') as fp:
|
with open(self._file, 'r') as fp:
|
||||||
o = json.load(fp)
|
try:
|
||||||
|
o = json.load(fp)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
_logger.warning(f'Malformed guest token file {self._file}: {e!s}')
|
||||||
|
self.reset()
|
||||||
|
return None
|
||||||
self._token = o['token']
|
self._token = o['token']
|
||||||
self._setTime = o['setTime']
|
self._setTime = o['setTime']
|
||||||
if self._setTime < time.time() - _GUEST_TOKEN_VALIDITY:
|
if self._setTime < time.time() - _GUEST_TOKEN_VALIDITY:
|
||||||
@@ -810,13 +833,17 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
tweetId = self._get_tweet_id(tweet)
|
tweetId = self._get_tweet_id(tweet)
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
kwargs['id'] = tweetId
|
kwargs['id'] = tweetId
|
||||||
kwargs['content'] = tweet['full_text']
|
kwargs['rawContent'] = tweet['full_text']
|
||||||
kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities'].get('urls'))
|
kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities'].get('urls'))
|
||||||
kwargs['user'] = user
|
kwargs['user'] = user
|
||||||
kwargs['date'] = email.utils.parsedate_to_datetime(tweet['created_at'])
|
kwargs['date'] = email.utils.parsedate_to_datetime(tweet['created_at'])
|
||||||
if tweet['entities'].get('urls'):
|
if tweet['entities'].get('urls'):
|
||||||
kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']]
|
kwargs['links'] = [TextLink(
|
||||||
kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']]
|
text = u.get('display_url'),
|
||||||
|
url = u['expanded_url'],
|
||||||
|
tcourl = u['url'],
|
||||||
|
indices = tuple(u['indices']),
|
||||||
|
) for u in tweet['entities']['urls']]
|
||||||
kwargs['url'] = f'https://twitter.com/{user.username}/status/{tweetId}'
|
kwargs['url'] = f'https://twitter.com/{user.username}/status/{tweetId}'
|
||||||
kwargs['replyCount'] = tweet['reply_count']
|
kwargs['replyCount'] = tweet['reply_count']
|
||||||
kwargs['retweetCount'] = tweet['retweet_count']
|
kwargs['retweetCount'] = tweet['retweet_count']
|
||||||
@@ -877,10 +904,15 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
if hasattr(card, 'url') and '//t.co/' in card.url:
|
if hasattr(card, 'url') and '//t.co/' in card.url:
|
||||||
# Try to convert the URL to the non-shortened/t.co one
|
# Try to convert the URL to the non-shortened/t.co one
|
||||||
# Retweets inherit the card but not the outlinks; try to get them from the retweeted tweet instead in that case.
|
# Retweets inherit the card but not the outlinks; try to get them from the retweeted tweet instead in that case.
|
||||||
if 'tcooutlinks' in kwargs and card.url in kwargs['tcooutlinks']:
|
candidates = []
|
||||||
card.url = kwargs['outlinks'][kwargs['tcooutlinks'].index(card.url)]
|
if 'links' in kwargs:
|
||||||
elif retweetedTweet and retweetedTweet.tcooutlinks and card.url in retweetedTweet.tcooutlinks:
|
candidates.extend(kwargs['links'])
|
||||||
card.url = retweetedTweet.outlinks[retweetedTweet.tcooutlinks.index(card.url)]
|
if retweetedTweet:
|
||||||
|
candidates.extend(retweetedTweet.links)
|
||||||
|
for u in candidates:
|
||||||
|
if u.tcourl == card.url:
|
||||||
|
card.url = u.url
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
_logger.warning(f'Could not translate t.co card URL on tweet {tweetId}')
|
_logger.warning(f'Could not translate t.co card URL on tweet {tweetId}')
|
||||||
return Tweet(**kwargs)
|
return Tweet(**kwargs)
|
||||||
@@ -1300,10 +1332,15 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
kwargs['username'] = user['screen_name']
|
kwargs['username'] = user['screen_name']
|
||||||
kwargs['id'] = id_ if id_ else user['id'] if 'id' in user else int(user['id_str'])
|
kwargs['id'] = id_ if id_ else user['id'] if 'id' in user else int(user['id_str'])
|
||||||
kwargs['displayname'] = user['name']
|
kwargs['displayname'] = user['name']
|
||||||
kwargs['description'] = self._render_text_with_urls(user['description'], user['entities']['description'].get('urls'))
|
|
||||||
kwargs['rawDescription'] = user['description']
|
kwargs['rawDescription'] = user['description']
|
||||||
|
kwargs['renderedDescription'] = self._render_text_with_urls(user['description'], user['entities']['description'].get('urls'))
|
||||||
if user['entities']['description'].get('urls'):
|
if user['entities']['description'].get('urls'):
|
||||||
kwargs['descriptionUrls'] = [{'text': x.get('display_url'), 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['entities']['description']['urls']]
|
kwargs['descriptionLinks'] = [TextLink(
|
||||||
|
text = x.get('display_url'),
|
||||||
|
url = x['expanded_url'],
|
||||||
|
tcourl = x['url'],
|
||||||
|
indices = tuple(x['indices']),
|
||||||
|
) for x in user['entities']['description']['urls']]
|
||||||
kwargs['verified'] = user.get('verified')
|
kwargs['verified'] = user.get('verified')
|
||||||
kwargs['created'] = email.utils.parsedate_to_datetime(user['created_at'])
|
kwargs['created'] = email.utils.parsedate_to_datetime(user['created_at'])
|
||||||
kwargs['followersCount'] = user['followers_count']
|
kwargs['followersCount'] = user['followers_count']
|
||||||
@@ -1314,9 +1351,13 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
|
|||||||
kwargs['mediaCount'] = user['media_count']
|
kwargs['mediaCount'] = user['media_count']
|
||||||
kwargs['location'] = user['location']
|
kwargs['location'] = user['location']
|
||||||
kwargs['protected'] = user.get('protected')
|
kwargs['protected'] = user.get('protected')
|
||||||
if 'url' in user['entities']:
|
if user.get('url'):
|
||||||
kwargs['linkUrl'] = (user['entities']['url']['urls'][0].get('expanded_url') or user.get('url'))
|
entity = user['entities'].get('url', {}).get('urls', [None])[0]
|
||||||
kwargs['linkTcourl'] = user.get('url')
|
if not entity or entity['url'] != user['url']:
|
||||||
|
self.logger.warning(f'Link inconsistency on user {kwargs["id"]}')
|
||||||
|
if not entity:
|
||||||
|
entity = {'indices': (0, len(user['url']))}
|
||||||
|
kwargs['link'] = TextLink(text = entity.get('display_url'), url = entity.get('expanded_url', user['url']), tcourl = user['url'], indices = tuple(entity['indices']))
|
||||||
kwargs['profileImageUrl'] = user['profile_image_url_https']
|
kwargs['profileImageUrl'] = user['profile_image_url_https']
|
||||||
kwargs['profileBannerUrl'] = user.get('profile_banner_url')
|
kwargs['profileBannerUrl'] = user.get('profile_banner_url')
|
||||||
if 'ext' in user and (label := user['ext']['highlightedLabel']['r']['ok'].get('label')):
|
if 'ext' in user and (label := user['ext']['highlightedLabel']['r']['ok'].get('label')):
|
||||||
@@ -1373,6 +1414,7 @@ class TwitterSearchScraper(_TwitterAPIScraper):
|
|||||||
'include_mute_edge': '1',
|
'include_mute_edge': '1',
|
||||||
'include_can_dm': '1',
|
'include_can_dm': '1',
|
||||||
'include_can_media_tag': '1',
|
'include_can_media_tag': '1',
|
||||||
|
'include_ext_has_nft_avatar': '1',
|
||||||
'skip_status': '1',
|
'skip_status': '1',
|
||||||
'cards_platform': 'Web-12',
|
'cards_platform': 'Web-12',
|
||||||
'include_cards': '1',
|
'include_cards': '1',
|
||||||
@@ -1384,16 +1426,18 @@ class TwitterSearchScraper(_TwitterAPIScraper):
|
|||||||
'include_user_entities': 'true',
|
'include_user_entities': 'true',
|
||||||
'include_ext_media_color': 'true',
|
'include_ext_media_color': 'true',
|
||||||
'include_ext_media_availability': 'true',
|
'include_ext_media_availability': 'true',
|
||||||
|
'include_ext_sensitive_media_warning': 'true',
|
||||||
|
'include_ext_trusted_friends_metadata': 'true',
|
||||||
'send_error_codes': 'true',
|
'send_error_codes': 'true',
|
||||||
'simple_quoted_tweets': 'true',
|
'simple_quoted_tweet': 'true',
|
||||||
'q': self._query,
|
'q': self._query,
|
||||||
'tweet_search_mode': 'live',
|
'tweet_search_mode': 'live',
|
||||||
'count': '100',
|
'count': '20',
|
||||||
'query_source': 'spelling_expansion_revert_click',
|
'query_source': 'spelling_expansion_revert_click',
|
||||||
'cursor': None,
|
'cursor': None,
|
||||||
'pc': '1',
|
'pc': '1',
|
||||||
'spelling_corrections': '1',
|
'spelling_corrections': '1',
|
||||||
'ext': 'mediaStats,highlightedLabel',
|
'ext': 'mediaStats,highlightedLabel,hasNftAvatar,voiceInfo,enrichments,superFollowMetadata,unmentionInfo',
|
||||||
}
|
}
|
||||||
params = paginationParams.copy()
|
params = paginationParams.copy()
|
||||||
del params['cursor']
|
del params['cursor']
|
||||||
@@ -1441,7 +1485,15 @@ class TwitterUserScraper(TwitterSearchScraper):
|
|||||||
return None
|
return None
|
||||||
user = obj['data']['user']['result']
|
user = obj['data']['user']['result']
|
||||||
rawDescription = user['legacy']['description']
|
rawDescription = user['legacy']['description']
|
||||||
description = self._render_text_with_urls(rawDescription, user['legacy']['entities']['description']['urls'])
|
renderedDescription = self._render_text_with_urls(rawDescription, user['legacy']['entities']['description']['urls'])
|
||||||
|
link = None
|
||||||
|
if user['legacy'].get('url'):
|
||||||
|
entity = user['legacy']['entities'].get('url', {}).get('urls', [None])[0]
|
||||||
|
if not entity or entity['url'] != user['legacy']['url']:
|
||||||
|
self.logger.warning(f'Link inconsistency on user')
|
||||||
|
if not entity:
|
||||||
|
entity = {'indices': (0, len(user['legacy']['url']))}
|
||||||
|
link = TextLink(text = entity.get('display_url'), url = entity.get('expanded_url', user['legacy']['url']), tcourl = user['legacy']['url'], indices = tuple(entity['indices']))
|
||||||
label = None
|
label = None
|
||||||
if (labelO := user['affiliates_highlighted_label'].get('label')):
|
if (labelO := user['affiliates_highlighted_label'].get('label')):
|
||||||
label = self._user_label_to_user_label(labelO)
|
label = self._user_label_to_user_label(labelO)
|
||||||
@@ -1449,9 +1501,14 @@ class TwitterUserScraper(TwitterSearchScraper):
|
|||||||
username = user['legacy']['screen_name'],
|
username = user['legacy']['screen_name'],
|
||||||
id = int(user['rest_id']),
|
id = int(user['rest_id']),
|
||||||
displayname = user['legacy']['name'],
|
displayname = user['legacy']['name'],
|
||||||
description = description,
|
|
||||||
rawDescription = rawDescription,
|
rawDescription = rawDescription,
|
||||||
descriptionUrls = [{'text': x.get('display_url'), 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['legacy']['entities']['description']['urls']],
|
renderedDescription = renderedDescription,
|
||||||
|
descriptionLinks = [TextLink(
|
||||||
|
text = x.get('display_url'),
|
||||||
|
url = x['expanded_url'],
|
||||||
|
tcourl = x['url'],
|
||||||
|
indices = tuple(x['indices']),
|
||||||
|
) for x in user['legacy']['entities']['description']['urls']],
|
||||||
verified = user['legacy']['verified'],
|
verified = user['legacy']['verified'],
|
||||||
created = email.utils.parsedate_to_datetime(user['legacy']['created_at']),
|
created = email.utils.parsedate_to_datetime(user['legacy']['created_at']),
|
||||||
followersCount = user['legacy']['followers_count'],
|
followersCount = user['legacy']['followers_count'],
|
||||||
@@ -1462,8 +1519,7 @@ class TwitterUserScraper(TwitterSearchScraper):
|
|||||||
mediaCount = user['legacy']['media_count'],
|
mediaCount = user['legacy']['media_count'],
|
||||||
location = user['legacy']['location'],
|
location = user['legacy']['location'],
|
||||||
protected = user['legacy']['protected'],
|
protected = user['legacy']['protected'],
|
||||||
linkUrl = user['legacy']['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['legacy']['entities'] else None,
|
link = link,
|
||||||
linkTcourl = user['legacy'].get('url'),
|
|
||||||
profileImageUrl = user['legacy']['profile_image_url_https'],
|
profileImageUrl = user['legacy']['profile_image_url_https'],
|
||||||
profileBannerUrl = user['legacy'].get('profile_banner_url'),
|
profileBannerUrl = user['legacy'].get('profile_banner_url'),
|
||||||
label = label,
|
label = label,
|
||||||
@@ -1673,6 +1729,7 @@ class TwitterTrendsScraper(_TwitterAPIScraper):
|
|||||||
'include_mute_edge': '1',
|
'include_mute_edge': '1',
|
||||||
'include_can_dm': '1',
|
'include_can_dm': '1',
|
||||||
'include_can_media_tag': '1',
|
'include_can_media_tag': '1',
|
||||||
|
'include_ext_has_nft_avatar': '1',
|
||||||
'skip_status': '1',
|
'skip_status': '1',
|
||||||
'cards_platform': 'Web-12',
|
'cards_platform': 'Web-12',
|
||||||
'include_cards': '1',
|
'include_cards': '1',
|
||||||
@@ -1684,13 +1741,15 @@ class TwitterTrendsScraper(_TwitterAPIScraper):
|
|||||||
'include_user_entities': 'true',
|
'include_user_entities': 'true',
|
||||||
'include_ext_media_color': 'true',
|
'include_ext_media_color': 'true',
|
||||||
'include_ext_media_availability': 'true',
|
'include_ext_media_availability': 'true',
|
||||||
|
'include_ext_sensitive_media_warning': 'true',
|
||||||
|
'include_ext_trusted_friends_metadata': 'true',
|
||||||
'send_error_codes': 'true',
|
'send_error_codes': 'true',
|
||||||
'simple_quoted_tweet': 'true',
|
'simple_quoted_tweet': 'true',
|
||||||
'count': '20',
|
'count': '20',
|
||||||
'candidate_source': 'trends',
|
'candidate_source': 'trends',
|
||||||
'include_page_configuration': 'false',
|
'include_page_configuration': 'false',
|
||||||
'entity_tokens': 'false',
|
'entity_tokens': 'false',
|
||||||
'ext': 'mediaStats,highlightedLabel,voiceInfo',
|
'ext': 'mediaStats,highlightedLabel,hasNftAvatar,voiceInfo,enrichments,superFollowMetadata,unmentionInfo',
|
||||||
}
|
}
|
||||||
obj = self._get_api_data('https://twitter.com/i/api/2/guide.json', _TwitterAPIType.V2, params)
|
obj = self._get_api_data('https://twitter.com/i/api/2/guide.json', _TwitterAPIType.V2, params)
|
||||||
for instruction in obj['timeline']['instructions']:
|
for instruction in obj['timeline']['instructions']:
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ _logger = logging.getLogger(__name__)
|
|||||||
_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
|
_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
|
||||||
_datePattern = re.compile(r'^(?P<date>today'
|
_datePattern = re.compile(r'^(?P<date>today'
|
||||||
r'|yesterday'
|
r'|yesterday'
|
||||||
r'|(?P<day1>\d+)\s+(?P<month1>' + '|'.join(_months) + ')(\s+(?P<year1>\d{4}))?'
|
r'|(?P<day1>\d+)\s+(?P<month1>' + '|'.join(_months) + r')(\s+(?P<year1>\d{4}))?'
|
||||||
r'|(?P<month2>' + '|'.join(_months) + r')\s+(?P<day2>\d+),\s+(?P<year2>\d{4})'
|
r'|(?P<month2>' + '|'.join(_months) + r')\s+(?P<day2>\d+),\s+(?P<year2>\d{4})'
|
||||||
')'
|
')'
|
||||||
r'\s+at\s+(?P<hour>\d+):(?P<minute>\d+)\s+(?P<ampm>[ap]m)$')
|
r'\s+at\s+(?P<hour>\d+):(?P<minute>\d+)\s+(?P<ampm>[ap]m)$')
|
||||||
@@ -178,15 +178,11 @@ class VKontakteUserScraper(snscrape.base.Scraper):
|
|||||||
continue
|
continue
|
||||||
if 'data-video' in a.attrs:
|
if 'data-video' in a.attrs:
|
||||||
# Video
|
# Video
|
||||||
if 'data-link-attr' in a.attrs:
|
|
||||||
hrefUrl = urllib.parse.unquote(a.attrs['data-link-attr'].split('to=')[1].split('&')[0])
|
|
||||||
else:
|
|
||||||
hrefUrl = f'https://vk.com{a["href"]}'
|
|
||||||
video = Video(
|
video = Video(
|
||||||
id = a['data-video'],
|
id = a['data-video'],
|
||||||
list = a['data-list'],
|
list = a['data-list'],
|
||||||
duration = int(a['data-duration']),
|
duration = int(a['data-duration']),
|
||||||
url = hrefUrl,
|
url = f'https://vk.com{a["href"]}',
|
||||||
thumbUrl = a['style'][(begin := a['style'].find('background-image: url(') + 22) : a['style'].find(')', begin)],
|
thumbUrl = a['style'][(begin := a['style'].find('background-image: url(') + 22) : a['style'].find(')', begin)],
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
@@ -407,4 +403,4 @@ class VKontakteUserScraper(snscrape.base.Scraper):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _cli_from_args(cls, args):
|
def _cli_from_args(cls, args):
|
||||||
return cls._cli_construct(args, args.username)
|
return cls._cli_construct(args, args.username)
|
||||||
Reference in New Issue
Block a user