Add support for scraping Twitter users by ID

Closes #222
This commit is contained in:
JustAnotherArchivist
2021-05-22 21:17:14 +00:00
parent 460be9d581
commit f18b64e7da

View File

@@ -434,7 +434,7 @@ class TwitterSearchScraper(TwitterAPIScraper):
def __init__(self, query, cursor = None, **kwargs):
super().__init__(baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': query, 'src': 'spelling_expansion_revert_click'}), **kwargs)
self._query = query
self._query = query # Note: may get replaced by subclasses when using user ID resolution
self._cursor = cursor
def _check_scroll_response(self, r):
@@ -498,16 +498,24 @@ class TwitterSearchScraper(TwitterAPIScraper):
class TwitterUserScraper(TwitterSearchScraper):
name = 'twitter-user'
def __init__(self, username, **kwargs):
def __init__(self, username, isUserId, **kwargs):
if not self.is_valid_username(username):
raise ValueError('Invalid username')
super().__init__(f'from:{username}', **kwargs)
self._username = username
self._isUserId = isUserId
self._baseUrl = f'https://twitter.com/{self._username}' if not self._isUserId else f'https://twitter.com/i/user/{self._username}'
def _get_entity(self):
self._ensure_guest_token(f'https://twitter.com/{self._username}')
params = {'variables': json.dumps({'screen_name': self._username, 'withHighlightedLabel': True}, separators = (',', ':'))}
obj = self._get_api_data('https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName', params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote))
self._ensure_guest_token()
if not self._isUserId:
fieldName = 'screen_name'
endpoint = 'https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName'
else:
fieldName = 'userId'
endpoint = 'https://twitter.com/i/api/graphql/WN6Hck-Pwm-YP0uxVj1oMQ/UserByRestIdWithoutResults'
params = {'variables': json.dumps({fieldName: self._username, 'withHighlightedLabel': True}, separators = (',', ':'))}
obj = self._get_api_data(endpoint, params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote))
if not obj['data']:
return None
user = obj['data']['user']
@@ -536,9 +544,17 @@ class TwitterUserScraper(TwitterSearchScraper):
profileBannerUrl = user['legacy'].get('profile_banner_url'),
)
def get_items(self):
if self._isUserId:
# Resolve user ID to username
self._username = self.entity.username
self._isUserId = False
self._query = f'from:{self._username}'
yield from super().get_items()
@staticmethod
def is_valid_username(s):
return 1 <= len(s) <= 15 and s.strip(string.ascii_letters + string.digits + '_') == ''
return (1 <= len(s) <= 15 and s.strip(string.ascii_letters + string.digits + '_') == '') or (s and s.strip(string.digits) == '')
@classmethod
def setup_parser(cls, subparser):
@@ -547,22 +563,22 @@ class TwitterUserScraper(TwitterSearchScraper):
return s
raise ValueError('Invalid username')
subparser.add_argument('--user-id', dest = 'isUserId', action = 'store_true', default = False, help = 'Use user ID instead of username')
subparser.add_argument('username', type = username, help = 'A Twitter username (without @)')
@classmethod
def from_args(cls, args):
return cls(args.username, retries = args.retries)
return cls(args.username, args.isUserId, retries = args.retries)
class TwitterProfileScraper(TwitterUserScraper):
name = 'twitter-profile'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._baseUrl = f'https://twitter.com/{self._username}'
def get_items(self):
user = self.entity
if not self._isUserId:
userId = self.entity.id
else:
userId = self._username
params = {
'include_profile_interstitial_type': '1',
'include_blocking': '1',
@@ -586,7 +602,7 @@ class TwitterProfileScraper(TwitterUserScraper):
'send_error_codes': 'true',
'simple_quoted_tweets': 'true',
'include_tweet_replies': 'true',
'userId': user.id,
'userId': userId,
'count': '100',
}
paginationParams = params.copy()
@@ -594,7 +610,7 @@ class TwitterProfileScraper(TwitterUserScraper):
for d in (params, paginationParams):
d['ext'] = 'ext=mediaStats%2ChighlightedLabel'
for obj in self._iter_api_data(f'https://api.twitter.com/2/timeline/profile/{user.id}.json', params, paginationParams):
for obj in self._iter_api_data(f'https://api.twitter.com/2/timeline/profile/{userId}.json', params, paginationParams):
yield from self._instructions_to_tweets(obj)