diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index db0e96d..fa17711 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -112,33 +112,103 @@ class TwitterCommonScraper(snscrape.base.Scraper): return True, None -class TwitterSearchScraper(TwitterCommonScraper): - name = 'twitter-search' - - def __init__(self, query, cursor = None, **kwargs): +class TwitterAPIScraper(TwitterCommonScraper): + def __init__(self, baseUrl, **kwargs): super().__init__(**kwargs) - self._query = query - self._cursor = cursor - self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}' - self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'}) + self._baseUrl = baseUrl self._guestToken = None + self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}' + self._apiHeaders = { + 'User-Agent': self._userAgent, + 'Authorization': _API_AUTHORIZATION_HEADER, + 'Referer': self._baseUrl, + } - def _get_guest_token(self, url): - logger.info(f'Retrieving guest token') - gt = None - r = self._get(url, headers = {'User-Agent': self._userAgent}) + def _ensure_guest_token(self, url = None): + if self._guestToken is not None: + return + logger.info('Retrieving guest token') + r = self._get(self._baseUrl if url is None else url, headers = {'User-Agent': self._userAgent}) match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text) if match: logger.debug('Found guest token in HTML') - gt = match.group(1) + self._guestToken = match.group(1) if 'gt' in r.cookies: logger.debug('Found guest token in cookies') - gt = r.cookies['gt'] - if gt: - self._session.cookies.set('gt', gt, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800) - return gt + self._guestToken = r.cookies['gt'] + if self._guestToken: + self._session.cookies.set('gt', self._guestToken, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800) + self._apiHeaders['x-guest-token'] = self._guestToken + return raise snscrape.base.ScraperException('Unable to find guest token') + def _unset_guest_token(self): + self._guestToken = None + del self._session.cookies['gt'] + del self._apiHeaders['x-guest-token'] + + def _check_api_response(self, r): + if r.status_code == 429: + self._unset_guest_token() + self._ensure_guest_token() + return False, 'rate-limited' + if r.headers.get('content-type').replace(' ', '') != 'application/json;charset=utf-8': + return False, 'content type is not JSON' + if r.status_code != 200: + return False, 'non-200 status code' + return True, None + + def _get_api_data(self, endpoint, params): + self._ensure_guest_token() + r = self._get(endpoint, params = params, headers = self._apiHeaders, responseOkCallback = self._check_api_response) + try: + obj = r.json() + except json.JSONDecodeError as e: + raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e + return obj + + def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None): + # Iterate over endpoint with params/paginationParams, optionally starting from a cursor + # Handles guest token extraction using the baseUrl passed to __init__ etc. + # Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten). + if cursor is None: + reqParams = params + else: + reqParams = paginationParams.copy() + reqParams['cursor'] = cursor + while True: + logger.info(f'Retrieving scroll page {cursor}') + obj = self._get_api_data(endpoint, reqParams) + yield obj + + # No data format test, just a hard and loud crash if anything's wrong :-) + newCursor = None + for instruction in obj['timeline']['instructions']: + if 'addEntries' in instruction: + entries = instruction['addEntries']['entries'] + elif 'replaceEntry' in instruction: + entries = [instruction['replaceEntry']['entry']] + else: + continue + for entry in entries: + if entry['entryId'] == 'sq-cursor-bottom': + newCursor = entry['content']['operation']['cursor']['value'] + if not newCursor or newCursor == cursor: + # End of pagination + break + cursor = newCursor + reqParams = paginationParams.copy() + reqParams['cursor'] = cursor + + +class TwitterSearchScraper(TwitterAPIScraper): + name = 'twitter-search' + + def __init__(self, query, cursor = None, **kwargs): + super().__init__(baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': query, 'src': 'spelling_expansion_revert_click'}), **kwargs) + self._query = query + self._cursor = cursor + def _check_scroll_response(self, r): if r.status_code == 429: # Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items @@ -150,65 +220,42 @@ class TwitterSearchScraper(TwitterCommonScraper): return True, None def get_items(self): - headers = { - 'User-Agent': self._userAgent, - 'Authorization': _API_AUTHORIZATION_HEADER, - 'Referer': self._baseUrl, + params = { + 'include_profile_interstitial_type': '1', + 'include_blocking': '1', + 'include_blocked_by': '1', + 'include_followed_by': '1', + 'include_want_retweets': '1', + 'include_mute_edge': '1', + 'include_can_dm': '1', + 'include_can_media_tag': '1', + 'skip_status': '1', + 'cards_platform': 'Web-12', + 'include_cards': '1', + 'include_ext_alt_text': 'true', + 'include_quote_count': 'true', + 'include_reply_count': '1', + 'tweet_mode': 'extended', + 'include_entities': 'true', + 'include_user_entities': 'true', + 'include_ext_media_color': 'true', + 'include_ext_media_availability': 'true', + 'send_error_codes': 'true', + 'simple_quoted_tweets': 'true', + 'q': self._query, + 'tweet_search_mode': 'live', + 'count': '100', + 'query_source': 'spelling_expansion_revert_click', } - if self._guestToken: - headers['x-guest-token'] = self._guestToken - cursor = self._cursor - while True: - if not self._guestToken: - self._guestToken = self._get_guest_token(self._baseUrl) - headers['x-guest-token'] = self._guestToken - - logger.info(f'Retrieving scroll page {cursor}') - params = { - 'include_profile_interstitial_type': '1', - 'include_blocking': '1', - 'include_blocked_by': '1', - 'include_followed_by': '1', - 'include_want_retweets': '1', - 'include_mute_edge': '1', - 'include_can_dm': '1', - 'include_can_media_tag': '1', - 'skip_status': '1', - 'cards_platform': 'Web-12', - 'include_cards': '1', - 'include_ext_alt_text': 'true', - 'include_quote_count': 'true', - 'include_reply_count': '1', - 'tweet_mode': 'extended', - 'include_entities': 'true', - 'include_user_entities': 'true', - 'include_ext_media_color': 'true', - 'include_ext_media_availability': 'true', - 'send_error_codes': 'true', - 'simple_quoted_tweets': 'true', - 'q': self._query, - 'tweet_search_mode': 'live', - 'count': '100', - 'query_source': 'spelling_expansion_revert_click', - } - if cursor: - params['cursor'] = cursor - params['pc'] = '1' - params['spelling_corrections'] = '1' - params['ext'] = 'ext=mediaStats%2ChighlightedLabel' - r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response) - if r.status_code == 429: - self._guestToken = None - del self._session.cookies['gt'] - del headers['x-guest-token'] - continue - try: - obj = r.json() - except json.JSONDecodeError as e: - raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e + paginationParams = params.copy() + paginationParams['cursor'] = None + for d in (params, paginationParams): + d['pc'] = '1' + d['spelling_corrections'] = '1' + d['ext'] = 'ext=mediaStats%2ChighlightedLabel' + for obj in self._iter_api_data('https://api.twitter.com/2/search/adaptive.json', params, paginationParams): # No data format test, just a hard and loud crash if anything's wrong :-) - newCursor = None for instruction in obj['timeline']['instructions']: if 'addEntries' in instruction: entries = instruction['addEntries']['entries'] @@ -234,12 +281,6 @@ class TwitterSearchScraper(TwitterCommonScraper): tcooutlinks = [u['url'] for u in tweet['entities']['urls']] url = f'https://twitter.com/{username}/status/{tweetID}' yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks)) - elif entry['entryId'] == 'sq-cursor-bottom': - newCursor = entry['content']['operation']['cursor']['value'] - if not newCursor or newCursor == cursor: - # End of pagination - break - cursor = newCursor @classmethod def setup_parser(cls, subparser): @@ -261,53 +302,36 @@ class TwitterUserScraper(TwitterSearchScraper): self._username = username def get_entity(self): - while True: - if not self._guestToken: - self._guestToken = self._get_guest_token(f'https://twitter.com/{self._username}') - - params = {'variables': json.dumps({'screen_name': self._username, 'withHighlightedLabel': True}, separators = (',', ':'))} - r = self._get(f'https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName', - params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote), - headers = {'User-Agent': self._userAgent, 'Authorization': _API_AUTHORIZATION_HEADER, 'Referer': 'https://twitter.com/', 'x-guest-token': self._guestToken}, - responseOkCallback = self._check_scroll_response, - ) - if r.status_code == 429: - self._guestToken = None - del self._session.cookies['gt'] - continue - elif r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - try: - obj = r.json() - except json.JSONDecodeError as e: - raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e - user = obj['data']['user'] - rawDescription = user['legacy']['description'] - if user['legacy']['entities']['description']['urls']: - description = [] - description.append(rawDescription[:user['legacy']['entities']['description']['urls'][0]['indices'][0]]) - urls = sorted(user['legacy']['entities']['description']['urls'], key = lambda x: x['indices'][0]) - for url, nextUrl in itertools.zip_longest(urls, urls[1:]): - description.append(url['display_url']) - description.append(rawDescription[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None]) - description = ''.join(description) - else: - description = rawDescription - return User( - username = user['legacy']['screen_name'], - description = description, - rawDescription = rawDescription, - descriptionUrls = [{'text': x['display_url'], 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['legacy']['entities']['description']['urls']], - verified = user['legacy']['verified'], - created = email.utils.parsedate_to_datetime(user['legacy']['created_at']), - followersCount = user['legacy']['followers_count'], - friendsCount = user['legacy']['friends_count'], - statusesCount = user['legacy']['statuses_count'], - linkUrl = user['legacy']['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['legacy']['entities'] else None, - linkTcourl = user['legacy'].get('url'), - profileImageUrl = user['legacy']['profile_image_url_https'], - profileBannerUrl = user['legacy'].get('profile_banner_url'), - ) + self._ensure_guest_token(f'https://twitter.com/{self._username}') + params = {'variables': json.dumps({'screen_name': self._username, 'withHighlightedLabel': True}, separators = (',', ':'))} + obj = self._get_api_data('https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName', params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)) + user = obj['data']['user'] + rawDescription = user['legacy']['description'] + if user['legacy']['entities']['description']['urls']: + description = [] + description.append(rawDescription[:user['legacy']['entities']['description']['urls'][0]['indices'][0]]) + urls = sorted(user['legacy']['entities']['description']['urls'], key = lambda x: x['indices'][0]) + for url, nextUrl in itertools.zip_longest(urls, urls[1:]): + description.append(url['display_url']) + description.append(rawDescription[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None]) + description = ''.join(description) + else: + description = rawDescription + return User( + username = user['legacy']['screen_name'], + description = description, + rawDescription = rawDescription, + descriptionUrls = [{'text': x['display_url'], 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['legacy']['entities']['description']['urls']], + verified = user['legacy']['verified'], + created = email.utils.parsedate_to_datetime(user['legacy']['created_at']), + followersCount = user['legacy']['followers_count'], + friendsCount = user['legacy']['friends_count'], + statusesCount = user['legacy']['statuses_count'], + linkUrl = user['legacy']['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['legacy']['entities'] else None, + linkTcourl = user['legacy'].get('url'), + profileImageUrl = user['legacy']['profile_image_url_https'], + profileBannerUrl = user['legacy'].get('profile_banner_url'), + ) @staticmethod def is_valid_username(s):