Add support for scraping Twitter Communities

Closes #614
This commit is contained in:
JustAnotherArchivist
2023-02-21 20:15:57 +00:00
parent 82f64a6472
commit 57b126c656

View File

@@ -14,6 +14,7 @@ __all__ = [
]
import base64
import collections
import copy
import dataclasses
@@ -515,6 +516,27 @@ class UserLabel:
@dataclasses.dataclass
class UserRef:
id: int
text: typing.Optional[str] = None
textLinks: typing.Optional[typing.List[TextLink]] = None
def __str__(self):
return f'https://twitter.com/i/user/{self.id}'
@dataclasses.dataclass
class Community(snscrape.base.Entity):
id: int
name: str
description: str
created: datetime.datetime
admin: typing.Union[User, UserRef]
creator: typing.Union[User, UserRef]
membersFacepile: typing.List[typing.Union[User, UserRef]]
moderatorsCount: int
membersCount: int
rules: typing.List[str]
theme: str
bannerUrl: str
@dataclasses.dataclass
@@ -1428,6 +1450,14 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
yield self._graphql_timeline_tweet_item_result_to_tweet(entry['content']['itemContent']['tweet_results']['result'], tweetId = tweetId)
else:
_logger.warning('Got unrecognised timeline tweet item(s)')
elif entry['entryId'].startswith('homeConversation-'):
if entry['content']['entryType'] == 'TimelineTimelineModule':
for item in entry['content']['items']:
if not item['entryId'].startswith('homeConversation-') or '-tweet-' not in item['entryId']:
raise snscrape.base.ScraperException(f'Unexpected home conversation entry ID: {item["entryId"]!r}')
tweetId = int(item['entryId'].split('-tweet-', 1)[1])
if item['item']['itemContent']['itemType'] == 'TimelineTweet':
yield self._graphql_timeline_tweet_item_result_to_tweet(item['item']['itemContent']['tweet_results']['result'], tweetId = tweetId)
elif includeConversationThreads and entry['entryId'].startswith('conversationthread-'): #TODO show more cursor?
for item in entry['content']['items']:
if item['entryId'].startswith(f'{entry["entryId"]}-tweet-'):
@@ -1497,6 +1527,30 @@ class _TwitterAPIScraper(snscrape.base.Scraper):
labelKwargs['longDescription'] = label['longDescription']['text']
return UserLabel(**labelKwargs)
def _graphql_user_results_to_user_ref(self, obj):
if 'id' not in obj:
return None
if isinstance(obj['id'], int):
userId = obj['id']
elif obj['id'].startswith('VXNlclJlc3VsdHM6'):
# UserResults:<userid> in base64
try:
userId = base64.b64decode(obj['id'])
except ValueError:
return None
assert userId.startswith(b'UserResults:')
userId = int(userId.split(b':', 1)[1])
kwargs = {}
if 'result' in obj and obj['result']['__typename'] == 'UserUnavailable' and 'unavailable_message' in obj['result']:
kwargs['text'] = obj['result']['unavailable_message']['text']
kwargs['textLinks'] = [TextLink(text = kwargs['text'][x['fromIndex']:x['toIndex']], url = x['ref']['url'], tcourl = None, indices = (x['fromIndex'], x['toIndex'])) for x in obj['result']['unavailable_message']['entities']]
return UserRef(id = userId, **kwargs)
def _graphql_user_results_to_user(self, results):
if 'result' not in results or results['result']['__typename'] == 'UserUnavailable':
return self._graphql_user_results_to_user_ref(results)
return self._user_to_user(results['result']['legacy'], id_ = int(results['result']['rest_id']))
@classmethod
def _cli_construct(cls, argparseArgs, *args, **kwargs):
kwargs['guestTokenManager'] = _CLIGuestTokenManager()
@@ -1880,6 +1934,102 @@ class TwitterListPostsScraper(TwitterSearchScraper):
return cls._cli_construct(args, args.list)
class TwitterCommunityScraper(_TwitterAPIScraper):
name = 'twitter-community'
def __init__(self, communityId, **kwargs):
self._communityId = communityId
super().__init__(f'https://twitter.com/i/communities/{self._communityId}', **kwargs)
def _get_entity(self):
self._ensure_guest_token()
params = {
'variables': {
'communityId': str(self._communityId),
'withDmMuting': False,
'withSafetyModeUserFields': False,
'withSuperFollowsUserFields': True,
},
'features': {
'responsive_web_graphql_exclude_directive_enabled': False,
'responsive_web_graphql_skip_user_profile_image_extensions_enabled': False,
'responsive_web_graphql_timeline_navigation_enabled': True,
'responsive_web_twitter_blue_verified_badge_is_enabled': True,
'verified_phone_label_enabled': False,
},
}
obj = self._get_api_data('https://api.twitter.com/graphql/MO8cE7aTvaenXJX_teUGcA/CommunitiesFetchOneQuery', _TwitterAPIType.GRAPHQL, params = params)
if not obj['data'] or 'result' not in obj['data']['communityResults'] or obj['data']['communityResults']['result']['__typename'] == 'CommunityUnavailable':
_logger.warning('Empty response or unavailable community')
return None
community = obj['data']['communityResults']['result']
return Community(
id = int(community['id_str']),
name = community['name'],
description = community['description'],
created = datetime.datetime.fromtimestamp(community['created_at'] / 1000, tz = datetime.timezone.utc),
admin = self._graphql_user_results_to_user(community['admin_results']),
creator = self._graphql_user_results_to_user(community['creator_results']),
membersFacepile = [self._graphql_user_results_to_user(m) for m in community['members_facepile_results']],
moderatorsCount = community['moderator_count'],
membersCount = community['member_count'],
rules = [r['name'] for r in community['rules']],
theme = community.get('custom_theme', community['default_theme']),
bannerUrl = community.get('custom_banner_media', community['default_banner_media'])['media_info']['original_img_url'],
)
def get_items(self):
paginationVariables = {
'count': 20,
'cursor': None,
'communityId': str(self._communityId),
'withCommunity': True,
'withSuperFollowsUserFields': True,
'withDownvotePerspective': False,
'withReactionsMetadata': False,
'withReactionsPerspective': False,
'withSuperFollowsTweetFields': True,
}
variables = paginationVariables.copy()
del variables['count'], variables['cursor']
features = {
'responsive_web_twitter_blue_verified_badge_is_enabled': True,
'responsive_web_graphql_exclude_directive_enabled': False,
'verified_phone_label_enabled': False,
'responsive_web_graphql_timeline_navigation_enabled': True,
'responsive_web_graphql_skip_user_profile_image_extensions_enabled': False,
'tweetypie_unmention_optimization_enabled': True,
'vibe_api_enabled': True,
'responsive_web_edit_tweet_api_enabled': True,
'graphql_is_translatable_rweb_tweet_is_translatable_enabled': True,
'view_counts_everywhere_api_enabled': True,
'longform_notetweets_consumption_enabled': True,
'tweet_awards_web_tipping_enabled': False,
'freedom_of_speech_not_reach_fetch_enabled': False,
'standardized_nudges_misinfo': True,
'tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled': False,
'interactive_text_enabled': True,
'responsive_web_text_conversations_enabled': False,
'responsive_web_enhance_cards_enabled': False,
}
params = {'variables': variables, 'features': features}
paginationParams = {'variables': paginationVariables, 'features': features}
for obj in self._iter_api_data('https://api.twitter.com/graphql/Qvst9FkHq45wuqicCvMpVw/CommunityTweetsTimeline', _TwitterAPIType.GRAPHQL, params, paginationParams):
if obj['data']['communityResults']['result']['__typename'] == 'CommunityUnavailable':
_logger.warning('Community unavailable')
break
yield from self._graphql_timeline_instructions_to_tweets(obj['data']['communityResults']['result']['community_timeline']['timeline']['instructions'])
@classmethod
def _cli_setup_parser(cls, subparser):
subparser.add_argument('communityId', type = int, help = 'A community ID')
@classmethod
def _cli_from_args(cls, args):
return cls._cli_construct(args, args.communityId)
class TwitterTrendsScraper(_TwitterAPIScraper):
name = 'twitter-trends'