Handle tombstones

Closes #392
Fixes #603
This commit is contained in:
JustAnotherArchivist
2023-02-21 04:18:12 +00:00
parent 3d6cd63a00
commit 6a6b02cb28

View File

@@ -102,7 +102,7 @@ class Tweet(snscrape.base.Item):
class TextLink:
text: typing.Optional[str]
url: str
tcourl: str
tcourl: typing.Optional[str]
indices: typing.Tuple[int, int]
@@ -454,6 +454,18 @@ class TweetRef(snscrape.base.Item):
return f'https://twitter.com/i/web/status/{self.id}'
@dataclasses.dataclass
class Tombstone(snscrape.base.Item):
'''A placeholder for a tweet that cannot be accessed'''
id: int
text: typing.Optional[str] = None
textLinks: typing.Optional[typing.List[TextLink]] = None
def __str__(self):
return f'https://twitter.com/i/web/status/{self.id}'
@dataclasses.dataclass
class User(snscrape.base.Entity):
# Most fields can be None if they're not known.
@@ -1352,12 +1364,26 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
kwargs['vibe'] = self._make_vibe(tweet['ext']['vibe']['r']['ok'])
return self._make_tweet(tweet, user, **kwargs)
def _graphql_timeline_tweet_item_result_to_tweet(self, result):
def _make_tombstone(self, tweetId, info):
if tweetId is None:
raise snscrape.base.ScraperException('Cannot create tombstone without tweet ID')
if info and (text := info.get('richText', info['text'])):
return Tombstone(
id = tweetId,
text = text['text'],
textLinks = [TextLink(text = text['text'][x['fromIndex']:x['toIndex']], url = x['ref']['url'], tcourl = None, indices = (x['fromIndex'], x['toIndex'])) for x in text['entities']],
)
else:
return Tombstone(id = tweetId)
def _graphql_timeline_tweet_item_result_to_tweet(self, result, tweetId = None):
if result['__typename'] == 'Tweet':
pass
elif result['__typename'] == 'TweetWithVisibilityResults':
#TODO Include result['softInterventionPivot'] in the Tweet object
result = result['tweet']
elif result['__typename'] == 'TweetTombstone':
return self._make_tombstone(tweetId, result.get('tombstone'))
else:
raise snscrape.base.ScraperException(f'Unknown result type {result["__typename"]!r}')
tweet = result['legacy']
@@ -1365,19 +1391,21 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
user = self._user_to_user(result['core']['user_results']['result']['legacy'], id_ = userId)
kwargs = {}
if 'retweeted_status_result' in tweet:
#TODO Tombstones will cause a crash here.
kwargs['retweetedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(tweet['retweeted_status_result']['result'])
if 'quoted_status_result' in result:
if result['quoted_status_result']['result']['__typename'] == 'TweetTombstone':
kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str']))
else:
kwargs['quotedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(result['quoted_status_result']['result'])
elif 'quotedRefResult' in result:
kwargs['quotedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(result['quoted_status_result']['result'], tweetId = int(tweet['quoted_status_id_str']))
elif result.get('quotedRefResult'):
if result['quotedRefResult']['result']['__typename'] == 'TweetTombstone':
kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str']))
kwargs['quotedTweet'] = self._graphql_timeline_tweet_item_result_to_tweet(result['quotedRefResult']['result'], tweetId = int(tweet['quoted_status_id_str']))
else:
if result['quotedRefResult']['result']['__typename'] != 'Tweet':
_logger.warning(f'Unknown quotedRefResult type {result["quotedRefResult"]["result"]["__typename"]!r} on tweet {self._get_tweet_id(tweet)}, using TweetRef')
kwargs['quotedTweet'] = TweetRef(id = int(result['quotedRefResult']['result']['rest_id']))
elif 'quoted_status_id_str' in tweet:
kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str']))
# Omit the TweetRef if this is a retweet and the quoted tweet ID matches the tweet quoted in the retweeted tweet.
if tweet['quoted_status_id_str'] != tweet.get('retweeted_status_result', {}).get('result', {}).get('quoted_status_result', {}).get('result', {}).get('rest_id'):
kwargs['quotedTweet'] = TweetRef(id = int(tweet['quoted_status_id_str']))
if 'card' in result:
kwargs['card'] = self._make_card(result['card'], _TwitterAPIType.GRAPHQL, self._get_tweet_id(tweet))
if 'views' in result and 'count' in result['views']:
@@ -1392,17 +1420,21 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
continue
for entry in instruction['entries']:
if entry['entryId'].startswith('tweet-'):
tweetId = int(entry['entryId'].split('-', 1)[1])
if entry['content']['entryType'] == 'TimelineTimelineItem' and entry['content']['itemContent']['itemType'] == 'TimelineTweet':
if 'result' not in entry['content']['itemContent']['tweet_results']:
_logger.warning(f'Skipping empty tweet entry {entry["entryId"]}')
continue
yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result'])
yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result'], tweetId = tweetId)
else:
_logger.warning('Got unrecognised timeline tweet item(s)')
elif includeConversationThreads and entry['entryId'].startswith('conversationthread-'): #TODO show more cursor?
for item in entry['content']['items']:
if item['entryId'].startswith(f'{entry["entryId"]}-tweet-'):
yield self._graphql_timeline_tweet_item_result_to_tweet(item['item']['itemContent']['tweet_results']['result'])
tweetId = int(item['entryId'][len(entry['entryId']) + 7:])
yield self._graphql_timeline_tweet_item_result_to_tweet(item['item']['itemContent']['tweet_results']['result'], tweetId = tweetId)
elif not entry['entryId'].startswith('cursor-'):
_logger.warning(f'Skipping unrecognised entry ID: {entry["entryId"]!r}')
def _render_text_with_urls(self, text, urls):
if not urls:
@@ -1707,7 +1739,8 @@ class TwitterProfileScraper(TwitterUserScraper):
for instruction in instructions:
if instruction['type'] == 'TimelinePinEntry':
gotPinned = True
yield self._graphql_timeline_tweet_item_result_to_tweet(instruction['entry']['content']['itemContent']['tweet_results']['result'])
tweetId = int(instruction['entry']['entryId'][6:]) if instruction['entry']['entryId'].startswith('tweet-') else None
yield self._graphql_timeline_tweet_item_result_to_tweet(instruction['entry']['content']['itemContent']['tweet_results']['result'], tweetId = tweetId)
yield from self._graphql_timeline_instructions_to_tweets(instructions)
@@ -1802,7 +1835,7 @@ class TwitterTweetScraper(_TwitterAPIScraper):
continue
for entry in instruction['entries']:
if entry['entryId'] == f'tweet-{self._tweetId}' and entry['content']['entryType'] == 'TimelineTimelineItem' and entry['content']['itemContent']['itemType'] == 'TimelineTweet':
yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result'])
yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result'], tweetId = self._tweetId)
break
elif self._mode is TwitterTweetScraperMode.SCROLL:
for obj in self._iter_api_data(url, _TwitterAPIType.GRAPHQL, params, paginationParams, direction = _ScrollDirection.BOTH):