diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index f3cd45e..0c17b6b 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -130,6 +130,16 @@ class Place: countryCode: str +@dataclasses.dataclass +class TweetRef(snscrape.base.Item): + '''A reference to a tweet for which no proper Tweet object could be produced from the data returned by Twitter''' + + id: int + + def __str__(self): + return f'https://twitter.com/i/web/status/{self.id}' + + @dataclasses.dataclass class User(snscrape.base.Entity): # Most fields can be None if they're not known. @@ -267,6 +277,11 @@ class _CLIGuestTokenManager(GuestTokenManager): os.remove(self._file) +class _TwitterAPIType(enum.Enum): + V2 = 0 # Introduced with the redesign + GRAPHQL = 1 + + class _TwitterAPIScraper(snscrape.base.Scraper): def __init__(self, baseUrl, *, guestTokenManager = None, **kwargs): super().__init__(**kwargs) @@ -334,8 +349,10 @@ class _TwitterAPIScraper(snscrape.base.Scraper): return False, 'non-200 status code' return True, None - def _get_api_data(self, endpoint, params): + def _get_api_data(self, endpoint, apiType, params): self._ensure_guest_token() + if apiType is _TwitterAPIType.GRAPHQL: + params = urllib.parse.urlencode({'variables': json.dumps(params, separators = (',', ':'))}, quote_via = urllib.parse.quote) r = self._get(endpoint, params = params, headers = self._apiHeaders, responseOkCallback = self._check_api_response) try: obj = r.json() @@ -343,7 +360,7 @@ class _TwitterAPIScraper(snscrape.base.Scraper): raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e return obj - def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None, direction = _ScrollDirection.BOTTOM): + def _iter_api_data(self, endpoint, apiType, params, paginationParams = None, cursor = None, direction = _ScrollDirection.BOTTOM): # Iterate over endpoint with params/paginationParams, optionally starting from a cursor # Handles guest token extraction using the baseUrl passed to __init__ etc. # Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten). @@ -366,38 +383,61 @@ class _TwitterAPIScraper(snscrape.base.Scraper): emptyResponsesOnCursor = 0 while True: _logger.info(f'Retrieving scroll page {cursor}') - obj = self._get_api_data(endpoint, reqParams) + obj = self._get_api_data(endpoint, apiType, reqParams) yield obj # No data format test, just a hard and loud crash if anything's wrong :-) newCursor = None promptCursor = None newBottomCursorAndStop = None - for instruction in obj['timeline']['instructions']: + if apiType is _TwitterAPIType.V2: + instructions = obj['timeline']['instructions'] + elif apiType is _TwitterAPIType.GRAPHQL: + if 'user' in obj['data']: + # UserTweets, UserTweetsAndReplies + instructions = obj['data']['user']['result']['timeline']['timeline']['instructions'] + else: + # TweetDetail + instructions = obj['data']['threaded_conversation_with_injections']['instructions'] + tweetCount = 0 + for instruction in instructions: if 'addEntries' in instruction: entries = instruction['addEntries']['entries'] elif 'replaceEntry' in instruction: entries = [instruction['replaceEntry']['entry']] + elif instruction.get('type') == 'TimelineAddEntries': + entries = instruction['entries'] else: continue + tweetCount += self._count_tweets(entries) for entry in entries: + if not (entry['entryId'].startswith('sq-cursor-') or entry['entryId'].startswith('cursor-')): + continue + if apiType is _TwitterAPIType.V2: + entryCursor = entry['content']['operation']['cursor']['value'] + entryCursorStop = entry['content']['operation']['cursor'].get('stopOnEmptyResponse', None) + elif apiType is _TwitterAPIType.GRAPHQL: + cursorContent = entry['content'] + while cursorContent.get('itemType') == 'TimelineTimelineItem' or cursorContent.get('entryType') == 'TimelineTimelineItem': + cursorContent = cursorContent['itemContent'] + entryCursor, entryCursorStop = cursorContent['value'], cursorContent.get('stopOnEmptyResponse', None) if entry['entryId'] == f'sq-cursor-{dir}' or entry['entryId'].startswith(f'cursor-{dir}-'): - newCursor = entry['content']['operation']['cursor']['value'] - if 'stopOnEmptyResponse' in entry['content']['operation']['cursor']: - stopOnEmptyResponse = entry['content']['operation']['cursor']['stopOnEmptyResponse'] + newCursor = entryCursor + if entryCursorStop is not None: + stopOnEmptyResponse = entryCursorStop elif entry['entryId'].startswith('cursor-showMoreThreadsPrompt-'): # E.g. 'offensive' replies button - promptCursor = entry['content']['operation']['cursor']['value'] + promptCursor = entryCursor elif direction is _ScrollDirection.BOTH and bottomCursorAndStop is None and (entry['entryId'] == 'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-')): - newBottomCursorAndStop = (entry['content']['operation']['cursor']['value'], entry['content']['operation']['cursor'].get('stopOnEmptyResponse', False)) + newBottomCursorAndStop = (entryCursor, entryCursorStop or False) if bottomCursorAndStop is None and newBottomCursorAndStop is not None: bottomCursorAndStop = newBottomCursorAndStop - if newCursor == cursor and self._count_tweets(obj) == 0: + if newCursor == cursor and tweetCount == 0: # Twitter sometimes returns the same cursor as requested and no results even though there are more results. # When this happens, retry the same cursor up to the retries setting. emptyResponsesOnCursor += 1 if emptyResponsesOnCursor > self._retries: break - if not newCursor or (stopOnEmptyResponse and self._count_tweets(obj) == 0): + if not newCursor or (stopOnEmptyResponse and tweetCount == 0): # End of pagination if promptCursor is not None: newCursor = promptCursor @@ -413,21 +453,10 @@ class _TwitterAPIScraper(snscrape.base.Scraper): reqParams = paginationParams.copy() reqParams['cursor'] = cursor - def _count_tweets(self, obj): - count = 0 - for instruction in obj['timeline']['instructions']: - if 'addEntries' in instruction: - entries = instruction['addEntries']['entries'] - elif 'replaceEntry' in instruction: - entries = [instruction['replaceEntry']['entry']] - else: - continue - for entry in entries: - if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-'): - count += 1 - return count + def _count_tweets(self, entries): + return sum(entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-') for entry in entries) - def _instructions_to_tweets(self, obj, includeConversationThreads = False): + def _v2_timeline_instructions_to_tweets(self, obj, includeConversationThreads = False): # No data format test, just a hard and loud crash if anything's wrong :-) for instruction in obj['timeline']['instructions']: if 'addEntries' in instruction: @@ -444,7 +473,7 @@ class _TwitterAPIScraper(snscrape.base.Scraper): if item['entryId'].startswith('tweet-'): yield from self._instruction_tweet_entry_to_tweet(item['entryId'], item, obj) - def _instruction_tweet_entry_to_tweet(self, entryId, entry, obj): + def _v2_instruction_tweet_entry_to_tweet(self, entryId, entry, obj): if 'tweet' in entry['item']['content']: if 'promotedMetadata' in entry['item']['content']['tweet']: # Promoted tweet aka ads return @@ -463,18 +492,17 @@ class _TwitterAPIScraper(snscrape.base.Scraper): raise snscrape.base.ScraperException(f'Unable to handle entry {entryId!r}') yield self._tweet_to_tweet(tweet, obj) - def _tweet_to_tweet(self, tweet, obj): - # Transforms a Twitter API tweet object into a Tweet + def _make_tweet(self, tweet, user, retweetedTweet = None, quotedTweet = None): kwargs = {} kwargs['id'] = tweet['id'] if 'id' in tweet else int(tweet['id_str']) kwargs['content'] = tweet['full_text'] kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities'].get('urls')) - kwargs['user'] = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']]) + kwargs['user'] = user kwargs['date'] = email.utils.parsedate_to_datetime(tweet['created_at']) if tweet['entities'].get('urls'): kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']] kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']] - kwargs['url'] = f'https://twitter.com/{obj["globalObjects"]["users"][tweet["user_id_str"]]["screen_name"]}/status/{kwargs["id"]}' + kwargs['url'] = f'https://twitter.com/{user.username}/status/{kwargs["id"]}' kwargs['replyCount'] = tweet['reply_count'] kwargs['retweetCount'] = tweet['retweet_count'] kwargs['likeCount'] = tweet['favorite_count'] @@ -511,18 +539,20 @@ class _TwitterAPIScraper(snscrape.base.Scraper): } if medium['type'] == 'video': mKwargs['duration'] = medium['video_info']['duration_millis'] / 1000 - if (ext := medium['ext']) and (mediaStats := ext['mediaStats']) and isinstance(r := mediaStats['r'], dict) and 'ok' in r and isinstance(r['ok'], dict): + if (ext := medium.get('ext')) and (mediaStats := ext['mediaStats']) and isinstance(r := mediaStats['r'], dict) and 'ok' in r and isinstance(r['ok'], dict): mKwargs['views'] = int(r['ok']['viewCount']) + elif (mediaStats := medium.get('mediaStats')): + mKwargs['views'] = mediaStats['viewCount'] cls = Video elif medium['type'] == 'animated_gif': cls = Gif media.append(cls(**mKwargs)) if media: kwargs['media'] = media - if 'retweeted_status_id_str' in tweet: - kwargs['retweetedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj) - if 'quoted_status_id_str' in tweet and tweet['quoted_status_id_str'] in obj['globalObjects']['tweets']: - kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj) + if retweetedTweet: + kwargs['retweetedTweet'] = retweetedTweet + if quotedTweet: + kwargs['quotedTweet'] = quotedTweet if (inReplyToTweetId := tweet.get('in_reply_to_status_id_str')): kwargs['inReplyToTweetId'] = int(inReplyToTweetId) inReplyToUserId = int(tweet['in_reply_to_user_id_str']) @@ -557,6 +587,50 @@ class _TwitterAPIScraper(snscrape.base.Scraper): kwargs['cashtags'] = [o['text'] for o in tweet['entities']['symbols']] return Tweet(**kwargs) + def _tweet_to_tweet(self, tweet, obj): + user = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']]) + kwargs = {} + if 'retweeted_status_id_str' in tweet: + kwargs['retweetedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj) + if 'quoted_status_id_str' in tweet and tweet['quoted_status_id_str'] in obj['globalObjects']['tweets']: + kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj) + return self._make_tweet(tweet, user, **kwargs) + + def _graphql_timeline_tweet_item_result_to_tweet(self, result): + tweet = result['legacy'] + userId = int(result['core']['user_results']['result']['rest_id']) + user = self._user_to_user(result['core']['user_results']['result']['legacy'], id_ = userId) + kwargs = {} + if 'retweeted_status_result' in tweet: + kwargs['retweetedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(tweet['retweeted_status_result']['result']) + #TODO Does retweetedRefResult exist? + if 'quoted_status_result' in result: + if result['quoted_status_result']['result']['__typename'] == 'TweetTombstone': + kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str'])) + else: + kwargs['quotedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(result['quoted_status_result']['result']) + elif 'quotedRefResult' in result: + if result['quotedRefResult']['result']['__typename'] == 'TweetTombstone': + kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str'])) + else: + kwargs['quotedTweet'] = TweetRef(id = int(result['quotedRefResult']['result']['rest_id'])) + return self._make_tweet(tweet, user, **kwargs) + + def _graphql_timeline_instructions_to_tweets(self, instructions, includeConversationThreads = False): + for instruction in instructions: + if instruction['type'] != 'TimelineAddEntries': + continue + for entry in instruction['entries']: + if entry['entryId'].startswith('tweet-'): + if entry['content']['entryType'] == 'TimelineTimelineItem' and entry['content']['itemContent']['itemType'] == 'TimelineTweet': + yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result']) + else: + logger.warning('Got unrecognised timeline tweet item(s)') + elif includeConversationThreads and entry['entryId'].startswith('conversationthread-'): #TODO show more cursor? + for item in entry['content']['items']: + if item['entryId'].startswith(f'{entry["entryId"]}-tweet-'): + yield self._graphql_timeline_tweet_item_result_to_tweet(item['item']['itemContent']['tweet_results']['result']) + def _render_text_with_urls(self, text, urls): if not urls: return text @@ -570,10 +644,10 @@ class _TwitterAPIScraper(snscrape.base.Scraper): out.append(text[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None]) return ''.join(out) - def _user_to_user(self, user): + def _user_to_user(self, user, id_ = None): kwargs = {} kwargs['username'] = user['screen_name'] - kwargs['id'] = user['id'] if 'id' in user else int(user['id_str']) + kwargs['id'] = id_ if id_ else user['id'] if 'id' in user else int(user['id_str']) kwargs['displayname'] = user['name'] kwargs['description'] = self._render_text_with_urls(user['description'], user['entities']['description'].get('urls')) kwargs['rawDescription'] = user['description'] @@ -677,8 +751,8 @@ class TwitterSearchScraper(_TwitterAPIScraper): del params['tweet_search_mode'] del paginationParams['tweet_search_mode'] - for obj in self._iter_api_data('https://api.twitter.com/2/search/adaptive.json', params, paginationParams, cursor = self._cursor): - yield from self._instructions_to_tweets(obj) + for obj in self._iter_api_data('https://api.twitter.com/2/search/adaptive.json', _TwitterAPIType.V2, params, paginationParams, cursor = self._cursor): + yield from self._v2_timeline_instructions_to_tweets(obj) @classmethod def _cli_setup_parser(cls, subparser): @@ -706,15 +780,15 @@ class TwitterUserScraper(TwitterSearchScraper): self._ensure_guest_token() if not self._isUserId: fieldName = 'screen_name' - endpoint = 'https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName' + endpoint = 'https://twitter.com/i/api/graphql/7mjxD3-C6BxitPMVQ6w0-Q/UserByScreenName' else: fieldName = 'userId' - endpoint = 'https://twitter.com/i/api/graphql/WN6Hck-Pwm-YP0uxVj1oMQ/UserByRestIdWithoutResults' - params = {'variables': json.dumps({fieldName: str(self._user), 'withHighlightedLabel': True}, separators = (',', ':'))} - obj = self._get_api_data(endpoint, params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)) + endpoint = 'https://twitter.com/i/api/graphql/I5nvpI91ljifos1Y3Lltyg/UserByRestId' + variables = {fieldName: str(self._user), 'withSafetyModeUserFields': True, 'withSuperFollowsUserFields': True} + obj = self._get_api_data(endpoint, _TwitterAPIType.GRAPHQL, params = variables) if not obj['data']: return None - user = obj['data']['user'] + user = obj['data']['user']['result'] rawDescription = user['legacy']['description'] description = self._render_text_with_urls(rawDescription, user['legacy']['entities']['description']['urls']) label = None @@ -779,39 +853,32 @@ class TwitterProfileScraper(TwitterUserScraper): userId = self.entity.id else: userId = self._user - paginationParams = { - 'include_profile_interstitial_type': '1', - 'include_blocking': '1', - 'include_blocked_by': '1', - 'include_followed_by': '1', - 'include_want_retweets': '1', - 'include_mute_edge': '1', - 'include_can_dm': '1', - 'include_can_media_tag': '1', - 'skip_status': '1', - 'cards_platform': 'Web-12', - 'include_cards': '1', - 'include_ext_alt_text': 'true', - 'include_quote_count': 'true', - 'include_reply_count': '1', - 'tweet_mode': 'extended', - 'include_entities': 'true', - 'include_user_entities': 'true', - 'include_ext_media_color': 'true', - 'include_ext_media_availability': 'true', - 'send_error_codes': 'true', - 'simple_quoted_tweets': 'true', - 'include_tweet_replies': 'true', + paginationVariables = { 'userId': userId, - 'count': '100', + 'count': 100, 'cursor': None, - 'ext': 'mediaStats,highlightedLabel', + 'includePromotedContent': True, + 'withCommunity': True, + 'withSuperFollowsUserFields': True, + 'withDownvotePerspective': False, + 'withReactionsMetadata': False, + 'withReactionsPerspective': False, + 'withSuperFollowsTweetFields': True, + 'withVoice': True, + 'withV2Timeline': False, } - params = paginationParams.copy() - del params['cursor'] + variables = paginationVariables.copy() + del variables['cursor'] - for obj in self._iter_api_data(f'https://api.twitter.com/2/timeline/profile/{userId}.json', params, paginationParams): - yield from self._instructions_to_tweets(obj) + gotPinned = False + for obj in self._iter_api_data('https://twitter.com/i/api/graphql/BSKxQ9_IaCoVyIvQHQROIQ/UserTweetsAndReplies', _TwitterAPIType.GRAPHQL, variables, paginationVariables): + instructions = obj['data']['user']['result']['timeline']['timeline']['instructions'] + if not gotPinned: + for instruction in instructions: + if instruction['type'] == 'TimelinePinEntry': + gotPinned = True + yield self._graphql_timeline_tweet_item_result_to_tweet(instruction['entry']['content']['itemContent']['tweet_results']['result']) + yield from self._graphql_timeline_instructions_to_tweets(instructions) class TwitterHashtagScraper(TwitterSearchScraper): @@ -853,49 +920,52 @@ class TwitterTweetScraper(_TwitterAPIScraper): super().__init__(f'https://twitter.com/i/web/status/{self._tweetId}', **kwargs) def get_items(self): - paginationParams = { - 'include_profile_interstitial_type': '1', - 'include_blocking': '1', - 'include_blocked_by': '1', - 'include_followed_by': '1', - 'include_want_retweets': '1', - 'include_mute_edge': '1', - 'include_can_dm': '1', - 'include_can_media_tag': '1', - 'skip_status': '1', - 'cards_platform': 'Web-12', - 'include_cards': '1', - 'include_ext_alt_text': 'true', - 'include_quote_count': 'true', - 'include_reply_count': '1', - 'tweet_mode': 'extended', - 'include_entities': 'true', - 'include_user_entities': 'true', - 'include_ext_media_color': 'true', - 'include_ext_media_availability': 'true', - 'send_error_codes': 'true', - 'simple_quoted_tweet': 'true', - 'count': '20', + paginationVariables = { + 'focalTweetId': str(self._tweetId), 'cursor': None, - 'include_ext_has_birdwatch_notes': 'false', - 'ext': 'mediaStats,highlightedLabel', + 'referrer': 'tweet', + 'with_rux_injections': False, + 'includePromotedContent': True, + 'withCommunity': True, + 'withQuickPromoteEligibilityTweetFields': True, + 'withTweetQuoteCount': True, + 'withBirdwatchNotes': True, + 'withSuperFollowsUserFields': True, + 'withBirdwatchPivots': False, + 'withDownvotePerspective': False, + 'withReactionsMetadata': False, + 'withReactionsPerspective': False, + 'withSuperFollowsTweetFields': True, + 'withVoice': True, + 'withV2Timeline': False, } - params = paginationParams.copy() - del params['cursor'] + variables = paginationVariables.copy() + del variables['cursor'], variables['referrer'] + url = 'https://twitter.com/i/api/graphql/8svRea_Lc0_mdhwP6dqe0Q/TweetDetail' if self._mode is TwitterTweetScraperMode.SINGLE: - obj = self._get_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params) - yield self._tweet_to_tweet(obj['globalObjects']['tweets'][str(self._tweetId)], obj) + obj = self._get_api_data(url, _TwitterAPIType.GRAPHQL, params = variables) + for instruction in obj['data']['threaded_conversation_with_injections']['instructions']: + if instruction['type'] != 'TimelineAddEntries': + continue + for entry in instruction['entries']: + if entry['entryId'] == f'tweet-{self._tweetId}' and entry['content']['entryType'] == 'TimelineTimelineItem' and entry['content']['itemContent']['itemType'] == 'TimelineTweet': + yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result']) + break elif self._mode is TwitterTweetScraperMode.SCROLL: - for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params, paginationParams, direction = _ScrollDirection.BOTH): - yield from self._instructions_to_tweets(obj, includeConversationThreads = True) + for obj in self._iter_api_data(url, _TwitterAPIType.GRAPHQL, variables, paginationVariables, direction = _ScrollDirection.BOTH): + yield from self._graphql_timeline_instructions_to_tweets(obj['data']['threaded_conversation_with_injections']['instructions'], includeConversationThreads = True) elif self._mode is TwitterTweetScraperMode.RECURSE: seenTweets = set() queue = collections.deque() queue.append(self._tweetId) while queue: tweetId = queue.popleft() - for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{tweetId}.json', params, paginationParams, direction = _ScrollDirection.BOTH): - for tweet in self._instructions_to_tweets(obj, includeConversationThreads = True): + thisPagVariables = paginationVariables.copy() + thisPagVariables['focalTweetId'] = str(tweetId) + thisVariables = thisPagVariables.copy() + del thisPagVariables['cursor'], thisPagVariables['referrer'] + for obj in self._iter_api_data(url, _TwitterAPIType.GRAPHQL, thisVariables, thisPagVariables, direction = _ScrollDirection.BOTH): + for tweet in self._graphql_timeline_instructions_to_tweets(obj['data']['threaded_conversation_with_injections']['instructions'], includeConversationThreads = True): if tweet.id not in seenTweets: yield tweet seenTweets.add(tweet.id) @@ -965,7 +1035,7 @@ class TwitterTrendsScraper(_TwitterAPIScraper): 'entity_tokens': 'false', 'ext': 'mediaStats,highlightedLabel,voiceInfo', } - obj = self._get_api_data('https://twitter.com/i/api/2/guide.json', params) + obj = self._get_api_data('https://twitter.com/i/api/2/guide.json', _TwitterAPIType.V2, params) for instruction in obj['timeline']['instructions']: if not 'addEntries' in instruction: continue