Files
snscrape/snscrape/modules/twitter.py
JustAnotherArchivist dd25fd0526 Add support for extracting the entity behind a scrape
Closes #11

Backwards incompatibility: snscrape.modules.twitter.Account is now called User. However, this was previously only used on the list member scraper, which has been broken for a while since the list member list is no longer publicly accessible.

For compatibility reasons, the CLI does not output the entity by default; the new option --with-entity enables it.
2020-08-24 01:38:27 +00:00

464 lines
17 KiB
Python

import bs4
import datetime
import email.utils
import itertools
import json
import random
import logging
import re
import snscrape.base
import time
import typing
import urllib.parse
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Value sent as the 'Authorization' header on the api.twitter.com requests made by the scrapers below.
# NOTE(review): presumably the shared bearer token of Twitter's web client rather than a private credential — confirm.
_API_AUTHORIZATION_HEADER = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
class Tweet(typing.NamedTuple, snscrape.base.Item):
    """A single scraped tweet; its string form is the tweet's URL."""

    # NOTE(review): newer Python versions reject multiple inheritance together with typing.NamedTuple — confirm which interpreter versions must be supported.

    url: str  # https://twitter.com/<username>/status/<id>
    date: datetime.datetime  # Timezone-aware (UTC); the HTML-based scraper passes None if no timestamp was found
    content: str  # Tweet text; the HTML-based scraper passes None if the text element is missing
    id: int  # The HTML-based scraper passes the raw attribute value (a string) here
    username: str
    outlinks: list  # Expanded URLs of the links in the tweet
    outlinksss: str  # The entries of outlinks joined by single spaces
    tcooutlinks: list  # The links as they appear in the tweet (shortened form)
    tcooutlinksss: str  # The entries of tcooutlinks joined by single spaces

    def __str__(self):
        return self.url
class DescriptionURL(typing.NamedTuple):
    """One URL occurring in a user's profile description."""

    text: str  # Display form of the URL
    url: str  # Expanded target URL
    tcourl: str  # Shortened URL as it appears in the raw description
    indices: typing.Tuple[int, int]  # Start and end offsets of the URL within the raw description
class User(typing.NamedTuple, snscrape.base.Item, snscrape.base.Entity):
    """A Twitter account.

    This serves both as an Item (yielded by TwitterListMembersScraper) and as an
    Entity (returned by TwitterUserScraper.get_entity). Only the username is
    mandatory; every other field defaults to None when it is not known.
    """

    # NOTE(review): newer Python versions reject multiple inheritance together with typing.NamedTuple — confirm which interpreter versions must be supported.

    username: str
    # Description as displayed on the web interface, i.e. with the URLs replaced by their display form
    description: typing.Optional[str] = None
    # Raw description with the URL(s) left intact
    rawDescription: typing.Optional[str] = None
    descriptionUrls: typing.Optional[typing.List[DescriptionURL]] = None
    verified: typing.Optional[bool] = None
    created: typing.Optional[datetime.datetime] = None
    followersCount: typing.Optional[int] = None
    friendsCount: typing.Optional[int] = None
    statusesCount: typing.Optional[int] = None
    linkUrl: typing.Optional[str] = None
    linkTcourl: typing.Optional[str] = None
    profileImageUrl: typing.Optional[str] = None
    profileBannerUrl: typing.Optional[str] = None

    @property
    def url(self):
        """The URL of the user's profile page, derived from the username."""
        return f'https://twitter.com/{self.username}'

    def __str__(self):
        return self.url
