diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index 26d72ea..b03b86a 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -3,8 +3,10 @@ import datetime import json import random import logging +import re import snscrape.base import typing +import urllib.parse logger = logging.getLogger(__name__) @@ -86,82 +88,122 @@ class TwitterCommonScraper(snscrape.base.Scraper): class TwitterSearchScraper(TwitterCommonScraper): name = 'twitter-search' - def __init__(self, query, maxPosition = None, **kwargs): + def __init__(self, query, cursor = None, **kwargs): super().__init__(**kwargs) self._query = query - self._maxPosition = maxPosition + self._cursor = cursor + self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}' + self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'}) - def _get_feed_from_html(self, html, withMinPosition): - soup = bs4.BeautifulSoup(html, 'lxml') - feed = soup.find_all('li', 'js-stream-item') - if withMinPosition: - streamContainer = soup.find('div', 'stream-container') - if not streamContainer or not streamContainer.has_attr('data-min-position'): - if soup.find('div', 'SearchEmptyTimeline'): - # No results found - minPosition = None - else: - # Unknown error condition - raise RuntimeError('Unable to find min-position') - else: - minPosition = streamContainer['data-min-position'] - else: - minPosition = None - return feed, minPosition + def _get_guest_token(self): + logger.info(f'Retrieving guest token from search page') + r = self._get(self._baseUrl, headers = {'User-Agent': self._userAgent}) + match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', r.text) + if not match: + raise RuntimeError('Unable to find guest token') + return match.group(1) + + def _check_scroll_response(self, r): + if r.status_code == 429: + # Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items + return True, None + if r.headers.get('content-type') != 'application/json;charset=utf-8': + return False, f'content type is not JSON' + if r.status_code != 200: + return False, f'non-200 status code' + return True, None def get_items(self): - headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'} - - # First page - if self._maxPosition is None: - logger.info(f'Retrieving search page for {self._query}') - r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'spxr', 'qf': 'off'}, headers = headers) - - feed, maxPosition = self._get_feed_from_html(r.text, True) - if not feed: - logger.warning(f'No results for {self._query}') - return - yield from self._feed_to_items(feed) - else: - maxPosition = self._maxPosition - - if not maxPosition: - return - + headers = { + 'User-Agent': self._userAgent, + 'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA', + 'Referer': self._baseUrl, + } + guestToken = None + cursor = self._cursor while True: - logger.info(f'Retrieving scroll page {maxPosition}') - r = self._get('https://twitter.com/i/search/timeline', - params = { - 'f': 'tweets', - 'vertical': 'default', - 'lang': 'en', - 'q': self._query, - 'include_available_features': '1', - 'include_entities': '1', - 'reset_error_state': 'false', - 'src': 'spxr', - 'qf': 'off', - 'max_position': maxPosition, - }, - headers = headers, - responseOkCallback = self._check_json_callback) + if not guestToken: + guestToken = self._get_guest_token() + headers['x-guest-token'] = guestToken - obj = json.loads(r.text) - feed, _ = self._get_feed_from_html(obj['items_html'], False) - if feed: - yield from self._feed_to_items(feed) - if obj['min_position'] == maxPosition: - return - maxPosition = obj['min_position'] + logger.info(f'Retrieving scroll page {cursor}') + params = { + 'include_profile_interstitial_type': '1', + 'include_blocking': '1', + 'include_blocked_by': '1', + 'include_followed_by': '1', + 'include_want_retweets': '1', + 'include_mute_edge': '1', + 'include_can_dm': '1', + 'include_can_media_tag': '1', + 'skip_status': '1', + 'cards_platform': 'Web-12', + 'include_cards': '1', + 'include_composer_source': 'true', + 'include_ext_alt_text': 'true', + 'include_reply_count': '1', + 'tweet_mode': 'extended', + 'include_entities': 'true', + 'include_user_entities': 'true', + 'include_ext_media_color': 'true', + 'include_ext_media_availability': 'true', + 'send_error_codes': 'true', + 'simple_quoted_tweets': 'true', + 'q': self._query, + 'tweet_search_mode': 'live', + 'count': '100', + 'query_source': 'spelling_expansion_revert_click', + } + if cursor: + params['cursor'] = cursor + params['pc'] = '1' + params['spelling_corrections'] = '1' + params['ext'] = 'mediaStats%2CcameraMoment' + r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response) + if r.status_code == 429: + guestToken = None + continue + try: + obj = r.json() + except json.JSONDecodeError as e: + logger.error(f'Received invalid JSON from Twitter: {e!s}') + raise RuntimeError('Received invalid JSON from Twitter') from e + + # No data format test, just a hard and loud crash if anything's wrong :-) + newCursor = None + for instruction in obj['timeline']['instructions']: + if 'addEntries' in instruction: + entries = instruction['addEntries']['entries'] + elif 'replaceEntry' in instruction: + entries = [instruction['replaceEntry']['entry']] + else: + continue + for entry in entries: + if entry['entryId'].startswith('sq-I-t-'): + tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']] + tweetID = tweet['id'] + content = tweet['full_text'] + username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name'] + date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc) + outlinks = [u['expanded_url'] for u in tweet['entities']['urls']] + tcooutlinks = [u['url'] for u in tweet['entities']['urls']] + url = f'https://twitter.com/{username}/status/{tweetID}' + yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks)) + elif entry['entryId'] == 'sq-cursor-bottom': + newCursor = entry['content']['operation']['cursor']['value'] + if not newCursor or newCursor == cursor: + # End of pagination + break + cursor = newCursor @classmethod def setup_parser(cls, subparser): - subparser.add_argument('--max-position', metavar = 'POSITION', dest = 'maxPosition') + subparser.add_argument('--cursor', metavar = 'CURSOR') subparser.add_argument('query', help = 'A Twitter search string') @classmethod def from_args(cls, args): - return cls(args.query, maxPosition = args.maxPosition, retries = args.retries) + return cls(args.query, cursor = args.cursor, retries = args.retries) class TwitterUserScraper(TwitterSearchScraper):