mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-12 12:28:28 +03:00
Refactor API interaction into something cleaner and more reusable
This commit is contained in:
@@ -112,33 +112,103 @@ class TwitterCommonScraper(snscrape.base.Scraper):
|
|||||||
return True, None
|
return True, None
|
||||||
|
|
||||||
|
|
||||||
class TwitterSearchScraper(TwitterCommonScraper):
|
class TwitterAPIScraper(TwitterCommonScraper):
|
||||||
name = 'twitter-search'
|
def __init__(self, baseUrl, **kwargs):
|
||||||
|
|
||||||
def __init__(self, query, cursor = None, **kwargs):
|
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self._query = query
|
self._baseUrl = baseUrl
|
||||||
self._cursor = cursor
|
|
||||||
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
|
|
||||||
self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'})
|
|
||||||
self._guestToken = None
|
self._guestToken = None
|
||||||
|
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
|
||||||
|
self._apiHeaders = {
|
||||||
|
'User-Agent': self._userAgent,
|
||||||
|
'Authorization': _API_AUTHORIZATION_HEADER,
|
||||||
|
'Referer': self._baseUrl,
|
||||||
|
}
|
||||||
|
|
||||||
def _get_guest_token(self, url):
|
def _ensure_guest_token(self, url = None):
|
||||||
logger.info(f'Retrieving guest token')
|
if self._guestToken is not None:
|
||||||
gt = None
|
return
|
||||||
r = self._get(url, headers = {'User-Agent': self._userAgent})
|
logger.info('Retrieving guest token')
|
||||||
|
r = self._get(self._baseUrl if url is None else url, headers = {'User-Agent': self._userAgent})
|
||||||
match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)
|
match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)
|
||||||
if match:
|
if match:
|
||||||
logger.debug('Found guest token in HTML')
|
logger.debug('Found guest token in HTML')
|
||||||
gt = match.group(1)
|
self._guestToken = match.group(1)
|
||||||
if 'gt' in r.cookies:
|
if 'gt' in r.cookies:
|
||||||
logger.debug('Found guest token in cookies')
|
logger.debug('Found guest token in cookies')
|
||||||
gt = r.cookies['gt']
|
self._guestToken = r.cookies['gt']
|
||||||
if gt:
|
if self._guestToken:
|
||||||
self._session.cookies.set('gt', gt, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800)
|
self._session.cookies.set('gt', self._guestToken, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800)
|
||||||
return gt
|
self._apiHeaders['x-guest-token'] = self._guestToken
|
||||||
|
return
|
||||||
raise snscrape.base.ScraperException('Unable to find guest token')
|
raise snscrape.base.ScraperException('Unable to find guest token')
|
||||||
|
|
||||||
|
def _unset_guest_token(self):
|
||||||
|
self._guestToken = None
|
||||||
|
del self._session.cookies['gt']
|
||||||
|
del self._apiHeaders['x-guest-token']
|
||||||
|
|
||||||
|
def _check_api_response(self, r):
|
||||||
|
if r.status_code == 429:
|
||||||
|
self._unset_guest_token()
|
||||||
|
self._ensure_guest_token()
|
||||||
|
return False, 'rate-limited'
|
||||||
|
if r.headers.get('content-type').replace(' ', '') != 'application/json;charset=utf-8':
|
||||||
|
return False, 'content type is not JSON'
|
||||||
|
if r.status_code != 200:
|
||||||
|
return False, 'non-200 status code'
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
def _get_api_data(self, endpoint, params):
|
||||||
|
self._ensure_guest_token()
|
||||||
|
r = self._get(endpoint, params = params, headers = self._apiHeaders, responseOkCallback = self._check_api_response)
|
||||||
|
try:
|
||||||
|
obj = r.json()
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None):
|
||||||
|
# Iterate over endpoint with params/paginationParams, optionally starting from a cursor
|
||||||
|
# Handles guest token extraction using the baseUrl passed to __init__ etc.
|
||||||
|
# Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten).
|
||||||
|
if cursor is None:
|
||||||
|
reqParams = params
|
||||||
|
else:
|
||||||
|
reqParams = paginationParams.copy()
|
||||||
|
reqParams['cursor'] = cursor
|
||||||
|
while True:
|
||||||
|
logger.info(f'Retrieving scroll page {cursor}')
|
||||||
|
obj = self._get_api_data(endpoint, reqParams)
|
||||||
|
yield obj
|
||||||
|
|
||||||
|
# No data format test, just a hard and loud crash if anything's wrong :-)
|
||||||
|
newCursor = None
|
||||||
|
for instruction in obj['timeline']['instructions']:
|
||||||
|
if 'addEntries' in instruction:
|
||||||
|
entries = instruction['addEntries']['entries']
|
||||||
|
elif 'replaceEntry' in instruction:
|
||||||
|
entries = [instruction['replaceEntry']['entry']]
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
for entry in entries:
|
||||||
|
if entry['entryId'] == 'sq-cursor-bottom':
|
||||||
|
newCursor = entry['content']['operation']['cursor']['value']
|
||||||
|
if not newCursor or newCursor == cursor:
|
||||||
|
# End of pagination
|
||||||
|
break
|
||||||
|
cursor = newCursor
|
||||||
|
reqParams = paginationParams.copy()
|
||||||
|
reqParams['cursor'] = cursor
|
||||||
|
|
||||||
|
|
||||||
|
class TwitterSearchScraper(TwitterAPIScraper):
|
||||||
|
name = 'twitter-search'
|
||||||
|
|
||||||
|
def __init__(self, query, cursor = None, **kwargs):
|
||||||
|
super().__init__(baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': query, 'src': 'spelling_expansion_revert_click'}), **kwargs)
|
||||||
|
self._query = query
|
||||||
|
self._cursor = cursor
|
||||||
|
|
||||||
def _check_scroll_response(self, r):
|
def _check_scroll_response(self, r):
|
||||||
if r.status_code == 429:
|
if r.status_code == 429:
|
||||||
# Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
|
# Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
|
||||||
@@ -150,65 +220,42 @@ class TwitterSearchScraper(TwitterCommonScraper):
|
|||||||
return True, None
|
return True, None
|
||||||
|
|
||||||
def get_items(self):
|
def get_items(self):
|
||||||
headers = {
|
params = {
|
||||||
'User-Agent': self._userAgent,
|
'include_profile_interstitial_type': '1',
|
||||||
'Authorization': _API_AUTHORIZATION_HEADER,
|
'include_blocking': '1',
|
||||||
'Referer': self._baseUrl,
|
'include_blocked_by': '1',
|
||||||
|
'include_followed_by': '1',
|
||||||
|
'include_want_retweets': '1',
|
||||||
|
'include_mute_edge': '1',
|
||||||
|
'include_can_dm': '1',
|
||||||
|
'include_can_media_tag': '1',
|
||||||
|
'skip_status': '1',
|
||||||
|
'cards_platform': 'Web-12',
|
||||||
|
'include_cards': '1',
|
||||||
|
'include_ext_alt_text': 'true',
|
||||||
|
'include_quote_count': 'true',
|
||||||
|
'include_reply_count': '1',
|
||||||
|
'tweet_mode': 'extended',
|
||||||
|
'include_entities': 'true',
|
||||||
|
'include_user_entities': 'true',
|
||||||
|
'include_ext_media_color': 'true',
|
||||||
|
'include_ext_media_availability': 'true',
|
||||||
|
'send_error_codes': 'true',
|
||||||
|
'simple_quoted_tweets': 'true',
|
||||||
|
'q': self._query,
|
||||||
|
'tweet_search_mode': 'live',
|
||||||
|
'count': '100',
|
||||||
|
'query_source': 'spelling_expansion_revert_click',
|
||||||
}
|
}
|
||||||
if self._guestToken:
|
paginationParams = params.copy()
|
||||||
headers['x-guest-token'] = self._guestToken
|
paginationParams['cursor'] = None
|
||||||
cursor = self._cursor
|
for d in (params, paginationParams):
|
||||||
while True:
|
d['pc'] = '1'
|
||||||
if not self._guestToken:
|
d['spelling_corrections'] = '1'
|
||||||
self._guestToken = self._get_guest_token(self._baseUrl)
|
d['ext'] = 'ext=mediaStats%2ChighlightedLabel'
|
||||||
headers['x-guest-token'] = self._guestToken
|
|
||||||
|
|
||||||
logger.info(f'Retrieving scroll page {cursor}')
|
|
||||||
params = {
|
|
||||||
'include_profile_interstitial_type': '1',
|
|
||||||
'include_blocking': '1',
|
|
||||||
'include_blocked_by': '1',
|
|
||||||
'include_followed_by': '1',
|
|
||||||
'include_want_retweets': '1',
|
|
||||||
'include_mute_edge': '1',
|
|
||||||
'include_can_dm': '1',
|
|
||||||
'include_can_media_tag': '1',
|
|
||||||
'skip_status': '1',
|
|
||||||
'cards_platform': 'Web-12',
|
|
||||||
'include_cards': '1',
|
|
||||||
'include_ext_alt_text': 'true',
|
|
||||||
'include_quote_count': 'true',
|
|
||||||
'include_reply_count': '1',
|
|
||||||
'tweet_mode': 'extended',
|
|
||||||
'include_entities': 'true',
|
|
||||||
'include_user_entities': 'true',
|
|
||||||
'include_ext_media_color': 'true',
|
|
||||||
'include_ext_media_availability': 'true',
|
|
||||||
'send_error_codes': 'true',
|
|
||||||
'simple_quoted_tweets': 'true',
|
|
||||||
'q': self._query,
|
|
||||||
'tweet_search_mode': 'live',
|
|
||||||
'count': '100',
|
|
||||||
'query_source': 'spelling_expansion_revert_click',
|
|
||||||
}
|
|
||||||
if cursor:
|
|
||||||
params['cursor'] = cursor
|
|
||||||
params['pc'] = '1'
|
|
||||||
params['spelling_corrections'] = '1'
|
|
||||||
params['ext'] = 'ext=mediaStats%2ChighlightedLabel'
|
|
||||||
r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
|
|
||||||
if r.status_code == 429:
|
|
||||||
self._guestToken = None
|
|
||||||
del self._session.cookies['gt']
|
|
||||||
del headers['x-guest-token']
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
obj = r.json()
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
|
|
||||||
|
|
||||||
|
for obj in self._iter_api_data('https://api.twitter.com/2/search/adaptive.json', params, paginationParams):
|
||||||
# No data format test, just a hard and loud crash if anything's wrong :-)
|
# No data format test, just a hard and loud crash if anything's wrong :-)
|
||||||
newCursor = None
|
|
||||||
for instruction in obj['timeline']['instructions']:
|
for instruction in obj['timeline']['instructions']:
|
||||||
if 'addEntries' in instruction:
|
if 'addEntries' in instruction:
|
||||||
entries = instruction['addEntries']['entries']
|
entries = instruction['addEntries']['entries']
|
||||||
@@ -234,12 +281,6 @@ class TwitterSearchScraper(TwitterCommonScraper):
|
|||||||
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
|
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
|
||||||
url = f'https://twitter.com/{username}/status/{tweetID}'
|
url = f'https://twitter.com/{username}/status/{tweetID}'
|
||||||
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
||||||
elif entry['entryId'] == 'sq-cursor-bottom':
|
|
||||||
newCursor = entry['content']['operation']['cursor']['value']
|
|
||||||
if not newCursor or newCursor == cursor:
|
|
||||||
# End of pagination
|
|
||||||
break
|
|
||||||
cursor = newCursor
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setup_parser(cls, subparser):
|
def setup_parser(cls, subparser):
|
||||||
@@ -261,53 +302,36 @@ class TwitterUserScraper(TwitterSearchScraper):
|
|||||||
self._username = username
|
self._username = username
|
||||||
|
|
||||||
def get_entity(self):
|
def get_entity(self):
|
||||||
while True:
|
self._ensure_guest_token(f'https://twitter.com/{self._username}')
|
||||||
if not self._guestToken:
|
params = {'variables': json.dumps({'screen_name': self._username, 'withHighlightedLabel': True}, separators = (',', ':'))}
|
||||||
self._guestToken = self._get_guest_token(f'https://twitter.com/{self._username}')
|
obj = self._get_api_data('https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName', params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote))
|
||||||
|
user = obj['data']['user']
|
||||||
params = {'variables': json.dumps({'screen_name': self._username, 'withHighlightedLabel': True}, separators = (',', ':'))}
|
rawDescription = user['legacy']['description']
|
||||||
r = self._get(f'https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName',
|
if user['legacy']['entities']['description']['urls']:
|
||||||
params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote),
|
description = []
|
||||||
headers = {'User-Agent': self._userAgent, 'Authorization': _API_AUTHORIZATION_HEADER, 'Referer': 'https://twitter.com/', 'x-guest-token': self._guestToken},
|
description.append(rawDescription[:user['legacy']['entities']['description']['urls'][0]['indices'][0]])
|
||||||
responseOkCallback = self._check_scroll_response,
|
urls = sorted(user['legacy']['entities']['description']['urls'], key = lambda x: x['indices'][0])
|
||||||
)
|
for url, nextUrl in itertools.zip_longest(urls, urls[1:]):
|
||||||
if r.status_code == 429:
|
description.append(url['display_url'])
|
||||||
self._guestToken = None
|
description.append(rawDescription[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None])
|
||||||
del self._session.cookies['gt']
|
description = ''.join(description)
|
||||||
continue
|
else:
|
||||||
elif r.status_code != 200:
|
description = rawDescription
|
||||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
return User(
|
||||||
try:
|
username = user['legacy']['screen_name'],
|
||||||
obj = r.json()
|
description = description,
|
||||||
except json.JSONDecodeError as e:
|
rawDescription = rawDescription,
|
||||||
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
|
descriptionUrls = [{'text': x['display_url'], 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['legacy']['entities']['description']['urls']],
|
||||||
user = obj['data']['user']
|
verified = user['legacy']['verified'],
|
||||||
rawDescription = user['legacy']['description']
|
created = email.utils.parsedate_to_datetime(user['legacy']['created_at']),
|
||||||
if user['legacy']['entities']['description']['urls']:
|
followersCount = user['legacy']['followers_count'],
|
||||||
description = []
|
friendsCount = user['legacy']['friends_count'],
|
||||||
description.append(rawDescription[:user['legacy']['entities']['description']['urls'][0]['indices'][0]])
|
statusesCount = user['legacy']['statuses_count'],
|
||||||
urls = sorted(user['legacy']['entities']['description']['urls'], key = lambda x: x['indices'][0])
|
linkUrl = user['legacy']['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['legacy']['entities'] else None,
|
||||||
for url, nextUrl in itertools.zip_longest(urls, urls[1:]):
|
linkTcourl = user['legacy'].get('url'),
|
||||||
description.append(url['display_url'])
|
profileImageUrl = user['legacy']['profile_image_url_https'],
|
||||||
description.append(rawDescription[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None])
|
profileBannerUrl = user['legacy'].get('profile_banner_url'),
|
||||||
description = ''.join(description)
|
)
|
||||||
else:
|
|
||||||
description = rawDescription
|
|
||||||
return User(
|
|
||||||
username = user['legacy']['screen_name'],
|
|
||||||
description = description,
|
|
||||||
rawDescription = rawDescription,
|
|
||||||
descriptionUrls = [{'text': x['display_url'], 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['legacy']['entities']['description']['urls']],
|
|
||||||
verified = user['legacy']['verified'],
|
|
||||||
created = email.utils.parsedate_to_datetime(user['legacy']['created_at']),
|
|
||||||
followersCount = user['legacy']['followers_count'],
|
|
||||||
friendsCount = user['legacy']['friends_count'],
|
|
||||||
statusesCount = user['legacy']['statuses_count'],
|
|
||||||
linkUrl = user['legacy']['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['legacy']['entities'] else None,
|
|
||||||
linkTcourl = user['legacy'].get('url'),
|
|
||||||
profileImageUrl = user['legacy']['profile_image_url_https'],
|
|
||||||
profileBannerUrl = user['legacy'].get('profile_banner_url'),
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_valid_username(s):
|
def is_valid_username(s):
|
||||||
|
|||||||
Reference in New Issue
Block a user