class TwitterCommonScraper(snscrape.base.Scraper):
    """Helpers shared by the Twitter scrapers: HTML tweet parsing and a JSON response check."""

    def _feed_to_items(self, feed):
        """Turn an iterable of parsed tweet elements into Tweet items.

        Each element of feed must support the BeautifulSoup tag interface used
        below (find, find_all, has_attr, item access). One Tweet is yielded per
        element. A missing timestamp or text does not abort the extraction: the
        respective field is None and a warning is logged.
        """
        for tweet in feed:
            username = tweet.find('span', 'username').find('b').text
            tweetID = tweet['data-item-id']
            url = f'https://twitter.com/{username}/status/{tweetID}'

            date = None
            timestampA = tweet.find('a', 'tweet-timestamp')
            if timestampA:
                timestampSpan = timestampA.find('span', '_timestamp')
                if timestampSpan and timestampSpan.has_attr('data-time'):
                    date = datetime.datetime.fromtimestamp(int(timestampSpan['data-time']), datetime.timezone.utc)
            if not date:
                logger.warning(f'Failed to extract date for {url}')

            contentP = tweet.find('p', 'tweet-text')
            content = None
            outlinks = []
            tcooutlinks = []
            if contentP:
                content = contentP.text
                for a in contentP.find_all('a'):
                    # Skip anchors without a target, site-internal links (leading slash), and hidden links.
                    if not a.has_attr('href') or a['href'].startswith('/'):
                        continue
                    if a.has_attr('class') and 'u-hidden' in a['class']:
                        continue
                    if a.has_attr('data-expanded-url'):
                        outlinks.append(a['data-expanded-url'])
                    else:
                        logger.warning(f'Ignoring link without expanded URL on {url}: {a["href"]}')
                    tcooutlinks.append(a['href'])
            else:
                logger.warning(f'Failed to extract content for {url}')

            # Links attached as a card count as outlinks too, except for autoplayable media cards.
            card = tweet.find('div', 'card2')
            if card and 'has-autoplayable-media' not in card['class']:
                for div in card.find_all('div'):
                    if div.has_attr('data-card-url'):
                        outlinks.append(div['data-card-url'])
                        tcooutlinks.append(div['data-card-url'])

            # Deduplicate in case the same link was shared more than once within this tweet; may change order on Python 3.6 or older
            outlinks = list(dict.fromkeys(outlinks))
            tcooutlinks = list(dict.fromkeys(tcooutlinks))
            yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))

    def _check_json_callback(self, r):
        """Response check: accept r only if its content type is JSON in UTF-8.

        Returns a (ok, message) tuple; message is None when ok is True.
        Spaces in the header value are ignored so that both
        'application/json;charset=utf-8' and 'application/json; charset=utf-8'
        are accepted; a missing header is rejected.
        """
        contentType = (r.headers.get('content-type') or '').replace(' ', '')
        if contentType != 'application/json;charset=utf-8':
            return False, 'content type is not JSON'
        return True, None
