17 Commits

Author SHA1 Message Date
JustAnotherArchivist
7b3c7deb28 Catch login redirects on Instagram 2020-05-30 00:56:34 +00:00
JustAnotherArchivist
040a11656c Update README 2020-05-30 00:53:52 +00:00
JustAnotherArchivist
1459245258 Consistently raise ScraperException on fatal errors 2020-05-30 00:53:49 +00:00
JustAnotherArchivist
dbe4c5ce55 Remove Google+ module
Google+ was mostly shut down in early 2019. What remained (Google+ for G Suite) was renamed to Google Currents and is for internal communication only (and therefore out of scope for snscrape).
2020-05-30 00:35:06 +00:00
JustAnotherArchivist
80491ecc2c Remove Gab module
Since Gab's move to a fork of Mastodon in July 2019, the module had been broken, and a new module would better be written from scratch as the platform changed entirely.
2020-05-30 00:23:33 +00:00
JustAnotherArchivist
1a71b58101 Add support for Telegram
Closes #50
2020-05-29 23:44:01 +00:00
JustAnotherArchivist
0ce37a69d4 Log exception details on crashes 2020-05-29 22:29:23 +00:00
JustAnotherArchivist
722bfd5f7c Handle Twitter tombstones
Fixes #63
2020-05-29 22:12:37 +00:00
JustAnotherArchivist
b6cc3180d9 Force TwitterThreadScraper and TwitterListMembersScraper to fetch the old design 2020-03-04 00:40:49 +00:00
JustAnotherArchivist
613395d1c2 Port TwitterSearchScraper to redesign
Fixes #57
2020-03-04 00:40:49 +00:00
JustAnotherArchivist
82a87b7b5a Merge pull request #53 from JackDallas/add-more-insta-fields
Add more fields to the instagram scraper
2020-02-09 23:48:59 +00:00
Jack Dallas
9568028bf9 Update changed fields 2020-02-07 11:30:16 +00:00
JustAnotherArchivist
6df351772e Fix crash in Facebook scraper on link-less entries 2020-02-05 16:15:10 +00:00
JustAnotherArchivist
541173b0c8 Merge pull request #54 from jodizzle/fix/vkontakte-user
Fix vkontakte-user: pagination returns JSON now, and handle some unscrapable profiles.
2020-02-05 14:56:12 +00:00
Jody Leonard
b6772d3778 vkontakte-user: Handle additional un-scrapeable profile case 2019-10-31 16:01:29 -04:00
Jody Leonard
20ea117a2c Fix vkontakte-user pagination 2019-10-30 22:29:49 -04:00
JackDallas
ff54c350bc Add more fields to the instagram scraper 2019-08-30 12:43:02 +01:00
9 changed files with 228 additions and 315 deletions

View File

@@ -3,9 +3,8 @@ snscrape is a scraper for social networking services (SNS). It scrapes things li
The following services are currently supported:
* Facebook: user profiles and groups
* Gab: user profile posts, media, and comments
* Google+: user profiles
* Instagram: user profiles, hashtags, and locations
* Telegram: channels
* Twitter: user profiles, hashtags, searches, threads, and lists (members as well as posts)
* VKontakte: user profiles

View File

@@ -93,13 +93,19 @@ def _dump_locals_on_exception():
except Exception as e:
trace = inspect.trace()
if len(trace) >= 2:
name = _dump_stack_and_locals(trace[1:])
name = _dump_stack_and_locals(trace[1:], exc = e)
logger.fatal(f'Dumped stack and locals to {name}')
raise
def _dump_stack_and_locals(trace):
def _dump_stack_and_locals(trace, exc = None):
with tempfile.NamedTemporaryFile('w', prefix = 'snscrape_locals_', delete = False) as fp:
if exc is not None:
fp.write('Exception:\n')
fp.write(f' {type(exc).__module__}.{type(exc).__name__}: {exc!s}\n')
fp.write(f' args: {exc.args!r}\n')
fp.write('\n')
fp.write('Stack:\n')
for frameRecord in trace:
fp.write(f' File "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}\n')

View File

