Extract more information from Twitter

Including: reply/retweet/like/quote counts, media (photos, videos, and GIFs), full user object, quoted tweets, mentioned users, rendered content, conversation ID, language, source
This commit is contained in:
JustAnotherArchivist
2020-09-24 01:45:08 +00:00
parent 5ae5ec7bcd
commit 1a09f9b9a3

View File

@@ -21,18 +21,59 @@ class Tweet(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
renderedContent: str
id: int
username: str
user: 'User'
outlinks: list
outlinksss: str
tcooutlinks: list
tcooutlinksss: str
replyCount: int
retweetCount: int
likeCount: int
quoteCount: int
conversationId: int
lang: str
source: str
media: typing.Optional[typing.List['Medium']] = None
retweetedTweet: typing.Optional['Tweet'] = None
quotedTweet: typing.Optional['Tweet'] = None
mentionedUsers: typing.Optional[typing.List['User']] = None
def __str__(self):
return self.url
class Medium:
pass
class Photo(typing.NamedTuple, Medium):
previewUrl: str
fullUrl: str
type: str = 'photo'
class VideoVariant(typing.NamedTuple):
contentType: str
url: str
bitrate: typing.Optional[int]
class Video(typing.NamedTuple, Medium):
thumbnailUrl: str
variants: typing.List[VideoVariant]
duration: float
type: str = 'video'
class Gif(typing.NamedTuple, Medium):
thumbnailUrl: str
variants: typing.List[VideoVariant]
type: str = 'gif'
class DescriptionURL(typing.NamedTuple):
text: str
url: str
@@ -248,18 +289,90 @@ class TwitterAPIScraper(snscrape.base.Scraper):
def _tweet_to_tweet(self, tweet, obj):
# Transforms a Twitter API tweet object into a Tweet
tweetID = tweet['id'] if 'id' in tweet else int(tweet['id_str'])
content = tweet['full_text']
username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
outlinks = [u['expanded_url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else []
tcooutlinks = [u['url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else []
url = f'https://twitter.com/{username}/status/{tweetID}'
if 'retweeted_status_id_str' in tweet:
retweetedTweet = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj)
else:
retweetedTweet = None
return Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks), retweetedTweet = retweetedTweet)
kwargs = {}
kwargs['id'] = tweet['id'] if 'id' in tweet else int(tweet['id_str'])
kwargs['content'] = tweet['full_text']
kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities']['urls'])
kwargs['username'] = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
kwargs['user'] = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']])
kwargs['date'] = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else []
kwargs['outlinksss'] = ' '.join(kwargs['outlinks'])
kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else []
kwargs['tcooutlinksss'] = ' '.join(kwargs['tcooutlinks'])
kwargs['url'] = f'https://twitter.com/{kwargs["username"]}/status/{kwargs["id"]}'
kwargs['replyCount'] = tweet['reply_count']
kwargs['retweetCount'] = tweet['retweet_count']
kwargs['likeCount'] = tweet['favorite_count']
kwargs['quoteCount'] = tweet['quote_count']
kwargs['conversationId'] = tweet['conversation_id'] if 'conversation_id' in tweet else int(tweet['conversation_id_str'])
kwargs['lang'] = tweet['lang']
kwargs['source'] = tweet['source']
if 'extended_entities' in tweet and 'media' in tweet['extended_entities']:
media = []
for medium in tweet['extended_entities']['media']:
if medium['type'] == 'photo':
if '.' not in medium['media_url_https']:
logger.warning(f'Skipping malformed medium URL on tweet {kwargs["id"]}: {medium["media_url_https"]!r} contains no dot')
continue
baseUrl, format = medium['media_url_https'].rsplit('.', 1)
if format not in ('jpg', 'png'):
logger.warning(f'Skipping photo with unknown format on tweet {kwargs["id"]}: {format!r}')
continue
media.append(Photo(
previewUrl = f'{baseUrl}?format={format}&name=small',
fullUrl = f'{baseUrl}?format={format}&name=large',
))
elif medium['type'] == 'video' or medium['type'] == 'animated_gif':
variants = []
for variant in medium['video_info']['variants']:
variants.append(VideoVariant(contentType = variant['content_type'], url = variant['url'], bitrate = variant.get('bitrate') or None))
mKwargs = {
'thumbnailUrl': medium['media_url_https'],
'variants': variants,
}
if medium['type'] == 'video':
mKwargs['duration'] = medium['video_info']['duration_millis'] / 1000
cls = Video
elif medium['type'] == 'animated_gif':
cls = Gif
media.append(cls(**mKwargs))
if media:
kwargs['media'] = media
kwargs['retweetedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj) if 'retweeted_status_id_str' in tweet else None
kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj) if 'quoted_status_id_str' in tweet else None
kwargs['mentionedUsers'] = [User(username = u['screen_name'], id = u['id'] if 'id' in u else int(u['id_str'])) for u in tweet['entities']['user_mentions']] if tweet['entities']['user_mentions'] else None
return Tweet(**kwargs)
def _render_text_with_urls(self, text, urls):
if not urls:
return text
out = []
out.append(text[:urls[0]['indices'][0]])
urlsSorted = sorted(urls, key = lambda x: x['indices'][0]) # Ensure that they're in left to right appearance order
assert all(url['indices'][1] <= nextUrl['indices'][0] for url, nextUrl in zip(urls, urls[1:])), 'broken URL indices'
for url, nextUrl in itertools.zip_longest(urls, urls[1:]):
out.append(url['display_url'])
out.append(text[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None])
return ''.join(out)
def _user_to_user(self, user):
kwargs = {}
kwargs['username'] = user['screen_name']
kwargs['id'] = user['id'] if 'id' in user else int(user['id_str'])
kwargs['description'] = self._render_text_with_urls(user['description'], user['entities']['description']['urls'])
kwargs['rawDescription'] = user['description']
kwargs['descriptionUrls'] = [{'text': x['display_url'], 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['entities']['description']['urls']],
kwargs['verified'] = user['verified']
kwargs['created'] = email.utils.parsedate_to_datetime(user['created_at'])
kwargs['followersCount'] = user['followers_count']
kwargs['friendsCount'] = user['friends_count']
kwargs['statusesCount'] = user['statuses_count']
kwargs['linkUrl'] = user['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['entities'] else None
kwargs['linkTcourl'] = user.get('url')
kwargs['profileImageUrl'] = user['profile_image_url_https']
kwargs['profileBannerUrl'] = user['profile_banner_url']
return User(**kwargs)
class TwitterSearchScraper(TwitterAPIScraper):
@@ -343,16 +456,7 @@ class TwitterUserScraper(TwitterSearchScraper):
obj = self._get_api_data('https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName', params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote))
user = obj['data']['user']
rawDescription = user['legacy']['description']
if user['legacy']['entities']['description']['urls']:
description = []
description.append(rawDescription[:user['legacy']['entities']['description']['urls'][0]['indices'][0]])
urls = sorted(user['legacy']['entities']['description']['urls'], key = lambda x: x['indices'][0])
for url, nextUrl in itertools.zip_longest(urls, urls[1:]):
description.append(url['display_url'])
description.append(rawDescription[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None])
description = ''.join(description)
else:
description = rawDescription
description = self._render_text_with_urls(rawDescription, user['legacy']['entities']['description']['urls'])
return User(
username = user['legacy']['screen_name'],
id = user['rest_id'],