class TwitterSearchScraper(TwitterCommonScraper):
    """Scraper yielding the tweets that match a search query.

    Result pages are requested from the search/adaptive.json API endpoint with a
    guest token, which is first extracted from a regular page on twitter.com.
    """

    name = 'twitter-search'

    def __init__(self, query, cursor = None, **kwargs):
        """Set up a search for query.

        query: the search string.
        cursor: optional pagination cursor to start from instead of the first page.
        kwargs: passed on to the base scraper.
        """
        super().__init__(**kwargs)
        self._query = query
        self._cursor = cursor
        # The user agent is randomised once per scraper instance and then reused for all of its requests.
        self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
        self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'})
        self._guestToken = None

    def _get_guest_token(self, url):
        """Fetch url and return the guest token found in its HTML or cookies.

        A token in the response cookies takes precedence over one embedded in the
        HTML. The token is also stored as the 'gt' cookie on the session.

        Raises snscrape.base.ScraperException if no token can be found.
        """
        logger.info('Retrieving guest token')
        gt = None
        r = self._get(url, headers = {'User-Agent': self._userAgent})
        match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)
        if match:
            logger.debug('Found guest token in HTML')
            gt = match.group(1)
        if 'gt' in r.cookies:
            logger.debug('Found guest token in cookies')
            gt = r.cookies['gt']
        if not gt:
            raise snscrape.base.ScraperException('Unable to find guest token')
        self._session.cookies.set('gt', gt, domain = '.twitter.com', path = '/', secure = True, expires = time.time() + 10800)
        return gt

    def _check_scroll_response(self, r):
        """Response check for API requests.

        Returns a (ok, message) tuple; message is None when ok is True.
        """
        if r.status_code == 429:
            # Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
            return True, None
        # A response without a content-type header is rejected like any other non-JSON response instead of raising on None.
        contentType = (r.headers.get('content-type') or '').replace(' ', '')
        if contentType != 'application/json;charset=utf-8':
            return False, 'content type is not JSON'
        if r.status_code != 200:
            return False, 'non-200 status code'
        return True, None

    def get_items(self):
        """Yield a Tweet for every search result, following the pagination cursor until it stops advancing.

        Promoted tweets are skipped. On a 429 response, the guest token is
        discarded and the same page is requested again with a fresh token.

        Raises snscrape.base.ScraperException if the response is not valid JSON
        or contains a tweet entry of an unknown shape.
        """
        headers = {
            'User-Agent': self._userAgent,
            'Authorization': _API_AUTHORIZATION_HEADER,
            'Referer': self._baseUrl,
        }
        if self._guestToken:
            headers['x-guest-token'] = self._guestToken
        cursor = self._cursor
        while True:
            if not self._guestToken:
                self._guestToken = self._get_guest_token(self._baseUrl)
                headers['x-guest-token'] = self._guestToken
            logger.info(f'Retrieving scroll page {cursor}')
            params = {
                'include_profile_interstitial_type': '1',
                'include_blocking': '1',
                'include_blocked_by': '1',
                'include_followed_by': '1',
                'include_want_retweets': '1',
                'include_mute_edge': '1',
                'include_can_dm': '1',
                'include_can_media_tag': '1',
                'skip_status': '1',
                'cards_platform': 'Web-12',
                'include_cards': '1',
                'include_ext_alt_text': 'true',
                'include_quote_count': 'true',
                'include_reply_count': '1',
                'tweet_mode': 'extended',
                'include_entities': 'true',
                'include_user_entities': 'true',
                'include_ext_media_color': 'true',
                'include_ext_media_availability': 'true',
                'send_error_codes': 'true',
                'simple_quoted_tweets': 'true',
                'q': self._query,
                'tweet_search_mode': 'live',
                'count': '100',
                'query_source': 'spelling_expansion_revert_click',
            }
            # The cursor is inserted before the remaining parameters, so the parameter order differs between the first and later pages.
            if cursor:
                params['cursor'] = cursor
            params['pc'] = '1'
            params['spelling_corrections'] = '1'
            # NOTE(review): this value repeats the 'ext=' key and contains an already percent-encoded comma; since it is passed
            # through params, it presumably gets encoded a second time — confirm that this is what the API expects.
            params['ext'] = 'ext=mediaStats%2ChighlightedLabel'
            r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
            if r.status_code == 429:
                # Drop the guest token everywhere so that the next iteration fetches a new one.
                self._guestToken = None
                del self._session.cookies['gt']
                del headers['x-guest-token']
                continue
            try:
                obj = r.json()
            except json.JSONDecodeError as e:
                raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e

            # No data format test, just a hard and loud crash if anything's wrong :-)
            newCursor = None
            for instruction in obj['timeline']['instructions']:
                if 'addEntries' in instruction:
                    entries = instruction['addEntries']['entries']
                elif 'replaceEntry' in instruction:
                    entries = [instruction['replaceEntry']['entry']]
                else:
                    continue
                for entry in entries:
                    if entry['entryId'].startswith('sq-I-t-'):
                        itemContent = entry['content']['item']['content']
                        if 'tweet' in itemContent:
                            if 'promotedMetadata' in itemContent['tweet']: # Promoted tweet aka ads
                                continue
                            tweet = obj['globalObjects']['tweets'][itemContent['tweet']['id']]
                        elif 'tombstone' in itemContent and 'tweet' in itemContent['tombstone']:
                            tweet = obj['globalObjects']['tweets'][itemContent['tombstone']['tweet']['id']]
                        else:
                            raise snscrape.base.ScraperException(f'Unable to handle entry {entry["entryId"]!r}')
                        tweetID = tweet['id']
                        content = tweet['full_text']
                        username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
                        date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
                        outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
                        tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
                        url = f'https://twitter.com/{username}/status/{tweetID}'
                        yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
                    elif entry['entryId'] == 'sq-cursor-bottom':
                        newCursor = entry['content']['operation']['cursor']['value']
            if not newCursor or newCursor == cursor:
                # End of pagination
                break
            cursor = newCursor

    @classmethod
    def setup_parser(cls, subparser):
        """Register this scraper's command-line arguments on subparser."""
        subparser.add_argument('--cursor', metavar = 'CURSOR')
        subparser.add_argument('query', help = 'A Twitter search string')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from the parsed command-line arguments."""
        return cls(args.query, cursor = args.cursor, retries = args.retries)
class TwitterUserScraper(TwitterSearchScraper):
    """Scraper for the tweets of a single user, implemented as a 'from:<username>' search.

    In addition to the tweets, the user's profile can be retrieved with get_entity.
    """

    name = 'twitter-user'

    def __init__(self, username, **kwargs):
        """Set up the scraper for username (without the leading @); kwargs are passed on to the search scraper."""
        super().__init__(f'from:{username}', **kwargs)
        self._username = username

    @staticmethod
    def _render_description(rawDescription, urls):
        """Return rawDescription with every URL in urls replaced by its display form.

        urls is the list of URL entities of the description; each entry provides
        'display_url' and 'indices' (start and end offset in rawDescription).
        The entries are processed in order of their start offset regardless of
        the order in which they are given.
        """
        if not urls:
            return rawDescription
        # Sort before taking the leading text; otherwise the prefix would be cut at the wrong URL whenever the entries are not in text order.
        urls = sorted(urls, key = lambda x: x['indices'][0])
        parts = [rawDescription[:urls[0]['indices'][0]]]
        for url, nextUrl in itertools.zip_longest(urls, urls[1:]):
            parts.append(url['display_url'])
            # The text following this URL extends to the start of the next URL or, for the last one, to the end of the description.
            end = nextUrl['indices'][0] if nextUrl is not None else None
            parts.append(rawDescription[url['indices'][1]:end])
        return ''.join(parts)

    def get_entity(self):
        """Retrieve the user's profile and return it as a User.

        On a 429 response, the guest token is discarded and the request is
        repeated with a fresh token.

        Raises snscrape.base.ScraperException on any other non-200 status code or
        if the response is not valid JSON.
        """
        while True:
            if not self._guestToken:
                self._guestToken = self._get_guest_token(f'https://twitter.com/{self._username}')
            params = {'variables': json.dumps({'screen_name': self._username, 'withHighlightedLabel': True}, separators = (',', ':'))}
            r = self._get(
                'https://api.twitter.com/graphql/-xfUfZsnR_zqjFd-IfrN5A/UserByScreenName',
                params = urllib.parse.urlencode(params, quote_via = urllib.parse.quote),
                headers = {'User-Agent': self._userAgent, 'Authorization': _API_AUTHORIZATION_HEADER, 'Referer': 'https://twitter.com/', 'x-guest-token': self._guestToken},
                responseOkCallback = self._check_scroll_response,
            )
            if r.status_code == 429:
                self._guestToken = None
                del self._session.cookies['gt']
                continue
            if r.status_code != 200:
                raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
            break
        try:
            obj = r.json()
        except json.JSONDecodeError as e:
            raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e

        legacy = obj['data']['user']['legacy']
        rawDescription = legacy['description']
        descriptionUrls = legacy['entities']['description']['urls']
        return User(
            username = legacy['screen_name'],
            description = self._render_description(rawDescription, descriptionUrls),
            rawDescription = rawDescription,
            # NOTE(review): these are plain dicts although User.descriptionUrls is annotated as a list of DescriptionURL; kept as is so that existing consumers see the same shape — confirm which one is intended.
            descriptionUrls = [{'text': x['display_url'], 'url': x['expanded_url'], 'tcourl': x['url'], 'indices': tuple(x['indices'])} for x in descriptionUrls],
            verified = legacy['verified'],
            created = email.utils.parsedate_to_datetime(legacy['created_at']),
            followersCount = legacy['followers_count'],
            friendsCount = legacy['friends_count'],
            statusesCount = legacy['statuses_count'],
            linkUrl = legacy['entities']['url']['urls'][0]['expanded_url'] if 'url' in legacy['entities'] else None,
            linkTcourl = legacy.get('url'),
            profileImageUrl = legacy['profile_image_url_https'],
            profileBannerUrl = legacy.get('profile_banner_url'),
        )

    @classmethod
    def setup_parser(cls, subparser):
        """Register this scraper's command-line arguments on subparser."""
        subparser.add_argument('username', help = 'A Twitter username (without @)')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from the parsed command-line arguments."""
        return cls(args.username, retries = args.retries)
class TwitterHashtagScraper(TwitterSearchScraper):
    """Scraper for the tweets containing a hashtag, implemented as a '#<hashtag>' search."""

    name = 'twitter-hashtag'

    def __init__(self, hashtag, **kwargs):
        """Set up the scraper for hashtag (without the leading #); kwargs are passed on to the search scraper."""
        searchQuery = f'#{hashtag}'
        super().__init__(searchQuery, **kwargs)
        self._hashtag = hashtag

    @classmethod
    def setup_parser(cls, subparser):
        """Register this scraper's command-line arguments on subparser."""
        subparser.add_argument('hashtag', help='A Twitter hashtag (without #)')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from the parsed command-line arguments."""
        hashtag = args.hashtag
        return cls(hashtag, retries=args.retries)
class TwitterThreadScraper(TwitterCommonScraper):
    """Scraper for a thread, starting from its last tweet and walking up through the ancestors."""

    name = 'twitter-thread'

    def __init__(self, tweetID = None, **kwargs):
        """Set up the scraper.

        tweetID: ID of the last tweet in the thread, as a string of digits.
        kwargs: passed on to the base scraper.

        Raises ValueError if tweetID contains anything but digits.
        """
        if tweetID is not None and tweetID.strip('0123456789') != '':
            raise ValueError('Invalid tweet ID, must be numeric')
        super().__init__(**kwargs)
        self._tweetID = tweetID

    def get_items(self):
        """Yield the supplied tweet first, then its ancestors from the closest one upwards.

        Nothing is yielded (and a warning is logged) if the tweet cannot be found
        on the page. Pagination is followed only when the page advertises a
        position to continue from.
        """
        headers = {'User-Agent': 'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18 Bot'}

        # Fetch the page of the last tweet in the thread
        r = self._get(f'https://twitter.com/user/status/{self._tweetID}', headers = headers)
        soup = bs4.BeautifulSoup(r.text, 'lxml')

        # Extract tweets on that page in the correct order; first, the tweet that was supplied, then the ancestors with pagination if necessary
        tweet = soup.find('div', 'ThreadedConversation--permalinkTweetWithAncestors')
        if tweet:
            tweet = tweet.find('div', 'tweet')
        if not tweet:
            logger.warning('Tweet does not exist, is not a thread, or does not have ancestors')
            return
        items = list(self._feed_to_items([tweet]))
        assert len(items) == 1
        yield items[0]
        username = items[0].username

        ancestors = soup.find('div', 'ThreadedConversation--ancestors')
        if not ancestors:
            logger.warning('Tweet does not have ancestors despite claiming to')
            return
        feed = reversed(ancestors.find_all('li', 'js-stream-item'))
        yield from self._feed_to_items(feed)

        # If necessary, iterate through pagination until reaching the initial tweet
        streamContainer = ancestors.find('div', 'stream-container')
        # find returns None if there is no such element; without a container, there is no position to paginate from.
        if streamContainer is None:
            return
        if not streamContainer.has_attr('data-max-position') or streamContainer['data-max-position'] == '':
            return
        minPosition = streamContainer['data-max-position']
        while True:
            r = self._get(
                f'https://twitter.com/i/{username}/conversation/{self._tweetID}?include_available_features=1&include_entities=1&min_position={minPosition}',
                headers = headers,
                responseOkCallback = self._check_json_callback
            )
            obj = json.loads(r.text)
            soup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
            feed = reversed(soup.find_all('li', 'js-stream-item'))
            yield from self._feed_to_items(feed)
            if not obj['has_more_items']:
                break
            minPosition = obj['max_position']

    @classmethod
    def setup_parser(cls, subparser):
        """Register this scraper's command-line arguments on subparser."""
        subparser.add_argument('tweetID', help = 'A tweet ID of the last tweet in a thread')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from the parsed command-line arguments."""
        return cls(tweetID = args.tweetID, retries = args.retries)
class TwitterListPostsScraper(TwitterSearchScraper):
    """Scraper for the tweets of a list's members, implemented as a 'list:<username>/<listname>' search."""

    name = 'twitter-list-posts'

    def __init__(self, listName, **kwargs):
        """Set up the scraper for listName; kwargs are passed on to the search scraper."""
        searchQuery = f'list:{listName}'
        super().__init__(searchQuery, **kwargs)
        self._listName = listName

    @classmethod
    def setup_parser(cls, subparser):
        """Register this scraper's command-line arguments on subparser."""
        subparser.add_argument('list', help='A Twitter list, formatted as "username/listname"')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from the parsed command-line arguments."""
        listName = args.list
        return cls(listName, retries=args.retries)
class TwitterListMembersScraper(TwitterCommonScraper):
    """Scraper yielding the members of a list as User items that carry only the username."""

    name = 'twitter-list-members'

    def __init__(self, listName, **kwargs):
        """Set up the scraper for listName, given as "username/listname"; kwargs are passed on to the base scraper."""
        super().__init__(**kwargs)
        # Unpacking into exactly two names means that a value without exactly one slash raises a ValueError.
        self._user, self._list = listName.split('/')

    def get_items(self):
        """Yield a User for every list member, first from the members page and then from the paginated timeline.

        Nothing is yielded (and a warning is logged) if the members page does not
        return status 200 or lists no members.

        Raises snscrape.base.ScraperException if the members page has no stream container.
        """
        requestHeaders = {'User-Agent': 'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}
        baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'

        response = self._get(baseUrl, headers=requestHeaders)
        if r_status_not_ok := (response.status_code != 200):
            logger.warning('List not found')
            return

        page = bs4.BeautifulSoup(response.text, 'lxml')
        container = page.find('div', 'stream-container')
        if not container:
            raise snscrape.base.ScraperException('Unable to find container')

        members = container.find_all('li', 'js-stream-item')
        if not members:
            logger.warning('Empty list')
            return
        for member in members:
            yield User(username=member.find('div', 'account')['data-screen-name'])

        # Continue only if the page advertises a non-empty position to paginate from.
        hasPosition = container.has_attr('data-min-position') and container['data-min-position'] != ''
        if not hasPosition:
            return
        maxPosition = container['data-min-position']

        hasMore = True
        while hasMore:
            response = self._get(
                f'{baseUrl}/timeline?include_available_features=1&include_entities=1&max_position={maxPosition}&reset_error_state=false',
                headers=requestHeaders,
                responseOkCallback=self._check_json_callback,
            )
            payload = json.loads(response.text)
            page = bs4.BeautifulSoup(payload['items_html'], 'lxml')
            for member in page.find_all('li', 'js-stream-item'):
                yield User(username=member.find('div', 'account')['data-screen-name'])
            if payload['has_more_items']:
                maxPosition = payload['min_position']
            else:
                hasMore = False

    @classmethod
    def setup_parser(cls, subparser):
        """Register this scraper's command-line arguments on subparser."""
        subparser.add_argument('list', help='A Twitter list, formatted as "username/listname"')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from the parsed command-line arguments."""
        return cls(args.list, retries=args.retries)