Add twitter-tweet scraper for retrieving tweets by ID, including scroll and recursion modes

Closes #51, closes #137
This commit is contained in:
JustAnotherArchivist
2021-05-23 02:12:13 +00:00
parent 98b798b0e5
commit 45d1fa27de

View File

@@ -1,7 +1,9 @@
import bs4 import bs4
import collections
import dataclasses import dataclasses
import datetime import datetime
import email.utils import email.utils
import enum
import itertools import itertools
import json import json
import random import random
@@ -138,6 +140,12 @@ class User(snscrape.base.Entity):
return self.url return self.url
class ScrollDirection(enum.Enum):
TOP = enum.auto()
BOTTOM = enum.auto()
BOTH = enum.auto()
class TwitterAPIScraper(snscrape.base.Scraper): class TwitterAPIScraper(snscrape.base.Scraper):
def __init__(self, baseUrl, **kwargs): def __init__(self, baseUrl, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
@@ -193,15 +201,25 @@ class TwitterAPIScraper(snscrape.base.Scraper):
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
return obj return obj
def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None): def _iter_api_data(self, endpoint, params, paginationParams = None, cursor = None, direction = ScrollDirection.BOTTOM):
# Iterate over endpoint with params/paginationParams, optionally starting from a cursor # Iterate over endpoint with params/paginationParams, optionally starting from a cursor
# Handles guest token extraction using the baseUrl passed to __init__ etc. # Handles guest token extraction using the baseUrl passed to __init__ etc.
# Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten). # Order from params and paginationParams is preserved. To insert the cursor at a particular location, insert a 'cursor' key into paginationParams there (value is overwritten).
# direction controls in which direction it should scroll from the initial response. BOTH equals TOP followed by BOTTOM.
# Logic for dual scrolling: direction is set to top, but if the bottom cursor is found, bottomCursorAndStop is set accordingly.
# Once the top pagination is exhausted, the bottomCursorAndStop is used and reset to None; it isn't set anymore after because the first entry condition will always be true for the bottom cursor.
if cursor is None: if cursor is None:
reqParams = params reqParams = params
else: else:
reqParams = paginationParams.copy() reqParams = paginationParams.copy()
reqParams['cursor'] = cursor reqParams['cursor'] = cursor
bottomCursorAndStop = None
if direction is ScrollDirection.TOP or direction is ScrollDirection.BOTH:
dir = 'top'
else:
dir = 'bottom'
stopOnEmptyResponse = False stopOnEmptyResponse = False
while True: while True:
logger.info(f'Retrieving scroll page {cursor}') logger.info(f'Retrieving scroll page {cursor}')
@@ -210,6 +228,8 @@ class TwitterAPIScraper(snscrape.base.Scraper):
# No data format test, just a hard and loud crash if anything's wrong :-) # No data format test, just a hard and loud crash if anything's wrong :-)
newCursor = None newCursor = None
promptCursor = None
newBottomCursorAndStop = None
for instruction in obj['timeline']['instructions']: for instruction in obj['timeline']['instructions']:
if 'addEntries' in instruction: if 'addEntries' in instruction:
entries = instruction['addEntries']['entries'] entries = instruction['addEntries']['entries']
@@ -218,13 +238,26 @@ class TwitterAPIScraper(snscrape.base.Scraper):
else: else:
continue continue
for entry in entries: for entry in entries:
if entry['entryId'] == 'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-'): if entry['entryId'] == f'sq-cursor-{dir}' or entry['entryId'].startswith(f'cursor-{dir}-'):
newCursor = entry['content']['operation']['cursor']['value'] newCursor = entry['content']['operation']['cursor']['value']
if 'stopOnEmptyResponse' in entry['content']['operation']['cursor']: if 'stopOnEmptyResponse' in entry['content']['operation']['cursor']:
stopOnEmptyResponse = entry['content']['operation']['cursor']['stopOnEmptyResponse'] stopOnEmptyResponse = entry['content']['operation']['cursor']['stopOnEmptyResponse']
elif entry['entryId'].startswith('cursor-showMoreThreadsPrompt-'): # E.g. 'offensive' replies button
promptCursor = entry['content']['operation']['cursor']['value']
elif direction is ScrollDirection.BOTH and bottomCursorAndStop is None and (entry['entryId'] == f'sq-cursor-bottom' or entry['entryId'].startswith('cursor-bottom-')):
newBottomCursorAndStop = (entry['content']['operation']['cursor']['value'], entry['content']['operation']['cursor'].get('stopOnEmptyResponse', False))
if bottomCursorAndStop is None and newBottomCursorAndStop is not None:
bottomCursorAndStop = newBottomCursorAndStop
if not newCursor or newCursor == cursor or (stopOnEmptyResponse and self._count_tweets(obj) == 0): if not newCursor or newCursor == cursor or (stopOnEmptyResponse and self._count_tweets(obj) == 0):
# End of pagination # End of pagination
break if promptCursor is not None:
newCursor = promptCursor
elif direction is ScrollDirection.BOTH and bottomCursorAndStop is not None:
dir = 'bottom'
newCursor, stopOnEmptyResponse = bottomCursorAndStop
bottomCursorAndStop = None
else:
break
cursor = newCursor cursor = newCursor
reqParams = paginationParams.copy() reqParams = paginationParams.copy()
reqParams['cursor'] = cursor reqParams['cursor'] = cursor
@@ -243,7 +276,7 @@ class TwitterAPIScraper(snscrape.base.Scraper):
count += 1 count += 1
return count return count
def _instructions_to_tweets(self, obj): def _instructions_to_tweets(self, obj, includeConversationThreads = False):
# No data format test, just a hard and loud crash if anything's wrong :-) # No data format test, just a hard and loud crash if anything's wrong :-)
for instruction in obj['timeline']['instructions']: for instruction in obj['timeline']['instructions']:
if 'addEntries' in instruction: if 'addEntries' in instruction:
@@ -254,21 +287,30 @@ class TwitterAPIScraper(snscrape.base.Scraper):
continue continue
for entry in entries: for entry in entries:
if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-'): if entry['entryId'].startswith('sq-I-t-') or entry['entryId'].startswith('tweet-'):
if 'tweet' in entry['content']['item']['content']: yield from self._instruction_tweet_entry_to_tweet(entry['entryId'], entry['content'], obj)
if 'promotedMetadata' in entry['content']['item']['content']['tweet']: # Promoted tweet aka ads elif includeConversationThreads and entry['entryId'].startswith('conversationThread-') and not entry['entryId'].endswith('-show_more_cursor'):
continue for item in entry['content']['timelineModule']['items']:
if entry['content']['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']: if item['entryId'].startswith('tweet-'):
logger.warning(f'Skipping tweet {entry["content"]["item"]["content"]["tweet"]["id"]} which is not in globalObjects') yield from self._instruction_tweet_entry_to_tweet(item['entryId'], item, obj)
continue
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']] def _instruction_tweet_entry_to_tweet(self, entryId, entry, obj):
elif 'tombstone' in entry['content']['item']['content'] and 'tweet' in entry['content']['item']['content']['tombstone']: if 'tweet' in entry['item']['content']:
if entry['content']['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']: if 'promotedMetadata' in entry['item']['content']['tweet']: # Promoted tweet aka ads
logger.warning(f'Skipping tweet {entry["content"]["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects') return
continue if entry['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']:
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tombstone']['tweet']['id']] logger.warning(f'Skipping tweet {entry["item"]["content"]["tweet"]["id"]} which is not in globalObjects')
else: return
raise snscrape.base.ScraperException(f'Unable to handle entry {entry["entryId"]!r}') tweet = obj['globalObjects']['tweets'][entry['item']['content']['tweet']['id']]
yield self._tweet_to_tweet(tweet, obj) elif 'tombstone' in entry['item']['content']:
if 'tweet' not in entry['item']['content']['tombstone']: # E.g. deleted reply
return
if entry['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']:
logger.warning(f'Skipping tweet {entry["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects')
return
tweet = obj['globalObjects']['tweets'][entry['item']['content']['tombstone']['tweet']['id']]
else:
raise snscrape.base.ScraperException(f'Unable to handle entry {entryId!r}')
yield self._tweet_to_tweet(tweet, obj)
def _tweet_to_tweet(self, tweet, obj): def _tweet_to_tweet(self, tweet, obj):
# Transforms a Twitter API tweet object into a Tweet # Transforms a Twitter API tweet object into a Tweet
@@ -586,6 +628,90 @@ class TwitterHashtagScraper(TwitterSearchScraper):
return cls(args.hashtag, retries = args.retries) return cls(args.hashtag, retries = args.retries)
class TwitterTweetScraperMode(enum.Enum):
SINGLE = 'single'
SCROLL = 'scroll'
RECURSE = 'recurse'
@classmethod
def from_args(cls, args):
if args.scroll:
return cls.SCROLL
if args.recurse:
return cls.RECURSE
return cls.SINGLE
class TwitterTweetScraper(TwitterAPIScraper):
name = 'twitter-tweet'
def __init__(self, tweetId, mode, **kwargs):
self._tweetId = tweetId
self._mode = mode
super().__init__(f'https://twitter.com/i/web/{self._tweetId}', **kwargs)
def get_items(self):
paginationParams = {
'include_profile_interstitial_type': '1',
'include_blocking': '1',
'include_blocked_by': '1',
'include_followed_by': '1',
'include_want_retweets': '1',
'include_mute_edge': '1',
'include_can_dm': '1',
'include_can_media_tag': '1',
'skip_status': '1',
'cards_platform': 'Web-12',
'include_cards': '1',
'include_ext_alt_text': 'true',
'include_quote_count': 'true',
'include_reply_count': '1',
'tweet_mode': 'extended',
'include_entities': 'true',
'include_user_entities': 'true',
'include_ext_media_color': 'true',
'include_ext_media_availability': 'true',
'send_error_codes': 'true',
'simple_quoted_tweet': 'true',
'count': '20',
'cursor': None,
'include_ext_has_birdwatch_notes': 'false',
'ext': 'mediaStats%2ChighlightedLabel',
}
params = paginationParams.copy()
del params['cursor']
if self._mode is TwitterTweetScraperMode.SINGLE:
obj = self._get_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params)
yield self._tweet_to_tweet(obj['globalObjects']['tweets'][str(self._tweetId)], obj)
elif self._mode is TwitterTweetScraperMode.SCROLL:
for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{self._tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
yield from self._instructions_to_tweets(obj, includeConversationThreads = True)
elif self._mode is TwitterTweetScraperMode.RECURSE:
seenTweets = set()
queue = collections.deque()
queue.append(self._tweetId)
while queue:
tweetId = queue.popleft()
for obj in self._iter_api_data(f'https://twitter.com/i/api/2/timeline/conversation/{tweetId}.json', params, paginationParams, direction = ScrollDirection.BOTH):
for tweet in self._instructions_to_tweets(obj, includeConversationThreads = True):
if tweet.id not in seenTweets:
yield tweet
seenTweets.add(tweet.id)
if tweet.replyCount:
queue.append(tweet.id)
@classmethod
def setup_parser(cls, subparser):
group = subparser.add_mutually_exclusive_group(required = False)
group.add_argument('--scroll', action = 'store_true', default = False, help = 'Enable scrolling in both directions')
group.add_argument('--recurse', '--recursive', action = 'store_true', default = False, help = 'Enable recursion through all tweets encountered (warning: slow, potentially memory-intensive!)')
subparser.add_argument('tweetId', type = int, help = 'A tweet ID')
@classmethod
def from_args(cls, args):
return cls(args.tweetId, TwitterTweetScraperMode.from_args(args), retries = args.retries)
class TwitterListPostsScraper(TwitterSearchScraper): class TwitterListPostsScraper(TwitterSearchScraper):
name = 'twitter-list-posts' name = 'twitter-list-posts'