From 1a09f9b9a34c7976d2be5cc346e0475e2902c2ae Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Thu, 24 Sep 2020 01:45:08 +0000 Subject: [PATCH] Extract more information from Twitter Including: reply/retweet/like/quote counts, media (photos, videos, and GIFs), full user object, quoted tweets, mentioned users, rendered content, conversation ID, language, source --- snscrape/modules/twitter.py | 148 ++++++++++++++++++++++++++++++------ 1 file changed, 126 insertions(+), 22 deletions(-) diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index a3abc25..b0b8930 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -21,18 +21,59 @@ class Tweet(typing.NamedTuple, snscrape.base.Item): url: str date: datetime.datetime content: str + renderedContent: str id: int username: str + user: 'User' outlinks: list outlinksss: str tcooutlinks: list tcooutlinksss: str + replyCount: int + retweetCount: int + likeCount: int + quoteCount: int + conversationId: int + lang: str + source: str + media: typing.Optional[typing.List['Medium']] = None retweetedTweet: typing.Optional['Tweet'] = None + quotedTweet: typing.Optional['Tweet'] = None + mentionedUsers: typing.Optional[typing.List['User']] = None def __str__(self): return self.url +class Medium: + pass + + +class Photo(typing.NamedTuple, Medium): + previewUrl: str + fullUrl: str + type: str = 'photo' + + +class VideoVariant(typing.NamedTuple): + contentType: str + url: str + bitrate: typing.Optional[int] + + +class Video(typing.NamedTuple, Medium): + thumbnailUrl: str + variants: typing.List[VideoVariant] + duration: float + type: str = 'video' + + +class Gif(typing.NamedTuple, Medium): + thumbnailUrl: str + variants: typing.List[VideoVariant] + type: str = 'gif' + + class DescriptionURL(typing.NamedTuple): text: str url: str @@ -248,18 +289,90 @@ class TwitterAPIScraper(snscrape.base.Scraper): def _tweet_to_tweet(self, tweet, obj): # Transforms a Twitter API tweet object into a Tweet - tweetID = tweet['id'] if 'id' in tweet else int(tweet['id_str']) - content = tweet['full_text'] - username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name'] - date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc) - outlinks = [u['expanded_url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else [] - tcooutlinks = [u['url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else [] - url = f'https://twitter.com/{username}/status/{tweetID}' - if 'retweeted_status_id_str' in tweet: - retweetedTweet = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj) - else: - retweetedTweet = None - return Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks), retweetedTweet = retweetedTweet) + kwargs = {} + kwargs['id'] = tweet['id'] if 'id' in tweet else int(tweet['id_str']) + kwargs['content'] = tweet['full_text'] + kwargs['renderedContent'] = self._render_text_with_urls(tweet['full_text'], tweet['entities']['urls']) + kwargs['username'] = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name'] + kwargs['user'] = self._user_to_user(obj['globalObjects']['users'][tweet['user_id_str']]) + kwargs['date'] = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc) + kwargs['outlinks'] = [u['expanded_url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else [] + kwargs['outlinksss'] = ' '.join(kwargs['outlinks']) + kwargs['tcooutlinks'] = [u['url'] for u in tweet['entities']['urls']] if 'urls' in tweet['entities'] else [] + kwargs['tcooutlinksss'] = ' '.join(kwargs['tcooutlinks']) + kwargs['url'] = f'https://twitter.com/{kwargs["username"]}/status/{kwargs["id"]}' + kwargs['replyCount'] = tweet['reply_count'] + kwargs['retweetCount'] = tweet['retweet_count'] + kwargs['likeCount'] = tweet['favorite_count'] + kwargs['quoteCount'] = tweet['quote_count'] + kwargs['conversationId'] = tweet['conversation_id'] if 'conversation_id' in tweet else int(tweet['conversation_id_str']) + kwargs['lang'] = tweet['lang'] + kwargs['source'] = tweet['source'] + if 'extended_entities' in tweet and 'media' in tweet['extended_entities']: + media = [] + for medium in tweet['extended_entities']['media']: + if medium['type'] == 'photo': + if '.' not in medium['media_url_https']: + logger.warning(f'Skipping malformed medium URL on tweet {kwargs["id"]}: {medium["media_url_https"]!r} contains no dot') + continue + baseUrl, format = medium['media_url_https'].rsplit('.', 1) + if format not in ('jpg', 'png'): + logger.warning(f'Skipping photo with unknown format on tweet {kwargs["id"]}: {format!r}') + continue + media.append(Photo( + previewUrl = f'{baseUrl}?format={format}&name=small', + fullUrl = f'{baseUrl}?format={format}&name=large', + )) + elif medium['type'] == 'video' or medium['type'] == 'animated_gif': + variants = [] + for variant in medium['video_info']['variants']: + variants.append(VideoVariant(contentType = variant['content_type'], url = variant['url'], bitrate = variant.get('bitrate') or None)) + mKwargs = { + 'thumbnailUrl': medium['media_url_https'], + 'variants': variants, + } + if medium['type'] == 'video': + mKwargs['duration'] = medium['video_info']['duration_millis'] / 1000 + cls = Video + elif medium['type'] == 'animated_gif': + cls = Gif + media.append(cls(**mKwargs)) + if media: + kwargs['media'] = media + kwargs['retweetedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['retweeted_status_id_str']], obj) if 'retweeted_status_id_str' in tweet else None + kwargs['quotedTweet'] = self._tweet_to_tweet(obj['globalObjects']['tweets'][tweet['quoted_status_id_str']], obj) if 'quoted_status_id_str' in tweet else None + kwargs['mentionedUsers'] = [User(username = u['screen_name'], id = u['id'] if 'id' in u else int(u['id_str'])) for u in tweet['entities']['user_mentions']] if tweet['entities']['user_mentions'] else None + return Tweet(**kwargs) + + def _render_text_with_urls(self, text, urls): + if not urls: + return text + out = [] + out.append(text[:urls[0]['indices'][0]]) + urlsSorted = sorted(urls, key = lambda x: x['indices'][0]) # Ensure that they're in left to right appearance order + assert all(url['indices'][1] <= nextUrl['indices'][0] for url, nextUrl in zip(urls, urls[1:])), 'broken URL indices' + for url, nextUrl in itertools.zip_longest(urls, urls[1:]): + out.append(url['display_url']) + out.append(text[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None]) + return ''.join(out) + + def _user_to_user(self, user): + kwargs = {} + kwargs['username'] = user['screen_name'] + kwargs['id'] = user['id'] if 'id' in user else int(user['id_str']) + kwargs['description'] = self._render_text_with_urls(user['description'], user['entities']['description']['urls']) + kwargs['rawDescription'] = user['description'] + kwargs['descriptionUrls'] = [{'text': x['display_url'], 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in user['entities']['description']['urls']], + kwargs['verified'] = user['verified'] + kwargs['created'] = email.utils.parsedate_to_datetime(user['created_at']) + kwargs['followersCount'] = user['followers_count'] + kwargs['friendsCount'] = user['friends_count'] + kwargs['statusesCount'] = user['statuses_count'] + kwargs['linkUrl'] = user['entities']['url']['urls'][0]['expanded_url'] if 'url' in user['entities'] else None + kwargs['linkTcourl'] = user.get('url') + kwargs['profileImageUrl'] = user['profile_image_url_https'] + kwargs['profileBannerUrl'] = user['profile_banner_url'] + return User(**kwargs) class TwitterSearchScraper(TwitterAPIScraper): @@ -343,16 +456,7 @@ class TwitterUserScraper(TwitterSearchScraper): obj = self._get_api_data('https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName', params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)) user = obj['data']['user'] rawDescription = user['legacy']['description'] - if user['legacy']['entities']['description']['urls']: - description = [] - description.append(rawDescription[:user['legacy']['entities']['description']['urls'][0]['indices'][0]]) - urls = sorted(user['legacy']['entities']['description']['urls'], key = lambda x: x['indices'][0]) - for url, nextUrl in itertools.zip_longest(urls, urls[1:]): - description.append(url['display_url']) - description.append(rawDescription[url['indices'][1] : nextUrl['indices'][0] if nextUrl is not None else None]) - description = ''.join(description) - else: - description = rawDescription + description = self._render_text_with_urls(rawDescription, user['legacy']['entities']['description']['urls']) return User( username = user['legacy']['screen_name'], id = user['rest_id'],