From 45d1fa27de7bea4d77076fc6df9455abd1f5ce43 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Sun, 23 May 2021 02:12:13 +0000 Subject: [PATCH] Add twitter-tweet scraper for retrieving tweets by ID, including scroll and recursion modes Closes #51, closes #137 --- snscrape/modules/twitter.py | 164 +++++++++++++++++++++++++++++++----- 1 file changed, 145 insertions(+), 19 deletions(-) diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index 52b1528..6f0fdbd 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -1,7 +1,9 @@ import bs4 +import collections import dataclasses import datetime import email.utils +import enum import itertools import json import random @@ -138,6 +140,12 @@ class User(snscrape.base.Entity): return self.url +class ScrollDirection(enum.Enum): + TOP = enum.auto() + BOTTOM = enum.auto() + BOTH = enum.auto() + + class TwitterAPIScraper(snscrape.base.Scraper): def __init__(self, baseUrl, **kwargs): super().__init__(**kwargs) @@ -193,15 +201,25 @@ class TwitterAPIScraper(snscrape.base.Scraper): raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e return obj - def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None): + def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None, direction = ScrollDirection.BOTTOM): # Iterate over endpoint with params/paginationParams, optionally starting from a cursor # Handles guest token extraction using the baseUrl passed to __init__ etc. # Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten). + # direction controls in which direction it should scroll from the initial response. BOTH equals TOP followed by BOTTOM. + + # Logic for dual scrolling: direction is set to top, but if the bottom cursor is found, bottomCursorAndStop is set accordingly. + # Once the top pagination is exhausted, the bottomCursorAndStop is used and reset to None; it isn't set anymore after because the first entry condition will always be true for the bottom cursor. + if cursor is None: reqParams = params else: reqParams = paginationParams.copy() reqParams['cursor'] = cursor + bottomCursorAndStop = None + if direction is ScrollDirection.TOP or direction is ScrollDirection.BOTH: + dir = 'top' + else: + dir = 'bottom' stopOnEmptyResponse = False while True: logger.info(f'Retrieving scroll page {cursor}') @@ -210,6 +228,8 @@ class TwitterAPIScraper(snscrape.base.Scraper): # No data format test, just a hard and loud crash if anything's wrong :-) newCursor = None + promptCursor = None + newBottomCursorAndStop = None for instruction in obj['timeline']['instructions']: if 'addEntries' in instruction: entries = instruction['addEntries']['entries'] @@ -218,13 +238,26 @@ class TwitterAPIScraper(snscrape.base.Scraper): else: continue for entry in entries: - if entry['entryId'] == 'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-'): + if entry['entryId'] == f'sq-cursor-{dir}' or entry['entryId'].startswith(f'cursor-{dir}-'): newCursor = entry['content']['operation']['cursor']['value'] if 'stopOnEmptyResponse' in entry['content']['operation']['cursor']: stopOnEmptyResponse = entry['content']['operation']['cursor']['stopOnEmptyResponse'] + elif entry['entryId'].startswith('cursor-showMoreThreadsPrompt-'): # E.g. 'offensive' replies button + promptCursor = entry['content']['operation']['cursor']['value'] + elif direction is ScrollDirection.BOTH and bottomCursorAndStop is None and (entry['entryId'] == f'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-')): + newBottomCursorAndStop = (entry['content']['operation']['cursor']['value'], entry['content']['operation']['cursor'].get('stopOnEmptyResponse', False)) + if bottomCursorAndStop is None and newBottomCursorAndStop is not None: + bottomCursorAndStop = newBottomCursorAndStop if not newCursor or newCursor == cursor or (stopOnEmptyResponse and self._count_tweets(obj) == 0): # End of pagination - break + if promptCursor is not None: + newCursor = promptCursor + elif direction is ScrollDirection.BOTH and bottomCursorAndStop is not None: + dir = 'bottom' + newCursor, stopOnEmptyResponse = bottomCursorAndStop + bottomCursorAndStop = None + else: + break cursor = newCursor reqParams = paginationParams.copy() reqParams['cursor'] = cursor @@ -243,7 +276,7 @@ class TwitterAPIScraper(snscrape.base.Scraper): count += 1 return count - def _instructions_to_tweets(self, obj): + def _instructions_to_tweets(self, obj, includeConversationThreads = False): # No data format test, just a hard and loud crash if anything's wrong :-) for instruction in obj['timeline']['instructions']: if 'addEntries' in instruction: @@ -254,21 +287,30 @@ class TwitterAPIScraper(snscrape.base.Scraper): continue for entry in entries: if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-'): - if 'tweet' in entry['content']['item']['content']: - if 'promotedMetadata' in entry['content']['item']['content']['tweet']: # Promoted tweet aka ads - continue - if entry['content']['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']: - logger.warning(f'Skipping tweet {entry["content"]["item"]["content"]["tweet"]["id"]} which is not in globalObjects') - continue - tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']] - elif 'tombstone' in entry['content']['item']['content'] and 'tweet' in entry['content']['item']['content']['tombstone']: - if entry['content']['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']: - logger.warning(f'Skipping tweet {entry["content"]["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects') - continue - tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tombstone']['tweet']['id']] - else: - raise snscrape.base.ScraperException(f'Unable to handle entry {entry["entryId"]!r}') - yield self._tweet_to_tweet(tweet, obj) + yield from self._instruction_tweet_entry_to_tweet(entry['entryId'], entry['content'], obj) + elif includeConversationThreads and entry['entryId'].startswith('conversationThread-') and not entry['entryId'].endswith('-show_more_cursor'): + for item in entry['content']['timelineModule']['items']: + if item['entryId'].startswith('tweet-'): + yield from self._instruction_tweet_entry_to_tweet(item['entryId'], item, obj) + + def _instruction_tweet_entry_to_tweet(self, entryId, entry, obj): + if 'tweet' in entry['item']['content']: + if 'promotedMetadata' in entry['item']['content']['tweet']: # Promoted tweet aka ads + return + if entry['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']: + logger.warning(f'Skipping tweet {entry["item"]["content"]["tweet"]["id"]} which is not in globalObjects') + return + tweet = obj['globalObjects']['tweets'][entry['item']['content']['tweet']['id']] + elif 'tombstone' in entry['item']['content']: + if 'tweet' not in entry['item']['content']['tombstone']: # E.g. deleted reply + return + if entry['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']: + logger.warning(f'Skipping tweet {entry["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects') + return + tweet = obj['globalObjects']['tweets'][entry['item']['content']['tombstone']['tweet']['id']] + else: + raise snscrape.base.ScraperException(f'Unable to handle entry {entryId!r}') + yield self._tweet_to_tweet(tweet, obj) def _tweet_to_tweet(self, tweet, obj): # Transforms a Twitter API tweet object into a Tweet @@ -586,6 +628,90 @@ class TwitterHashtagScraper(TwitterSearchScraper): return cls(args.hashtag, retries = args.retries) +class TwitterTweetScraperMode(enum.Enum): + SINGLE = 'single' + SCROLL = 'scroll' + RECURSE = 'recurse' + + @classmethod + def from_args(cls, args): + if args.scroll: + return cls.SCROLL + if args.recurse: + return cls.RECURSE + return cls.SINGLE + + +class TwitterTweetScraper(TwitterAPIScraper): + name = 'twitter-tweet' + + def __init__(self, tweetId, mode, **kwargs): + self._tweetId = tweetId + self._mode = mode + super().__init__(f'https://twitter.com/i/web/{self._tweetId}', **kwargs) + + def get_items(self): + paginationParams = { + 'include_profile_interstitial_type': '1', + 'include_blocking': '1', + 'include_blocked_by': '1', + 'include_followed_by': '1', + 'include_want_retweets': '1', + 'include_mute_edge': '1', + 'include_can_dm': '1', + 'include_can_media_tag': '1', + 'skip_status': '1', + 'cards_platform': 'Web-12', + 'include_cards': '1', + 'include_ext_alt_text': 'true', + 'include_quote_count': 'true', + 'include_reply_count': '1', + 'tweet_mode': 'extended', + 'include_entities': 'true', + 'include_user_entities': 'true', + 'include_ext_media_color': 'true', + 'include_ext_media_availability': 'true', + 'send_error_codes': 'true', + 'simple_quoted_tweet': 'true', + 'count': '20', + 'cursor': None, + 'include_ext_has_birdwatch_notes': 'false', + 'ext': 'mediaStats%2ChighlightedLabel', + } + params = paginationParams.copy() + del params['cursor'] + if self._mode is TwitterTweetScraperMode.SINGLE: + obj = self._get_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params) + yield self._tweet_to_tweet(obj['globalObjects']['tweets'][str(self._tweetId)], obj) + elif self._mode is TwitterTweetScraperMode.SCROLL: + for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH): + yield from self._instructions_to_tweets(obj, includeConversationThreads = True) + elif self._mode is TwitterTweetScraperMode.RECURSE: + seenTweets = set() + queue = collections.deque() + queue.append(self._tweetId) + while queue: + tweetId = queue.popleft() + for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH): + for tweet in self._instructions_to_tweets(obj, includeConversationThreads = True): + if tweet.id not in seenTweets: + yield tweet + seenTweets.add(tweet.id) + if tweet.replyCount: + queue.append(tweet.id) + + @classmethod + def setup_parser(cls, subparser): + group = subparser.add_mutually_exclusive_group(required = False) + group.add_argument('--scroll', action = 'store_true', default = False, help = 'Enable scrolling in both directions') + group.add_argument('--recurse', '--recursive', action = 'store_true', default = False, help = 'Enable recursion through all tweets encountered (warning: slow, potentially memory-intensive!)') + subparser.add_argument('tweetId', type = int, help = 'A tweet ID') + + @classmethod + def from_args(cls, args): + return cls(args.tweetId, TwitterTweetScraperMode.from_args(args), retries = args.retries) + + class TwitterListPostsScraper(TwitterSearchScraper): name = 'twitter-list-posts'