From 6a6b02cb28817f303758806227ae0d93f806c945 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Tue, 21 Feb 2023 04:18:12 +0000 Subject: [PATCH] Handle tombstones Closes #392 Fixes #603 --- snscrape/modules/twitter.py | 59 +++++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index c4cdf54..e291612 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -102,7 +102,7 @@ class Tweet(snscrape.base.Item): class TextLink: text: typing.Optional[str] url: str - tcourl: str + tcourl: typing.Optional[str] indices: typing.Tuple[int, int] @@ -454,6 +454,18 @@ class TweetRef(snscrape.base.Item): return f'https://twitter.com/i/web/status/{self.id}' +@dataclasses.dataclass +class Tombstone(snscrape.base.Item): + '''A placeholder for a tweet that cannot be accessed''' + + id: int + text: typing.Optional[str] = None + textLinks: typing.Optional[typing.List[TextLink]] = None + + def __str__(self): + return f'https://twitter.com/i/web/status/{self.id}' + + @dataclasses.dataclass class User(snscrape.base.Entity): # Most fields can be None if they're not known. @@ -1352,12 +1364,26 @@ class _TwitterAPIScraper(snscrape.base.Scraper): kwargs['vibe'] = self._make_vibe(tweet['ext']['vibe']['r']['ok']) return self._make_tweet(tweet, user, **kwargs) - def _graphql_timeline_tweet_item_result_to_tweet(self, result): + def _make_tombstone(self, tweetId, info): + if tweetId is None: + raise snscrape.base.ScraperException('Cannot create tombstone without tweet ID') + if info and (text := info.get('richText', info['text'])): + return Tombstone( + id = tweetId, + text = text['text'], + textLinks = [TextLink(text = text['text'][x['fromIndex']:x['toIndex']], url = x['ref']['url'], tcourl = None, indices = (x['fromIndex'], x['toIndex'])) for x in text['entities']], + ) + else: + return Tombstone(id = tweetId) + + def _graphql_timeline_tweet_item_result_to_tweet(self, result, tweetId = None): if result['__typename'] == 'Tweet': pass elif result['__typename'] == 'TweetWithVisibilityResults': #TODO Include result['softInterventionPivot'] in the Tweet object result = result['tweet'] + elif result['__typename'] == 'TweetTombstone': + return self._make_tombstone(tweetId, result.get('tombstone')) else: raise snscrape.base.ScraperException(f'Unknown result type {result["__typename"]!r}') tweet = result['legacy'] @@ -1365,19 +1391,21 @@ class _TwitterAPIScraper(snscrape.base.Scraper): user = self._user_to_user(result['core']['user_results']['result']['legacy'], id_ = userId) kwargs = {} if 'retweeted_status_result' in tweet: + #TODO Tombstones will cause a crash here. kwargs['retweetedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(tweet['retweeted_status_result']['result']) if 'quoted_status_result' in result: - if result['quoted_status_result']['result']['__typename'] == 'TweetTombstone': - kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str'])) - else: - kwargs['quotedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(result['quoted_status_result']['result']) - elif 'quotedRefResult' in result: + kwargs['quotedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(result['quoted_status_result']['result'], tweetId = int(tweet['quoted_status_id_str'])) + elif result.get('quotedRefResult'): if result['quotedRefResult']['result']['__typename'] == 'TweetTombstone': - kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str'])) + kwargs['quotedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(result['quotedRefResult']['result'], tweetId = int(tweet['quoted_status_id_str'])) else: + if result['quotedRefResult']['result']['__typename'] != 'Tweet': + _logger.warning(f'Unknown quotedRefResult type {result["quotedRefResult"]["result"]["__typename"]!r} on tweet {self._get_tweet_id(tweet)}, using TweetRef') kwargs['quotedTweet'] = TweetRef(id = int(result['quotedRefResult']['result']['rest_id'])) elif 'quoted_status_id_str' in tweet: - kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str'])) + # Omit the TweetRef if this is a retweet and the quoted tweet ID matches the tweet quoted in the retweeted tweet. + if tweet['quoted_status_id_str'] != tweet.get('retweeted_status_result', {}).get('result', {}).get('quoted_status_result', {}).get('result', {}).get('rest_id'): + kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str'])) if 'card' in result: kwargs['card'] = self._make_card(result['card'], _TwitterAPIType.GRAPHQL, self._get_tweet_id(tweet)) if 'views' in result and 'count' in result['views']: @@ -1392,17 +1420,21 @@ class _TwitterAPIScraper(snscrape.base.Scraper): continue for entry in instruction['entries']: if entry['entryId'].startswith('tweet-'): + tweetId = int(entry['entryId'].split('-', 1)[1]) if entry['content']['entryType'] == 'TimelineTimelineItem' and entry['content']['itemContent']['itemType'] == 'TimelineTweet': if 'result' not in entry['content']['itemContent']['tweet_results']: _logger.warning(f'Skipping empty tweet entry {entry["entryId"]}') continue - yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result']) + yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result'], tweetId = tweetId) else: _logger.warning('Got unrecognised timeline tweet item(s)') elif includeConversationThreads and entry['entryId'].startswith('conversationthread-'): #TODO show more cursor? for item in entry['content']['items']: if item['entryId'].startswith(f'{entry["entryId"]}-tweet-'): - yield self._graphql_timeline_tweet_item_result_to_tweet(item['item']['itemContent']['tweet_results']['result']) + tweetId = int(item['entryId'][len(entry['entryId']) + 7:]) + yield self._graphql_timeline_tweet_item_result_to_tweet(item['item']['itemContent']['tweet_results']['result'], tweetId = tweetId) + elif not entry['entryId'].startswith('cursor-'): + _logger.warning(f'Skipping unrecognised entry ID: {entry["entryId"]!r}') def _render_text_with_urls(self, text, urls): if not urls: @@ -1707,7 +1739,8 @@ class TwitterProfileScraper(TwitterUserScraper): for instruction in instructions: if instruction['type'] == 'TimelinePinEntry': gotPinned = True - yield self._graphql_timeline_tweet_item_result_to_tweet(instruction['entry']['content']['itemContent']['tweet_results']['result']) + tweetId = int(instruction['entry']['entryId'][6:]) if instruction['entry']['entryId'].startswith('tweet-') else None + yield self._graphql_timeline_tweet_item_result_to_tweet(instruction['entry']['content']['itemContent']['tweet_results']['result'], tweetId = tweetId) yield from self._graphql_timeline_instructions_to_tweets(instructions) @@ -1802,7 +1835,7 @@ class TwitterTweetScraper(_TwitterAPIScraper): continue for entry in instruction['entries']: if entry['entryId'] == f'tweet-{self._tweetId}' and entry['content']['entryType'] == 'TimelineTimelineItem' and entry['content']['itemContent']['itemType'] == 'TimelineTweet': - yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result']) + yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result'], tweetId = self._tweetId) break elif self._mode is TwitterTweetScraperMode.SCROLL: for obj in self._iter_api_data(url, _TwitterAPIType.GRAPHQL, params, paginationParams, direction = _ScrollDirection.BOTH):