@@ -80,6 +80,7 @@ class FacebookCommonScraper(snscrape.base.Scraper):
return False, None
def _soup_to_items(self, soup, baseUrl, mode):
cleanUrl = None # Value from previous iteration is used for warning on link-less entries
for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
mediaSetA = entry.find('a', class_ = '_17z-')
@@ -96,6 +97,7 @@ class FacebookCommonScraper(snscrape.base.Scraper):
logger.warning(f'Ignoring odd link: {href}')
continue
dirtyUrl = urllib.parse.urljoin(baseUrl, href)
cleanUrl = self._clean_url(dirtyUrl)
date = datetime.datetime.fromtimestamp(int(entry.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
contentDiv = entry.find('div', class_ = '_5pbx')
if contentDiv:
@@ -116,7 +118,7 @@ class FacebookCommonScraper(snscrape.base.Scraper):
outlink = query['u'][0]
if outlink.startswith('http://') or outlink.startswith('https://') and outlink not in outlinks:
outlinks.append(outlink)
yield FacebookPost(cleanUrl = self._clean_url(dirtyUrl), dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
yield FacebookPost(cleanUrl = cleanUrl, dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
class FacebookUserScraper(FacebookCommonScraper):
@@ -139,8 +141,7 @@ class FacebookUserScraper(FacebookCommonScraper):
logger.warning('User does not exist')
return
elif r.status_code != 200:
logger.error('Got status code {r.status_code}')
return
raise snscrape.base.ScraperException('Got status code {r.status_code}')
soup = bs4.BeautifulSoup(r.text, 'lxml')
yield from self._soup_to_items(soup, baseUrl, 'user')
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
@@ -152,8 +153,7 @@ class FacebookUserScraper(FacebookCommonScraper):
# Reproducing that would be difficult to get right, especially as Facebook's codebase evolves, so it's just not sent at all here.
r = self._get(urllib.parse.urljoin(baseUrl, nextPageLink.get('ajaxify')) + '&__a=1', headers = headers)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
response = json.loads(spuriousForLoopPattern.sub('', r.text))
assert 'domops' in response
assert len(response['domops']) == 1
@@ -195,12 +195,10 @@ class FacebookGroupScraper(FacebookCommonScraper):
logger.warning('Group does not exist')
return
elif r.status_code != 200:
logger.error('Got status code {r.status_code}')
return
raise snscrape.base.ScraperException('Got status code {r.status_code}')
if 'content:{pagelet_group_mall:{container_id:"' not in r.text:
logger.error('Code container ID marker not found (does the group exist?)')
return
raise snscrape.base.ScraperException('Code container ID marker not found (does the group exist?)')
soup = bs4.BeautifulSoup(r.text, 'lxml')
@@ -210,9 +208,9 @@ class FacebookGroupScraper(FacebookCommonScraper):
codeContainerId = r.text[codeContainerIdPos : r.text.index('"', codeContainerIdPos)]
codeContainer = soup.find('code', id = codeContainerId)
if not codeContainer:
raise RuntimeError('Code container not found')
raise snscrape.base.ScraperException('Code container not found')
if type(codeContainer.string) is not bs4.element.Comment:
raise RuntimeError('Code container does not contain a comment')
raise snscrape.base.ScraperException('Code container does not contain a comment')
codeSoup = bs4.BeautifulSoup(codeContainer.string, 'lxml')
yield from self._soup_to_items(codeSoup, baseUrl, 'group')
@@ -226,7 +224,7 @@ class FacebookGroupScraper(FacebookCommonScraper):
headers = headers,
)
if r.status_code != 200:
raise RuntimeError(f'Got status code {r.status_code}')
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
obj = json.loads(spuriousForLoopPattern.sub('', r.text))
if obj['payload'] == '':
# End of pagination

View File

@@ -1,115 +0,0 @@
import datetime
import json
import logging
import snscrape.base
import time
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class GabPost(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
def __str__(self):
return self.url
class GabUserCommonScraper(snscrape.base.Scraper):
def __init__(self, mode, username, **kwargs):
super().__init__(**kwargs)
if mode not in ('posts', 'comments', 'media'):
raise ValueError('Invalid mode')
self._mode = mode
self._username = username
if mode == 'posts':
self._baseUrl = f'https://gab.com/api/feed/{username}'
self._beforeGlue = '?'
elif mode == 'comments':
self._baseUrl = f'https://gab.com/api/feed/{username}/comments?includes=post.conversation_parent'
self._beforeGlue = '&'
elif mode == 'media':
self._baseUrl = f'https://gab.com/api/feed/{username}/media'
self._beforeGlue = '?'
def _response_to_items(self, response):
yielded = set()
for post in response['data']:
if post['post']['id'] not in yielded:
yield GabPost(
url = f'https://gab.com/{post["post"]["user"]["username"]}/posts/{post["post"]["id"]}',
date = datetime.datetime.strptime(post['post']['created_at'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z'),
content = post['post']['body'],
)
yielded.add(post['post']['id'])
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
logger.info('Retrieving initial data')
r = self._get(self._baseUrl, headers = headers)
if r.status_code == 404:
logger.error('User does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
response = json.loads(r.text)
if not response['data']:
logger.error('User has no posts')
return
yield from self._response_to_items(response)
if self._mode == 'posts':
before = response['data'][-1]['published_at']
elif self._mode in ('comments', 'media'):
before = 30
while True:
logger.info('Retrieving next page')
r = self._get(f'{self._baseUrl}{self._beforeGlue}before={before}', headers = headers)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
response = json.loads(r.text)
yield from self._response_to_items(response)
if response['no-more'] or not response['data']:
# Last page
return
if self._mode == 'posts':
before = response['data'][-1]['published_at']
elif self._mode in ('comments', 'media'):
before += 30
time.sleep(1) # Gab's API is pretty quick but doesn't like being hammered...
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('username', help = 'A Gab username')
class GabUserPostsScraper(GabUserCommonScraper):
name = 'gab-user'
@classmethod
def from_args(cls, args):
return cls('posts', args.username, retries = args.retries)
class GabUserCommentsScraper(GabUserCommonScraper):
name = 'gab-user-comments'
@classmethod
def from_args(cls, args):
return cls('comments', args.username, retries = args.retries)
class GabUserMediaScraper(GabUserCommonScraper):
name = 'gab-user-media'
@classmethod
def from_args(cls, args):
return cls('media', args.username, retries = args.retries)

View File

@@ -1,102 +0,0 @@
import datetime
import itertools
import json
import logging
import re
import snscrape.base
logger = logging.getLogger(__name__)
class GooglePlusUserScraper(snscrape.base.Scraper):
name = 'googleplus-user'
def __init__(self, user, **kwargs):
super().__init__(**kwargs)
self._user = user
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
logger.info('Retrieving initial data')
r = self._get(f'https://plus.google.com/{self._user}', headers = headers)
if r.status_code == 404:
logger.warning('User does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
# Global data; only needed for the session ID
#TODO: Make this more robust somehow
match = re.search(r'''(['"])FdrFJe\1\s*:\s*(['"])(?P<sid>.*?)\2''', r.text)
if not match:
logger.error('Unable to find session ID')
return
sid = match.group('sid')
# Page data
# As of 2018-05-18, the much simpler regex r'''<script[^>]*>AF_initDataCallback\(\{key: 'ds:6',.*?return (.*?)\}\}\);</script>''' would work also, but this is more generic and less likely to break:
match = re.search(r'''<script[^>]*>\s*(?:.*?)\s*\(\s*\{(?:|.*?,)\s*key\s*:\s*(['"])ds:6\1\s*,.*?,\s*data\s*:\s*function\s*\(\s*\)\s*\{\s*return\s*(?P<data>.*?)\}\s*\}\s*\)\s*;\s*</script>''', r.text, re.DOTALL)
if not match:
logger.error('Unable to extract data')
return
jsonData = match.group('data')
response = json.loads(jsonData)
if response[0][7] is None:
logger.info('User has no posts')
return
for postObj in response[0][7]:
yield snscrape.base.URLItem(f'https://plus.google.com/{postObj[6]["33558957"][21]}')
cursor = response[0][1] # 'ADSJ_x'
if cursor is None:
# No further pages
return
baseDate = datetime.datetime.utcnow()
baseSeconds = baseDate.hour * 3600 + baseDate.minute * 60 + baseDate.second
userid = response[1] # Alternatively and more ugly: response[0][7][0][6]['33558957'][16]
for counter in itertools.count(start = 2):
logger.info('Retrieving next page')
reqid = 1 + baseSeconds + int(1e5) * counter
r = self._post(
f'https://plus.google.com/_/PlusAppUi/data?ds.extension=74333095&f.sid={sid}&hl=en-US&soc-app=199&soc-platform=1&soc-device=1&_reqid={reqid}&rt=c',
data = [('f.req', '[[[74333095,[{"74333095":["' + cursor + '","' + userid + '"]}],null,null,0]]]'), ('', '')],
headers = headers
)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
# As if everything up to here wasn't terrible already, this is where it gets *really* bad.
# The API contains a few junk characters at the beginning, apparently as an anti-CSRF measure.
# The remainder is effectively a self-made chunked transfer encoding but with decimal digits and including everything except the digits themselves in the chunk size.
# It sucks.
# Each chunk is actually one JSON object; you'd think that we can just read the first one and parse that, but there are some quirks that make this difficult.
# I was unable to figure out what the "chunk size" actually covers exactly; the response is UTF-8 encoded, but the chunk size matches neither the binary nor the decoded length.
# Enter the awful workaround: strip away the initial chunk size, then parse the beginning of the remaining data using a parser that doesn't care if there's junk after the JSON.
garbage = r.text
assert garbage[:6] == ")]}'\n\n" # anti-CSRF and two newlines
data = []
pos = 6
while garbage[pos].isdigit() or garbage[pos].isspace(): # Also strip leading whitespace
pos += 1
response = json.JSONDecoder().raw_decode(''.join(garbage[pos:]))[0] # Parses only the first structure in the data stream without throwing an error about the extra data at the end
for postObj in response[0][2]['74333095'][0][7]:
yield snscrape.base.URLItem(f'https://plus.google.com/{postObj[6]["33558957"][21]}')
cursor = response[0][2]['74333095'][0][1]
if cursor is None:
break
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('user', help = 'A Google Plus username (with leading "+") or numeric ID')
@classmethod
def from_args(cls, args):
return cls(args.user, retries = args.retries)

View File

@@ -16,6 +16,11 @@ class InstagramPost(typing.NamedTuple, snscrape.base.Item):
content: str
thumbnailUrl: str
displayUrl: str
username: str
likes: int
comments: int
commentsDisabled: bool
isVideo: bool
def __str__(self):
return self.cleanUrl
@@ -57,7 +62,8 @@ class InstagramCommonScraper(snscrape.base.Scraper):
def _response_to_items(self, response):
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
code = node['node']['shortcode']
usernameQuery = '?taken-by=' + node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
usernameQuery = '?taken-by=' + username
cleanUrl = f'https://www.instagram.com/p/{code}/'
yield InstagramPost(
cleanUrl = cleanUrl,
@@ -66,6 +72,11 @@ class InstagramCommonScraper(snscrape.base.Scraper):
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
thumbnailUrl = node['node']['thumbnail_src'],
displayUrl = node['node']['display_url'],
username = username,
likes = node['node']['edge_media_preview_like']['count'],
comments = node['node']['edge_media_to_comment']['count'],
commentsDisabled = node['node']['comments_disabled'],
isVideo = node['node']['is_video'],
)
def _check_initial_page_callback(self, r):
@@ -98,8 +109,9 @@ class InstagramCommonScraper(snscrape.base.Scraper):
logger.warning(f'{self._mode} does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
elif r.url.startswith('https://www.instagram.com/accounts/login/'):
raise snscrape.base.ScraperException('Redirected to login page')
response = r._snscrape_json_obj
rhxGis = response['rhx_gis'] if 'rhx_gis' in response else ''
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
@@ -122,8 +134,7 @@ class InstagramCommonScraper(snscrape.base.Scraper):
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
response = r._snscrape_json_obj
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:

View File

@@ -0,0 +1,66 @@
import bs4
import datetime
import logging
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class TelegramPost(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
outlinks: list
outlinksss: str
def __str__(self):
return self.url
class TelegramChannelScraper(snscrape.base.Scraper):
name = 'telegram-channel'
def __init__(self, name, **kwargs):
super().__init__(**kwargs)
self._name = name
def _soup_to_items(self, soup, pageUrl):
posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True})
for post in reversed(posts):
date = datetime.datetime.strptime(post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date').find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
message = post.find('div', class_ = 'tgme_widget_message_text')
if message:
content = message.text
outlinks = [urllib.parse.urljoin(pageUrl, link['href']) for link in post.find_all('a') if not link.text.startswith('@') and link['href'].startswith('https://t.me/')]
outlinksss = ' '.join(outlinks)
else:
content = None
outlinks = []
outlinksss = ''
yield TelegramPost(url = f'https://t.me/s/{post["data-post"]}', date = date, content = content, outlinks = outlinks, outlinksss = outlinksss)
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'}
nextPageUrl = f'https://t.me/s/{self._name}'
while True:
r = self._get(nextPageUrl, headers = headers)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
soup = bs4.BeautifulSoup(r.text, 'lxml')
yield from self._soup_to_items(soup, nextPageUrl)
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
if not pageLink:
break
nextPageUrl = urllib.parse.urljoin(nextPageUrl, pageLink['href'])
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('channel', help = 'A channel name')
@classmethod
def from_args(cls, args):
return cls(args.channel, retries = args.retries)

View File

@@ -3,8 +3,10 @@ import datetime
import json
import random
import logging
import re
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
@@ -86,82 +88,126 @@ class TwitterCommonScraper(snscrape.base.Scraper):
class TwitterSearchScraper(TwitterCommonScraper):
name = 'twitter-search'
def __init__(self, query, maxPosition = None, **kwargs):
def __init__(self, query, cursor = None, **kwargs):
super().__init__(**kwargs)
self._query = query
self._maxPosition = maxPosition
self._cursor = cursor
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'})
def _get_feed_from_html(self, html, withMinPosition):
soup = bs4.BeautifulSoup(html, 'lxml')
feed = soup.find_all('li', 'js-stream-item')
if withMinPosition:
streamContainer = soup.find('div', 'stream-container')
if not streamContainer or not streamContainer.has_attr('data-min-position'):
if soup.find('div', 'SearchEmptyTimeline'):
# No results found
minPosition = None
else:
# Unknown error condition
raise RuntimeError('Unable to find min-position')
else:
minPosition = streamContainer['data-min-position']
else:
minPosition = None
return feed, minPosition
def _get_guest_token(self):
logger.info(f'Retrieving guest token from search page')
r = self._get(self._baseUrl, headers = {'User-Agent': self._userAgent})
match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', r.text)
if not match:
raise snscrape.base.ScraperException('Unable to find guest token')
return match.group(1)
def _check_scroll_response(self, r):
if r.status_code == 429:
# Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
return True, None
if r.headers.get('content-type') != 'application/json;charset=utf-8':
return False, f'content type is not JSON'
if r.status_code != 200:
return False, f'non-200 status code'
return True, None
def get_items(self):
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
# First page
if self._maxPosition is None:
logger.info(f'Retrieving search page for {self._query}')
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'spxr', 'qf': 'off'}, headers = headers)
feed, maxPosition = self._get_feed_from_html(r.text, True)
if not feed:
logger.warning(f'No results for {self._query}')
return
yield from self._feed_to_items(feed)
else:
maxPosition = self._maxPosition
if not maxPosition:
return
headers = {
'User-Agent': self._userAgent,
'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'Referer': self._baseUrl,
}
guestToken = None
cursor = self._cursor
while True:
logger.info(f'Retrieving scroll page {maxPosition}')
r = self._get('https://twitter.com/i/search/timeline',
params = {
'f': 'tweets',
'vertical': 'default',
'lang': 'en',
'q': self._query,
'include_available_features': '1',
'include_entities': '1',
'reset_error_state': 'false',
'src': 'spxr',
'qf': 'off',
'max_position': maxPosition,
},
headers = headers,
responseOkCallback = self._check_json_callback)
if not guestToken:
guestToken = self._get_guest_token()
headers['x-guest-token'] = guestToken
obj = json.loads(r.text)
feed, _ = self._get_feed_from_html(obj['items_html'], False)
if feed:
yield from self._feed_to_items(feed)
if obj['min_position'] == maxPosition:
return
maxPosition = obj['min_position']
logger.info(f'Retrieving scroll page {cursor}')
params = {
'include_profile_interstitial_type': '1',
'include_blocking': '1',
'include_blocked_by': '1',
'include_followed_by': '1',
'include_want_retweets': '1',
'include_mute_edge': '1',
'include_can_dm': '1',
'include_can_media_tag': '1',
'skip_status': '1',
'cards_platform': 'Web-12',
'include_cards': '1',
'include_composer_source': 'true',
'include_ext_alt_text': 'true',
'include_reply_count': '1',
'tweet_mode': 'extended',
'include_entities': 'true',
'include_user_entities': 'true',
'include_ext_media_color': 'true',
'include_ext_media_availability': 'true',
'send_error_codes': 'true',
'simple_quoted_tweets': 'true',
'q': self._query,
'tweet_search_mode': 'live',
'count': '100',
'query_source': 'spelling_expansion_revert_click',
}
if cursor:
params['cursor'] = cursor
params['pc'] = '1'
params['spelling_corrections'] = '1'
params['ext'] = 'mediaStats%2CcameraMoment'
r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
if r.status_code == 429:
guestToken = None
continue
try:
obj = r.json()
except json.JSONDecodeError as e:
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
# No data format test, just a hard and loud crash if anything's wrong :-)
newCursor = None
for instruction in obj['timeline']['instructions']:
if 'addEntries' in instruction:
entries = instruction['addEntries']['entries']
elif 'replaceEntry' in instruction:
entries = [instruction['replaceEntry']['entry']]
else:
continue
for entry in entries:
if entry['entryId'].startswith('sq-I-t-'):
if 'tweet' in entry['content']['item']['content']:
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
elif 'tombstone' in entry['content']['item']['content'] and 'tweet' in entry['content']['item']['content']['tombstone']:
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tombstone']['tweet']['id']]
else:
raise snscrape.base.ScraperException(f'Unable to handle entry {entry["entryId"]!r}')
tweetID = tweet['id']
content = tweet['full_text']
username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
url = f'https://twitter.com/{username}/status/{tweetID}'
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
elif entry['entryId'] == 'sq-cursor-bottom':
newCursor = entry['content']['operation']['cursor']['value']
if not newCursor or newCursor == cursor:
# End of pagination
break
cursor = newCursor
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('--max-position', metavar = 'POSITION', dest = 'maxPosition')
subparser.add_argument('--cursor', metavar = 'CURSOR')
subparser.add_argument('query', help = 'A Twitter search string')
@classmethod
def from_args(cls, args):
return cls(args.query, maxPosition = args.maxPosition, retries = args.retries)
return cls(args.query, cursor = args.cursor, retries = args.retries)
class TwitterUserScraper(TwitterSearchScraper):
@@ -205,7 +251,7 @@ class TwitterThreadScraper(TwitterCommonScraper):
self._tweetID = tweetID
def get_items(self):
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}
# Fetch the page of the last tweet in the thread
r = self._get(f'https://twitter.com/user/status/{self._tweetID}', headers = headers)
@@ -283,7 +329,7 @@ class TwitterListMembersScraper(TwitterCommonScraper):
self._user, self._list = listName.split('/')
def get_items(self):
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}
baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'
r = self._get(baseUrl, headers = headers)
@@ -293,7 +339,7 @@ class TwitterListMembersScraper(TwitterCommonScraper):
soup = bs4.BeautifulSoup(r.text, 'lxml')
container = soup.find('div', 'stream-container')
if not container:
raise RuntimeError('Unable to find container')
raise snscrape.base.ScraperException('Unable to find container')
items = container.find_all('li', 'js-stream-item')
if not items:
logger.warning('Empty list')

View File

@@ -43,17 +43,22 @@ class VKontakteUserScraper(snscrape.base.Scraper):
logger.info('Retrieving initial data')
r = self._get(baseUrl, headers = headers)
if r.status_code == 404:
logger.error('Wall does not exist')
logger.warning('Wall does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
# VK sends windows-1251-encoded data, but Requests's decoding doesn't seem to work correctly and causes lxml to choke, so we need to pass the binary content and the encoding explicitly.
soup = bs4.BeautifulSoup(r.content, 'lxml', from_encoding = r.encoding)
if soup.find('div', class_ = 'profile_closed_wall_dummy'):
logger.error('Private profile')
logger.warning('Private profile')
return
profileDeleted = soup.find('h5', class_ = 'profile_deleted_text')
if profileDeleted:
# Unclear what this state represents, so just log website text.
logger.warning(profileDeleted.text)
return
newestPost = soup.find('div', class_ = 'post')
@@ -78,16 +83,15 @@ class VKontakteUserScraper(snscrape.base.Scraper):
headers = headers
)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
fields = r.content.split(b'<!>')
if fields[5].startswith(b'<div class="page_block no_posts">'):
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
# Convert to JSON and read the HTML payload. Note that this implicitly converts the data to a Python string (i.e., Unicode), away from a windows-1251-encoded bytes.
posts = r.json()['payload'][1][0]
if posts.startswith('<div class="page_block no_posts">'):
# Reached the end
break
if not fields[5].startswith(b'<div id="post'):
logger.error(f'Got an unknown response: {fields[5][:200]!r}...')
break
soup = bs4.BeautifulSoup(fields[5], 'lxml', from_encoding = r.encoding)
if not posts.startswith('<div id="post'):
raise snscrape.base.ScraperException(f'Got an unknown response: {posts[:200]!r}...')
soup = bs4.BeautifulSoup(posts, 'lxml')
yield from self._soup_to_items(soup, baseUrl)
@classmethod