Port TwitterSearchScraper to redesign

Fixes #57
This commit is contained in:
JustAnotherArchivist
2020-03-04 00:33:48 +00:00
parent 82a87b7b5a
commit 613395d1c2

View File

@@ -3,8 +3,10 @@ import datetime
import json import json
import random import random
import logging import logging
import re
import snscrape.base import snscrape.base
import typing import typing
import urllib.parse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -86,82 +88,122 @@ class TwitterCommonScraper(snscrape.base.Scraper):
class TwitterSearchScraper(TwitterCommonScraper): class TwitterSearchScraper(TwitterCommonScraper):
name = 'twitter-search' name = 'twitter-search'
def __init__(self, query, maxPosition = None, **kwargs): def __init__(self, query, cursor = None, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self._query = query self._query = query
self._maxPosition = maxPosition self._cursor = cursor
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'})
def _get_feed_from_html(self, html, withMinPosition): def _get_guest_token(self):
soup = bs4.BeautifulSoup(html, 'lxml') logger.info(f'Retrieving guest token from search page')
feed = soup.find_all('li', 'js-stream-item') r = self._get(self._baseUrl, headers = {'User-Agent': self._userAgent})
if withMinPosition: match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', r.text)
streamContainer = soup.find('div', 'stream-container') if not match:
if not streamContainer or not streamContainer.has_attr('data-min-position'): raise RuntimeError('Unable to find guest token')
if soup.find('div', 'SearchEmptyTimeline'): return match.group(1)
# No results found
minPosition = None def _check_scroll_response(self, r):
else: if r.status_code == 429:
# Unknown error condition # Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
raise RuntimeError('Unable to find min-position') return True, None
else: if r.headers.get('content-type') != 'application/json;charset=utf-8':
minPosition = streamContainer['data-min-position'] return False, f'content type is not JSON'
else: if r.status_code != 200:
minPosition = None return False, f'non-200 status code'
return feed, minPosition return True, None
def get_items(self): def get_items(self):
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'} headers = {
'User-Agent': self._userAgent,
# First page 'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
if self._maxPosition is None: 'Referer': self._baseUrl,
logger.info(f'Retrieving search page for {self._query}') }
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'spxr', 'qf': 'off'}, headers = headers) guestToken = None
cursor = self._cursor
feed, maxPosition = self._get_feed_from_html(r.text, True)
if not feed:
logger.warning(f'No results for {self._query}')
return
yield from self._feed_to_items(feed)
else:
maxPosition = self._maxPosition
if not maxPosition:
return
while True: while True:
logger.info(f'Retrieving scroll page {maxPosition}') if not guestToken:
r = self._get('https://twitter.com/i/search/timeline', guestToken = self._get_guest_token()
params = { headers['x-guest-token'] = guestToken
'f': 'tweets',
'vertical': 'default',
'lang': 'en',
'q': self._query,
'include_available_features': '1',
'include_entities': '1',
'reset_error_state': 'false',
'src': 'spxr',
'qf': 'off',
'max_position': maxPosition,
},
headers = headers,
responseOkCallback = self._check_json_callback)
obj = json.loads(r.text) logger.info(f'Retrieving scroll page {cursor}')
feed, _ = self._get_feed_from_html(obj['items_html'], False) params = {
if feed: 'include_profile_interstitial_type': '1',
yield from self._feed_to_items(feed) 'include_blocking': '1',
if obj['min_position'] == maxPosition: 'include_blocked_by': '1',
return 'include_followed_by': '1',
maxPosition = obj['min_position'] 'include_want_retweets': '1',
'include_mute_edge': '1',
'include_can_dm': '1',
'include_can_media_tag': '1',
'skip_status': '1',
'cards_platform': 'Web-12',
'include_cards': '1',
'include_composer_source': 'true',
'include_ext_alt_text': 'true',
'include_reply_count': '1',
'tweet_mode': 'extended',
'include_entities': 'true',
'include_user_entities': 'true',
'include_ext_media_color': 'true',
'include_ext_media_availability': 'true',
'send_error_codes': 'true',
'simple_quoted_tweets': 'true',
'q': self._query,
'tweet_search_mode': 'live',
'count': '100',
'query_source': 'spelling_expansion_revert_click',
}
if cursor:
params['cursor'] = cursor
params['pc'] = '1'
params['spelling_corrections'] = '1'
params['ext'] = 'mediaStats%2CcameraMoment'
r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
if r.status_code == 429:
guestToken = None
continue
try:
obj = r.json()
except json.JSONDecodeError as e:
logger.error(f'Received invalid JSON from Twitter: {e!s}')
raise RuntimeError('Received invalid JSON from Twitter') from e
# No data format test, just a hard and loud crash if anything's wrong :-)
newCursor = None
for instruction in obj['timeline']['instructions']:
if 'addEntries' in instruction:
entries = instruction['addEntries']['entries']
elif 'replaceEntry' in instruction:
entries = [instruction['replaceEntry']['entry']]
else:
continue
for entry in entries:
if entry['entryId'].startswith('sq-I-t-'):
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
tweetID = tweet['id']
content = tweet['full_text']
username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
url = f'https://twitter.com/{username}/status/{tweetID}'
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
elif entry['entryId'] == 'sq-cursor-bottom':
newCursor = entry['content']['operation']['cursor']['value']
if not newCursor or newCursor == cursor:
# End of pagination
break
cursor = newCursor
@classmethod @classmethod
def setup_parser(cls, subparser): def setup_parser(cls, subparser):
subparser.add_argument('--max-position', metavar = 'POSITION', dest = 'maxPosition') subparser.add_argument('--cursor', metavar = 'CURSOR')
subparser.add_argument('query', help = 'A Twitter search string') subparser.add_argument('query', help = 'A Twitter search string')
@classmethod @classmethod
def from_args(cls, args): def from_args(cls, args):
return cls(args.query, maxPosition = args.maxPosition, retries = args.retries) return cls(args.query, cursor = args.cursor, retries = args.retries)
class TwitterUserScraper(TwitterSearchScraper): class TwitterUserScraper(TwitterSearchScraper):