22 Commits

Author SHA1 Message Date
JustAnotherArchivist
abf31764b1 Version 0.2.0 2019-04-21 23:03:21 +02:00
JustAnotherArchivist
64693f74bb Update Instagram query hash 2019-04-19 01:47:38 +02:00
JustAnotherArchivist
a7d08ed51c Remove leftover debugging print 2019-04-19 01:40:29 +02:00
JustAnotherArchivist
f48ca7726e Add support for Gab 2019-04-19 00:40:43 +02:00
JustAnotherArchivist
78c295f7e0 Add support for VKontakte (fixes #13) 2019-04-18 18:39:21 +02:00
JustAnotherArchivist
a5aca1a14f Add support for Instagram hashtags (fixes #29) 2019-04-18 16:14:54 +02:00
JustAnotherArchivist
96f7d871c1 Ignore Scraper subclasses which don't set a name 2019-04-18 16:14:26 +02:00
JustAnotherArchivist
b5dfd37949 Support unix timestamps in --since 2019-04-18 16:01:35 +02:00
JustAnotherArchivist
b511397791 Add --since option to return only results newer than a certain date (fixes #19) 2019-04-18 15:12:29 +02:00
JustAnotherArchivist
536fcb3303 Return proper items from scrapers including clean URLs (fixes #9 and #10) 2019-04-18 14:44:21 +02:00
JustAnotherArchivist
f8d812f799 Include permalink.php, events, and notes (fixes #32) 2019-04-18 04:22:47 +02:00
JustAnotherArchivist
c2cebd9166 Accept-Language header to get an English response unconditionally 2019-04-18 03:58:37 +02:00
JustAnotherArchivist
73bc99596f Treat Twitter responses without a Content-Type header as invalid (fixes #21) 2019-04-18 02:24:35 +02:00
JustAnotherArchivist
8458c12218 Rewrite link extraction on Facebook (fixes #17)
Facebook's returned HTML has a large number of inconsistencies; some (most) pages include a <link rel="canonical" /> but some don't, for example. This was at the root of the failing post extraction for some Facebook pages (#17). The previous link extraction technique was also quite poor for other reasons though. The new method uses the relevant CSS classes instead. Despite probably being the result of a CSS minimiser or similar, these seem to be quite stable: they haven't changed in the past two years (but the more readable ones have!).
2019-04-18 02:14:21 +02:00
JustAnotherArchivist
b59c7e8d8f Merge pull request #28 from peterk/master
Adds socks proxy support (via requests)
2019-03-11 13:32:07 +01:00
Peter Krantz
3ceb849d98 Adds socks proxy support (via requests) 2019-01-10 22:54:42 +01:00
JustAnotherArchivist
f5ee1f7ac5 Merge pull request #26 from ludios/avoid-twitter-bans
twitter: randomize user agent to avoid Twitter's (IP, UA)-keyed bans
2018-12-25 02:19:17 +01:00
Ivan Kozik
1984110f78 twitter: randomize user agent to avoid Twitter's (IP, UA)-keyed bans 2018-12-24 08:03:33 +00:00
JustAnotherArchivist
c5a5dcb92c snscrape is now on PyPI 2018-10-09 17:26:03 +02:00
JustAnotherArchivist
cfb1c9a2aa Version 0.1.3 2018-10-01 03:26:22 +02:00
JustAnotherArchivist
d0d3c8b2a6 Better log output for temporary failures (fixes #2) 2018-10-01 03:24:29 +02:00
JustAnotherArchivist
4d0350e541 Disable "quality filter" on Twitter (fixes #3) 2018-10-01 02:51:33 +02:00
9 changed files with 440 additions and 56 deletions

View File

@@ -13,6 +13,10 @@ snscrape requires Python 3.6 or higher. The Python package dependencies are inst
Note that one of the dependencies, lxml, also requires libxml2 and libxslt to be installed. Note that one of the dependencies, lxml, also requires libxml2 and libxslt to be installed.
## Installation ## Installation
pip3 install snscrape
If you want to use the development version:
pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git
## Usage ## Usage

View File

@@ -3,7 +3,7 @@ import setuptools
setuptools.setup( setuptools.setup(
name = 'snscrape', name = 'snscrape',
version = '0.1.2', version = '0.2.0',
description = 'A social networking service scraper', description = 'A social networking service scraper',
author = 'JustAnotherArchivist', author = 'JustAnotherArchivist',
url = 'https://github.com/JustAnotherArchivist/snscrape', url = 'https://github.com/JustAnotherArchivist/snscrape',
@@ -13,7 +13,7 @@ setuptools.setup(
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.6',
], ],
packages = ['snscrape', 'snscrape.modules'], packages = ['snscrape', 'snscrape.modules'],
install_requires = ['requests', 'lxml', 'beautifulsoup4'], install_requires = ['requests[socks]', 'lxml', 'beautifulsoup4'],
entry_points = { entry_points = {
'console_scripts': [ 'console_scripts': [
'snscrape = snscrape.cli:main', 'snscrape = snscrape.cli:main',

View File

@@ -59,11 +59,32 @@ class Scraper:
logger.debug(f'... with data: {data!r}') logger.debug(f'... with data: {data!r}')
try: try:
r = self._session.send(req, timeout = timeout) r = self._session.send(req, timeout = timeout)
if responseOkCallback is None or responseOkCallback(r):
logger.debug(f'{req.url} retrieved successfully')
return r
except requests.exceptions.RequestException as exc: except requests.exceptions.RequestException as exc:
logger.error(f'Error retrieving {url}: {exc!r}') if attempt < self._retries:
retrying = ', retrying'
level = logging.WARNING
else:
retrying = ''
level = logging.ERROR
logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
else:
if responseOkCallback is not None:
success, msg = responseOkCallback(r)
else:
success, msg = (True, None)
msg = f': {msg}' if msg else ''
if success:
logger.debug(f'{req.url} retrieved successfully{msg}')
return r
else:
if attempt < self._retries:
retrying = ', retrying'
level = logging.WARNING
else:
retrying = ''
level = logging.ERROR
logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
if attempt < self._retries: if attempt < self._retries:
sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc. sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
logger.info(f'Waiting {sleepTime:.0f} seconds') logger.info(f'Waiting {sleepTime:.0f} seconds')

View File

@@ -1,4 +1,5 @@
import argparse import argparse
import datetime
import logging import logging
import snscrape.base import snscrape.base
import snscrape.modules import snscrape.modules
@@ -7,19 +8,42 @@ import snscrape.modules
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def parse_datetime_arg(arg):
    """Convert a command-line string into a timezone-aware datetime.

    Accepted inputs, tried in this order:
      * 'YYYY-MM-DD HH:MM:SS +ZZZZ'
      * 'YYYY-MM-DD HH:MM:SS'
      * 'YYYY-MM-DD +ZZZZ'
      * 'YYYY-MM-DD'
      * an integer unix timestamp

    Values without an explicit UTC offset are interpreted as UTC.

    Raises:
        argparse.ArgumentTypeError: if arg matches none of the formats, or is
            an integer that cannot be represented as a datetime.
    """
    formats = ('%Y-%m-%d %H:%M:%S %z', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %z', '%Y-%m-%d')
    for fmt in formats:
        try:
            d = datetime.datetime.strptime(arg, fmt)
        except ValueError:
            continue
        if d.tzinfo is None:
            return d.replace(tzinfo = datetime.timezone.utc)
        return d
    # Try treating it as a unix timestamp
    try:
        return datetime.datetime.fromtimestamp(int(arg), datetime.timezone.utc)
    except (ValueError, OverflowError, OSError):
        # int() raises ValueError for non-numeric text; fromtimestamp() raises
        # ValueError, OverflowError, or OSError for out-of-range timestamps.
        # All of these must surface as a regular argparse error.
        pass
    raise argparse.ArgumentTypeError(f'Cannot parse {arg!r} into a datetime object')
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter) parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity') parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N', parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff') help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results') parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format')
parser.add_argument('--since', type = parse_datetime_arg, metavar = 'DATETIME', help = 'Only return results newer than DATETIME')
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use') subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
classes = snscrape.base.Scraper.__subclasses__() classes = snscrape.base.Scraper.__subclasses__()
for cls in classes: for cls in classes:
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter) if cls.name is not None:
cls.setup_parser(subparser) subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
subparser.set_defaults(cls = cls) cls.setup_parser(subparser)
subparser.set_defaults(cls = cls)
classes.extend(cls.__subclasses__()) classes.extend(cls.__subclasses__())
args = parser.parse_args() args = parser.parse_args()
@@ -57,7 +81,13 @@ def main():
i = 0 i = 0
for i, item in enumerate(scraper.get_items(), start = 1): for i, item in enumerate(scraper.get_items(), start = 1):
print(item) if args.since is not None and item.date < args.since:
logger.info(f'Exiting due to reaching older results than {args.since}')
break
if args.format is not None:
print(args.format.format(**item._asdict()))
else:
print(item)
if args.maxResults and i >= args.maxResults: if args.maxResults and i >= args.maxResults:
logger.info(f'Exiting after {i} results') logger.info(f'Exiting after {i} results')
break break

View File

@@ -1,14 +1,26 @@
import bs4 import bs4
import datetime
import json import json
import logging import logging
import re import re
import snscrape.base import snscrape.base
import typing
import urllib.parse import urllib.parse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class FacebookPost(typing.NamedTuple, snscrape.base.Item):
    """A single Facebook entry yielded by the Facebook scraper; prints as its clean URL."""

    cleanUrl: str  # dirtyUrl after being passed through the scraper's _clean_url
    dirtyUrl: str  # the entry's link href joined onto the base URL, otherwise unmodified
    date: datetime.datetime  # built from the entry's data-utime attribute, in UTC
    content: typing.Optional[str]  # text of the entry's content div, or None if there is none

    def __str__(self):
        return self.cleanUrl
class FacebookUserScraper(snscrape.base.Scraper): class FacebookUserScraper(snscrape.base.Scraper):
name = 'facebook-user' name = 'facebook-user'
@@ -16,18 +28,52 @@ class FacebookUserScraper(snscrape.base.Scraper):
super().__init__(**kwargs) super().__init__(**kwargs)
self._username = username self._username = username
def _soup_to_items(self, soup, username, baseUrl): def _clean_url(self, dirtyUrl):
yielded = set() u = urllib.parse.urlparse(dirtyUrl)
for a in soup.find_all('a', href = re.compile(r'^/[^/]+/(posts|photos|videos)/[^/]*\d')): if u.path == '/permalink.php':
href = a.get('href') # Retain only story_fbid and id parameters
if href.startswith(f'/{username}/'): q = urllib.parse.parse_qs(u.query)
link = urllib.parse.urljoin(baseUrl, href) clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
if link not in yielded: elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'):
yield snscrape.base.URLItem(link) # No manipulation of the path needed, but strip the query string
yielded.add(link) clean = (u.scheme, u.netloc, u.path, '', '')
elif u.path.split('/')[2] in ('photos', 'videos'):
# Path: "/" username or ID "/" photos or videos "/" crap "/" ID of photo or video "/"
# But to be safe, also handle URLs that don't have that crap correctly.
if u.path.count('/') == 4:
clean = (u.scheme, u.netloc, u.path, '', '')
elif u.path.count('/') == 5:
# Strip out the third path component
pathcomps = u.path.split('/')
pathcomps.pop(3) # Don't forget about the empty string at the beginning!
clean = (u.scheme, u.netloc, '/'.join(pathcomps), '', '')
else:
return dirtyUrl
else:
# If we don't recognise the URL, just return the original one.
return dirtyUrl
return urllib.parse.urlunsplit(clean)
def _soup_to_items(self, soup, baseUrl):
    """Yield a FacebookPost for every timeline entry found in soup.

    Args:
        soup: parsed HTML of a timeline page or pagination fragment.
        baseUrl: URL against which the entries' relative links are resolved.

    Entries without a usable permalink are skipped; most of them are logged
    as warnings.
    """
    permalinkMarkers = ('/posts/', '/photos/', '/videos/', '/permalink.php?', '/events/', '/notes/')
    for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
        entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
        href = entryA.get('href') if entryA is not None else None
        if href is None:
            # find() returns None when the entry has no such anchor, and get()
            # returns None when the anchor has no href; skip instead of crashing.
            logger.warning('Ignoring entry without a link')
            continue
        if not any(x in href for x in permalinkMarkers):
            if href != '#' or 'new photo' not in entry.text or 'to the album' not in entry.text:
                # Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
                logger.warning(f'Ignoring odd link: {href}')
            continue
        dirtyUrl = urllib.parse.urljoin(baseUrl, href)
        date = datetime.datetime.fromtimestamp(int(entryA.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
        contentDiv = entry.find('div', class_ = '_5pbx')
        content = contentDiv.text if contentDiv else None
        yield FacebookPost(cleanUrl = self._clean_url(dirtyUrl), dirtyUrl = dirtyUrl, date = date, content = content)
def get_items(self): def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=') nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=')
spuriousForLoopPattern = re.compile(r'^for \(;;\);') spuriousForLoopPattern = re.compile(r'^for \(;;\);')
@@ -42,9 +88,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
logger.error('Got status code {r.status_code}') logger.error('Got status code {r.status_code}')
return return
soup = bs4.BeautifulSoup(r.text, 'lxml') soup = bs4.BeautifulSoup(r.text, 'lxml')
username = re.sub(r'^https://www\.facebook\.com/([^/]+)/$', r'\1', soup.find('link').get('href')) # Canonical capitalisation yield from self._soup_to_items(soup, baseUrl)
baseUrl = f'https://www.facebook.com/{username}/'
yield from self._soup_to_items(soup, username, baseUrl)
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern) nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
while nextPageLink: while nextPageLink:
@@ -65,7 +109,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
assert response['domops'][0][2] == False assert response['domops'][0][2] == False
assert '__html' in response['domops'][0][3] assert '__html' in response['domops'][0][3]
soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml') soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml')
yield from self._soup_to_items(soup, username, baseUrl) yield from self._soup_to_items(soup, baseUrl)
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern) nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
@classmethod @classmethod

115
snscrape/modules/gab.py Normal file
View File

@@ -0,0 +1,115 @@
import datetime
import json
import logging
import snscrape.base
import time
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class GabPost(typing.NamedTuple, snscrape.base.Item):
    """A single Gab post yielded by the Gab scrapers; prints as its URL."""

    url: str  # https://gab.com/<username>/posts/<id>, built from the API response
    date: datetime.datetime  # parsed from the post's created_at field
    content: str  # the post's body field

    def __str__(self):
        return self.url
class GabUserCommonScraper(snscrape.base.Scraper):
    """Shared implementation for scraping one of a Gab user's feeds (posts, comments, or media)."""

    def __init__(self, mode, username, **kwargs):
        """Store the feed mode and username and derive the feed URL.

        Raises:
            ValueError: if mode is not 'posts', 'comments', or 'media'.
        """
        super().__init__(**kwargs)
        if mode not in ('posts', 'comments', 'media'):
            raise ValueError('Invalid mode')
        # Per mode: the feed URL and the separator to put before the 'before' parameter.
        feeds = {
            'posts': (f'https://gab.com/api/feed/{username}', '?'),
            'comments': (f'https://gab.com/api/feed/{username}/comments?includes=post.conversation_parent', '&'),
            'media': (f'https://gab.com/api/feed/{username}/media', '?'),
        }
        self._mode = mode
        self._username = username
        self._baseUrl, self._beforeGlue = feeds[mode]

    def _response_to_items(self, response):
        """Yield a GabPost for each post in a decoded feed response, skipping IDs already seen in that response."""
        seen = set()
        for entry in response['data']:
            postID = entry['post']['id']
            if postID in seen:
                continue
            yield GabPost(
                url = f'https://gab.com/{entry["post"]["user"]["username"]}/posts/{entry["post"]["id"]}',
                # Strip the date dashes and all colons so that strptime's %z accepts the offset.
                date = datetime.datetime.strptime(entry['post']['created_at'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z'),
                content = entry['post']['body'],
            )
            seen.add(postID)

    def get_items(self):
        """Yield all items of the feed, following the pagination until the last page or an error status."""
        headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}

        logger.info('Retrieving initial data')
        r = self._get(self._baseUrl, headers = headers)
        if r.status_code != 200:
            if r.status_code == 404:
                logger.error('User does not exist')
            else:
                logger.error(f'Got status code {r.status_code}')
            return
        page = json.loads(r.text)
        if not page['data']:
            logger.error('User has no posts')
            return
        yield from self._response_to_items(page)

        # The posts feed pages by the last item's published_at; the other feeds use an offset in steps of 30.
        if self._mode == 'posts':
            before = page['data'][-1]['published_at']
        else:
            before = 30

        while True:
            logger.info('Retrieving next page')
            r = self._get(f'{self._baseUrl}{self._beforeGlue}before={before}', headers = headers)
            if r.status_code != 200:
                logger.error(f'Got status code {r.status_code}')
                return
            page = json.loads(r.text)
            yield from self._response_to_items(page)
            if page['no-more'] or not page['data']:
                # Last page
                return
            if self._mode == 'posts':
                before = page['data'][-1]['published_at']
            else:
                before += 30
            time.sleep(1) # Gab's API is pretty quick but doesn't like being hammered...

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional username argument on the subparser."""
        subparser.add_argument('username', help = 'A Gab username')
class GabUserPostsScraper(GabUserCommonScraper):
    """Scraper for a Gab user's posts feed."""

    name = 'gab-user'

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed command-line arguments."""
        return cls(mode = 'posts', username = args.username, retries = args.retries)
class GabUserCommentsScraper(GabUserCommonScraper):
    """Scraper for a Gab user's comments feed."""

    name = 'gab-user-comments'

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed command-line arguments."""
        return cls(mode = 'comments', username = args.username, retries = args.retries)
class GabUserMediaScraper(GabUserCommonScraper):
    """Scraper for a Gab user's media feed."""

    name = 'gab-user-media'

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed command-line arguments."""
        return cls(mode = 'media', username = args.username, retries = args.retries)

View File

@@ -1,31 +1,72 @@
import datetime
import hashlib import hashlib
import json import json
import logging import logging
import snscrape.base import snscrape.base
import typing
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class InstagramUserScraper(snscrape.base.Scraper): class InstagramPost(typing.NamedTuple, snscrape.base.Item):
name = 'instagram-user' cleanUrl: str
dirtyUrl: str
date: datetime.datetime
content: str
thumbnailUrl: str
displayUrl: str
def __init__(self, username, **kwargs): def __str__(self):
return self.cleanUrl
class InstagramCommonScraper(snscrape.base.Scraper):
def __init__(self, mode, name, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self._username = username if mode not in ('User', 'Hashtag'):
raise ValueError('Invalid mode')
self._mode = mode
self._name = name
def _response_to_items(self, response, username): if self._mode == 'User':
for node in response['user']['edge_owner_to_timeline_media']['edges']: self._initialUrl = f'https://www.instagram.com/{self._name}/'
self._pageName = 'ProfilePage'
self._responseContainer = 'user'
self._edgeXToMedia = 'edge_owner_to_timeline_media'
self._pageIDKey = 'id'
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
elif self._mode == 'Hashtag':
self._initialUrl = f'https://www.instagram.com/explore/tags/{self._name}/'
self._pageName = 'TagPage'
self._responseContainer = 'hashtag'
self._edgeXToMedia = 'edge_hashtag_to_media'
self._pageIDKey = 'name'
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
self._variablesFormat = '{{"tag_name":"{pageID}","show_ranked":false,"first":10,"after":"{endCursor}"}}'
def _response_to_items(self, response):
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
code = node['node']['shortcode'] code = node['node']['shortcode']
yield snscrape.base.URLItem(f'https://www.instagram.com/p/{code}/?taken-by={username}') #TODO: Do we want the taken-by parameter in here? usernameQuery = '?taken-by=' + node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
cleanUrl = f'https://www.instagram.com/p/{code}/'
yield InstagramPost(
cleanUrl = cleanUrl,
dirtyUrl = f'{cleanUrl}{usernameQuery}',
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
thumbnailUrl = node['node']['thumbnail_src'],
displayUrl = node['node']['display_url'],
)
def get_items(self): def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
logger.info('Retrieving initial data') logger.info('Retrieving initial data')
r = self._get(f'https://www.instagram.com/{self._username}/', headers = headers) r = self._get(self._initialUrl, headers = headers)
if r.status_code == 404: if r.status_code == 404:
logger.warning('User does not exist') logger.warning(f'{self._mode} does not exist')
return return
elif r.status_code != 200: elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}') logger.error(f'Got status code {r.status_code}')
@@ -33,42 +74,57 @@ class InstagramUserScraper(snscrape.base.Scraper):
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble. jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
response = json.loads(jsonData) response = json.loads(jsonData)
rhxGis = response['rhx_gis'] rhxGis = response['rhx_gis']
if response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['count'] == 0: if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
logger.info('User has no posts') logger.info(f'{self._mode} has no posts')
return return
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['edges']: if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
logger.warning('Private account') logger.warning('Private account')
return return
userID = response['entry_data']['ProfilePage'][0]['graphql']['user']['id'] pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
username = response['entry_data']['ProfilePage'][0]['graphql']['user']['username'] # Might have different capitalisation than self._username yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
yield from self._response_to_items(response['entry_data']['ProfilePage'][0]['graphql'], username) if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
return return
endCursor = response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor'] endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
while True: while True:
logger.info(f'Retrieving endCursor = {endCursor!r}') logger.info(f'Retrieving endCursor = {endCursor!r}')
variables = f'{{"id":"{userID}","first":50,"after":"{endCursor}"}}' variables = self._variablesFormat.format(**locals())
headers['X-Requested-With'] = 'XMLHttpRequest' headers['X-Requested-With'] = 'XMLHttpRequest'
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest() headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash=42323d64886122307be10013ad2dcc44&variables={variables}', headers = headers) r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers)
if r.status_code != 200: if r.status_code != 200:
logger.error(f'Got status code {r.status_code}') logger.error(f'Got status code {r.status_code}')
return return
response = json.loads(r.text) response = json.loads(r.text)
if not response['data']['user']['edge_owner_to_timeline_media']['edges']: if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
return return
yield from self._response_to_items(response['data'], username) yield from self._response_to_items(response['data'])
if not response['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']: if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return return
endCursor = response['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor'] endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
class InstagramUserScraper(InstagramCommonScraper):
name = 'instagram-user'
@classmethod @classmethod
def setup_parser(cls, subparser): def setup_parser(cls, subparser):
subparser.add_argument('username', help = 'An Instagram username') subparser.add_argument('username', help = 'An Instagram username (no leading @)')
@classmethod @classmethod
def from_args(cls, args): def from_args(cls, args):
return cls(args.username, retries = args.retries) return cls('User', args.username, retries = args.retries)
class InstagramHashtagScraper(InstagramCommonScraper):
name = 'instagram-hashtag'
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('hashtag', help = 'An Instagram hashtag (no leading #)')
@classmethod
def from_args(cls, args):
return cls('Hashtag', args.hashtag, retries = args.retries)

View File

@@ -1,12 +1,24 @@
import bs4 import bs4
import datetime
import json import json
import random
import logging import logging
import snscrape.base import snscrape.base
import typing
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Tweet(typing.NamedTuple, snscrape.base.Item):
    """A single tweet yielded by the Twitter search scraper; prints as its URL."""

    url: str  # https://twitter.com/<username>/status/<tweet ID>
    date: datetime.datetime  # built from the tweet's data-time attribute, in UTC
    content: str  # text of the tweet's 'tweet-text' paragraph

    def __str__(self):
        return self.url
class TwitterSearchScraper(snscrape.base.Scraper): class TwitterSearchScraper(snscrape.base.Scraper):
name = 'twitter-search' name = 'twitter-search'
@@ -23,20 +35,21 @@ class TwitterSearchScraper(snscrape.base.Scraper):
for tweet in feed: for tweet in feed:
username = tweet.find('span', 'username').find('b').text username = tweet.find('span', 'username').find('b').text
tweetID = tweet['data-item-id'] tweetID = tweet['data-item-id']
yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}') date = datetime.datetime.fromtimestamp(int(tweet.find('a', 'tweet-timestamp').find('span', '_timestamp')['data-time']), datetime.timezone.utc)
content = tweet.find('p', 'tweet-text').text
yield Tweet(f'https://twitter.com/{username}/status/{tweetID}', date, content)
def _check_json_callback(self, r): def _check_json_callback(self, r):
if r.headers['content-type'] != 'application/json;charset=utf-8': if r.headers.get('content-type') != 'application/json;charset=utf-8':
logger.error(f'Content type of {r.url} is not JSON') return False, f'content type is not JSON'
return False return True, None
return True
def get_items(self): def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
# First page # First page
logger.info(f'Retrieving search page for {self._query}') logger.info(f'Retrieving search page for {self._query}')
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd'}, headers = headers) r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd', 'qf': 'off'}, headers = headers)
feed = self._get_feed_from_html(r.text) feed = self._get_feed_from_html(r.text)
if not feed: if not feed:
@@ -57,6 +70,7 @@ class TwitterSearchScraper(snscrape.base.Scraper):
'include_entities': '1', 'include_entities': '1',
'reset_error_state': 'false', 'reset_error_state': 'false',
'src': 'typd', 'src': 'typd',
'qf': 'off',
'max_position': maxPosition, 'max_position': maxPosition,
}, },
headers = headers, headers = headers,

View File

@@ -0,0 +1,100 @@
import bs4
import datetime
import itertools
import logging
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class VKontaktePost(typing.NamedTuple, snscrape.base.Item):
    """A single VK wall post yielded by the VKontakte scraper; prints as its URL."""

    url: str  # the post link's href joined onto the wall's base URL
    date: typing.Optional[datetime.datetime]  # the scraper passes None when the post has no timestamp
    content: typing.Optional[str]  # the scraper passes None when the post has no text div

    def __str__(self):
        return self.url
class VKontakteUserScraper(snscrape.base.Scraper):
    """Scraper for the posts on a VK user's wall."""

    name = 'vkontakte-user'

    def __init__(self, username, **kwargs):
        super().__init__(**kwargs)
        self._username = username

    def _soup_to_items(self, soup, baseUrl):
        """Yield a VKontaktePost for every post div in soup, resolving links against baseUrl."""
        for post in soup.find_all('div', class_ = 'post'):
            dateSpan = post.find('div', class_ = 'post_date').find('span', class_ = 'rel_date')
            textDiv = post.find('div', class_ = 'wall_post_text')
            # `'time' in dateSpan` would test the tag's children rather than its attributes,
            # so the attribute's presence has to be checked explicitly.
            if dateSpan.has_attr('time'):
                date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc)
            else:
                date = None
            yield VKontaktePost(
                url = urllib.parse.urljoin(baseUrl, post.find('a', class_ = 'post_link')['href']),
                date = date,
                content = textDiv.text if textDiv else None,
            )

    def get_items(self):
        """Yield the wall's posts: first those on the profile page, then those from the pagination endpoint."""
        headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
        baseUrl = f'https://vk.com/{self._username}'

        logger.info('Retrieving initial data')
        r = self._get(baseUrl, headers = headers)
        if r.status_code == 404:
            logger.error('Wall does not exist')
            return
        elif r.status_code != 200:
            logger.error(f'Got status code {r.status_code}')
            return
        # VK sends windows-1251-encoded data, but Requests's decoding doesn't seem to work correctly and causes lxml to choke, so we need to pass the binary content and the encoding explicitly.
        soup = bs4.BeautifulSoup(r.content, 'lxml', from_encoding = r.encoding)
        if soup.find('div', class_ = 'profile_closed_wall_dummy'):
            logger.error('Private profile')
            return
        newestPost = soup.find('div', class_ = 'post')
        if not newestPost:
            logger.info('Wall has no posts')
            return
        ownerID = newestPost.attrs['data-post-id'].split('_')[0]
        # If there is a pinned post, we need its ID for the pagination requests
        if 'post_fixed' in newestPost.attrs['class']:
            fixedPostID = newestPost.attrs['id'].split('_')[1]
        else:
            fixedPostID = ''
        yield from self._soup_to_items(soup, baseUrl)

        headers['X-Requested-With'] = 'XMLHttpRequest'
        for offset in itertools.count(start = 10, step = 10):
            logger.info('Retrieving next page')
            r = self._post(
                'https://vk.com/al_wall.php',
                data = [('act', 'get_wall'), ('al', 1), ('fixed', fixedPostID), ('offset', offset), ('onlyCache', 'false'), ('owner_id', ownerID), ('type', 'own'), ('wall_start_from', offset)],
                headers = headers
            )
            if r.status_code != 200:
                logger.error(f'Got status code {r.status_code}')
                return
            # The response consists of '<!>'-separated fields; the posts' HTML is read from the sixth one.
            fields = r.content.split(b'<!>')
            if len(fields) < 6:
                # A response with fewer fields would otherwise fail with an IndexError below.
                logger.error(f'Got an unknown response: {r.content[:200]!r}...')
                break
            if fields[5].startswith(b'<div class="page_block no_posts">'):
                # Reached the end
                break
            if not fields[5].startswith(b'<div id="post'):
                logger.error(f'Got an unknown response: {fields[5][:200]!r}...')
                break
            soup = bs4.BeautifulSoup(fields[5], 'lxml', from_encoding = r.encoding)
            yield from self._soup_to_items(soup, baseUrl)

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional username argument on the subparser."""
        subparser.add_argument('username', help = 'A VK username')

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed command-line arguments."""
        return cls(args.username, retries = args.retries)