26 Commits
v0.1 ... v0.2.0

Author SHA1 Message Date
JustAnotherArchivist
abf31764b1 Version 0.2.0 2019-04-21 23:03:21 +02:00
JustAnotherArchivist
64693f74bb Update Instagram query hash 2019-04-19 01:47:38 +02:00
JustAnotherArchivist
a7d08ed51c Remove leftover debugging print 2019-04-19 01:40:29 +02:00
JustAnotherArchivist
f48ca7726e Add support for Gab 2019-04-19 00:40:43 +02:00
JustAnotherArchivist
78c295f7e0 Add support for VKontakte (fixes #13) 2019-04-18 18:39:21 +02:00
JustAnotherArchivist
a5aca1a14f Add support for Instagram hashtags (fixes #29) 2019-04-18 16:14:54 +02:00
JustAnotherArchivist
96f7d871c1 Ignore Scraper subclasses which don't set a name 2019-04-18 16:14:26 +02:00
JustAnotherArchivist
b5dfd37949 Support unix timestamps in --since 2019-04-18 16:01:35 +02:00
JustAnotherArchivist
b511397791 Add --since option to return only results newer than a certain date (fixes #19) 2019-04-18 15:12:29 +02:00
JustAnotherArchivist
536fcb3303 Return proper items from scrapers including clean URLs (fixes #9 and #10) 2019-04-18 14:44:21 +02:00
JustAnotherArchivist
f8d812f799 Include permalink.php, events, and notes (fixes #32) 2019-04-18 04:22:47 +02:00
JustAnotherArchivist
c2cebd9166 Accept-Language header to get an English response unconditionally 2019-04-18 03:58:37 +02:00
JustAnotherArchivist
73bc99596f Treat Twitter responses without a Content-Type header as invalid (fixes #21) 2019-04-18 02:24:35 +02:00
JustAnotherArchivist
8458c12218 Rewrite link extraction on Facebook (fixes #17)
Facebook's returned HTML has a large number of inconsistencies; some (most) pages include a <link rel="canonical" /> but some don't, for example. This was at the root of the failing post extraction for some Facebook pages (#17). The previous link extraction technique was also quite poor for other reasons though. The new method uses the relevant CSS classes instead. Despite probably being the result of a CSS minimiser or similar, these seem to be quite stable: they haven't changed in the past two years (but the more readable ones have!).
2019-04-18 02:14:21 +02:00
JustAnotherArchivist
b59c7e8d8f Merge pull request #28 from peterk/master
Adds socks proxy support (via requests)
2019-03-11 13:32:07 +01:00
Peter Krantz
3ceb849d98 Adds socks proxy support (via requests) 2019-01-10 22:54:42 +01:00
JustAnotherArchivist
f5ee1f7ac5 Merge pull request #26 from ludios/avoid-twitter-bans
twitter: randomize user agent to avoid Twitter's (IP, UA)-keyed bans
2018-12-25 02:19:17 +01:00
Ivan Kozik
1984110f78 twitter: randomize user agent to avoid Twitter's (IP, UA)-keyed bans 2018-12-24 08:03:33 +00:00
JustAnotherArchivist
c5a5dcb92c snscrape is now on PyPI 2018-10-09 17:26:03 +02:00
JustAnotherArchivist
cfb1c9a2aa Version 0.1.3 2018-10-01 03:26:22 +02:00
JustAnotherArchivist
d0d3c8b2a6 Better log output for temporary failures (fixes #2) 2018-10-01 03:24:29 +02:00
JustAnotherArchivist
4d0350e541 Disable "quality filter" on Twitter (fixes #3) 2018-10-01 02:51:33 +02:00
JustAnotherArchivist
d17aa15bcb Version 0.1.2 2018-09-11 12:44:07 +02:00
JustAnotherArchivist
d1ef280d6e Fix snscrape.modules not getting installed 2018-09-11 12:43:10 +02:00
JustAnotherArchivist
2823272e0b Version 0.1.1 2018-09-11 12:30:35 +02:00
JustAnotherArchivist
540f557002 Fix typo in setup.py preventing installation 2018-09-11 12:30:21 +02:00
9 changed files with 442 additions and 58 deletions

View File

@@ -13,6 +13,10 @@ snscrape requires Python 3.6 or higher. The Python package dependencies are inst
Note that one of the dependencies, lxml, also requires libxml2 and libxslt to be installed.
## Installation
pip3 install snscrape
If you want to use the development version:
pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git
## Usage

View File

@@ -3,17 +3,17 @@ import setuptools
setuptools.setup(
name = 'snscrape',
version = '0.1',
version = '0.2.0',
description = 'A social networking service scraper',
author = 'JustAnotherArchivist',
url = 'https://github.com/JustAnotherArchivist/snscrape',
classifiers = [
'Development Status :: 4 - Beta',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)'.
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Programming Language :: Python :: 3.6',
],
packages = ['snscrape'],
install_requires = ['requests', 'lxml', 'beautifulsoup4'],
packages = ['snscrape', 'snscrape.modules'],
install_requires = ['requests[socks]', 'lxml', 'beautifulsoup4'],
entry_points = {
'console_scripts': [
'snscrape = snscrape.cli:main',

View File

@@ -59,11 +59,32 @@ class Scraper:
logger.debug(f'... with data: {data!r}')
try:
r = self._session.send(req, timeout = timeout)
if responseOkCallback is None or responseOkCallback(r):
logger.debug(f'{req.url} retrieved successfully')
return r
except requests.exceptions.RequestException as exc:
logger.error(f'Error retrieving {url}: {exc!r}')
if attempt < self._retries:
retrying = ', retrying'
level = logging.WARNING
else:
retrying = ''
level = logging.ERROR
logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
else:
if responseOkCallback is not None:
success, msg = responseOkCallback(r)
else:
success, msg = (True, None)
msg = f': {msg}' if msg else ''
if success:
logger.debug(f'{req.url} retrieved successfully{msg}')
return r
else:
if attempt < self._retries:
retrying = ', retrying'
level = logging.WARNING
else:
retrying = ''
level = logging.ERROR
logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
if attempt < self._retries:
sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
logger.info(f'Waiting {sleepTime:.0f} seconds')

View File

@@ -1,4 +1,5 @@
import argparse
import datetime
import logging
import snscrape.base
import snscrape.modules
@@ -7,19 +8,42 @@ import snscrape.modules
logger = logging.getLogger(__name__)
def parse_datetime_arg(arg):
for format in ('%Y-%m-%d %H:%M:%S %z', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %z', '%Y-%m-%d'):
try:
d = datetime.datetime.strptime(arg, format)
except ValueError:
continue
else:
if d.tzinfo is None:
return d.replace(tzinfo = datetime.timezone.utc)
return d
# Try treating it as a unix timestamp
try:
d = datetime.datetime.fromtimestamp(int(arg), datetime.timezone.utc)
except ValueError:
pass
else:
return d
raise argparse.ArgumentTypeError(f'Cannot parse {arg!r} into a datetime object')
def parse_args():
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format')
parser.add_argument('--since', type = parse_datetime_arg, metavar = 'DATETIME', help = 'Only return results newer than DATETIME')
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
classes = snscrape.base.Scraper.__subclasses__()
for cls in classes:
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
cls.setup_parser(subparser)
subparser.set_defaults(cls = cls)
if cls.name is not None:
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
cls.setup_parser(subparser)
subparser.set_defaults(cls = cls)
classes.extend(cls.__subclasses__())
args = parser.parse_args()
@@ -57,7 +81,13 @@ def main():
i = 0
for i, item in enumerate(scraper.get_items(), start = 1):
print(item)
if args.since is not None and item.date < args.since:
logger.info(f'Exiting due to reaching older results than {args.since}')
break
if args.format is not None:
print(args.format.format(**item._asdict()))
else:
print(item)
if args.maxResults and i >= args.maxResults:
logger.info(f'Exiting after {i} results')
break

View File

@@ -1,14 +1,26 @@
import bs4
import datetime
import json
import logging
import re
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class FacebookPost(typing.NamedTuple, snscrape.base.Item):
cleanUrl: str
dirtyUrl: str
date: datetime.datetime
content: typing.Optional[str]
def __str__(self):
return self.cleanUrl
class FacebookUserScraper(snscrape.base.Scraper):
name = 'facebook-user'
@@ -16,18 +28,52 @@ class FacebookUserScraper(snscrape.base.Scraper):
super().__init__(**kwargs)
self._username = username
def _soup_to_items(self, soup, username, baseUrl):
yielded = set()
for a in soup.find_all('a', href = re.compile(r'^/[^/]+/(posts|photos|videos)/[^/]*\d')):
href = a.get('href')
if href.startswith(f'/{username}/'):
link = urllib.parse.urljoin(baseUrl, href)
if link not in yielded:
yield snscrape.base.URLItem(link)
yielded.add(link)
def _clean_url(self, dirtyUrl):
u = urllib.parse.urlparse(dirtyUrl)
if u.path == '/permalink.php':
# Retain only story_fbid and id parameters
q = urllib.parse.parse_qs(u.query)
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'):
# No manipulation of the path needed, but strip the query string
clean = (u.scheme, u.netloc, u.path, '', '')
elif u.path.split('/')[2] in ('photos', 'videos'):
# Path: "/" username or ID "/" photos or videos "/" crap "/" ID of photo or video "/"
# But to be safe, also handle URLs that don't have that crap correctly.
if u.path.count('/') == 4:
clean = (u.scheme, u.netloc, u.path, '', '')
elif u.path.count('/') == 5:
# Strip out the third path component
pathcomps = u.path.split('/')
pathcomps.pop(3) # Don't forget about the empty string at the beginning!
clean = (u.scheme, u.netloc, '/'.join(pathcomps), '', '')
else:
return dirtyUrl
else:
# If we don't recognise the URL, just return the original one.
return dirtyUrl
return urllib.parse.urlunsplit(clean)
def _soup_to_items(self, soup, baseUrl):
for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
href = entryA.get('href')
if not any(x in href for x in ('/posts/', '/photos/', '/videos/', '/permalink.php?', '/events/', '/notes/')):
if href != '#' or 'new photo' not in entry.text or 'to the album' not in entry.text:
# Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
logger.warning(f'Ignoring odd link: {href}')
continue
dirtyUrl = urllib.parse.urljoin(baseUrl, href)
date = datetime.datetime.fromtimestamp(int(entryA.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
contentDiv = entry.find('div', class_ = '_5pbx')
if contentDiv:
content = contentDiv.text
else:
content = None
yield FacebookPost(cleanUrl = self._clean_url(dirtyUrl), dirtyUrl = dirtyUrl, date = date, content = content)
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=')
spuriousForLoopPattern = re.compile(r'^for \(;;\);')
@@ -42,9 +88,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
logger.error('Got status code {r.status_code}')
return
soup = bs4.BeautifulSoup(r.text, 'lxml')
username = re.sub(r'^https://www\.facebook\.com/([^/]+)/$', r'\1', soup.find('link').get('href')) # Canonical capitalisation
baseUrl = f'https://www.facebook.com/{username}/'
yield from self._soup_to_items(soup, username, baseUrl)
yield from self._soup_to_items(soup, baseUrl)
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
while nextPageLink:
@@ -65,7 +109,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
assert response['domops'][0][2] == False
assert '__html' in response['domops'][0][3]
soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml')
yield from self._soup_to_items(soup, username, baseUrl)
yield from self._soup_to_items(soup, baseUrl)
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
@classmethod

115
snscrape/modules/gab.py Normal file
View File

@@ -0,0 +1,115 @@
import datetime
import json
import logging
import snscrape.base
import time
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class GabPost(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
def __str__(self):
return self.url
class GabUserCommonScraper(snscrape.base.Scraper):
def __init__(self, mode, username, **kwargs):
super().__init__(**kwargs)
if mode not in ('posts', 'comments', 'media'):
raise ValueError('Invalid mode')
self._mode = mode
self._username = username
if mode == 'posts':
self._baseUrl = f'https://gab.com/api/feed/{username}'
self._beforeGlue = '?'
elif mode == 'comments':
self._baseUrl = f'https://gab.com/api/feed/{username}/comments?includes=post.conversation_parent'
self._beforeGlue = '&'
elif mode == 'media':
self._baseUrl = f'https://gab.com/api/feed/{username}/media'
self._beforeGlue = '?'
def _response_to_items(self, response):
yielded = set()
for post in response['data']:
if post['post']['id'] not in yielded:
yield GabPost(
url = f'https://gab.com/{post["post"]["user"]["username"]}/posts/{post["post"]["id"]}',
date = datetime.datetime.strptime(post['post']['created_at'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z'),
content = post['post']['body'],
)
yielded.add(post['post']['id'])
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
logger.info('Retrieving initial data')
r = self._get(self._baseUrl, headers = headers)
if r.status_code == 404:
logger.error('User does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
response = json.loads(r.text)
if not response['data']:
logger.error('User has no posts')
return
yield from self._response_to_items(response)
if self._mode == 'posts':
before = response['data'][-1]['published_at']
elif self._mode in ('comments', 'media'):
before = 30
while True:
logger.info('Retrieving next page')
r = self._get(f'{self._baseUrl}{self._beforeGlue}before={before}', headers = headers)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
response = json.loads(r.text)
yield from self._response_to_items(response)
if response['no-more'] or not response['data']:
# Last page
return
if self._mode == 'posts':
before = response['data'][-1]['published_at']
elif self._mode in ('comments', 'media'):
before += 30
time.sleep(1) # Gab's API is pretty quick but doesn't like being hammered...
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('username', help = 'A Gab username')
class GabUserPostsScraper(GabUserCommonScraper):
name = 'gab-user'
@classmethod
def from_args(cls, args):
return cls('posts', args.username, retries = args.retries)
class GabUserCommentsScraper(GabUserCommonScraper):
name = 'gab-user-comments'
@classmethod
def from_args(cls, args):
return cls('comments', args.username, retries = args.retries)
class GabUserMediaScraper(GabUserCommonScraper):
name = 'gab-user-media'
@classmethod
def from_args(cls, args):
return cls('media', args.username, retries = args.retries)

View File

@@ -1,31 +1,72 @@
import datetime
import hashlib
import json
import logging
import snscrape.base
import typing
logger = logging.getLogger(__name__)
class InstagramUserScraper(snscrape.base.Scraper):
name = 'instagram-user'
class InstagramPost(typing.NamedTuple, snscrape.base.Item):
cleanUrl: str
dirtyUrl: str
date: datetime.datetime
content: str
thumbnailUrl: str
displayUrl: str
def __init__(self, username, **kwargs):
def __str__(self):
return self.cleanUrl
class InstagramCommonScraper(snscrape.base.Scraper):
def __init__(self, mode, name, **kwargs):
super().__init__(**kwargs)
self._username = username
if mode not in ('User', 'Hashtag'):
raise ValueError('Invalid mode')
self._mode = mode
self._name = name
def _response_to_items(self, response, username):
for node in response['user']['edge_owner_to_timeline_media']['edges']:
if self._mode == 'User':
self._initialUrl = f'https://www.instagram.com/{self._name}/'
self._pageName = 'ProfilePage'
self._responseContainer = 'user'
self._edgeXToMedia = 'edge_owner_to_timeline_media'
self._pageIDKey = 'id'
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
elif self._mode == 'Hashtag':
self._initialUrl = f'https://www.instagram.com/explore/tags/{self._name}/'
self._pageName = 'TagPage'
self._responseContainer = 'hashtag'
self._edgeXToMedia = 'edge_hashtag_to_media'
self._pageIDKey = 'name'
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
self._variablesFormat = '{{"tag_name":"{pageID}","show_ranked":false,"first":10,"after":"{endCursor}"}}'
def _response_to_items(self, response):
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
code = node['node']['shortcode']
yield snscrape.base.URLItem(f'https://www.instagram.com/p/{code}/?taken-by={username}') #TODO: Do we want the taken-by parameter in here?
usernameQuery = '?taken-by=' + node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
cleanUrl = f'https://www.instagram.com/p/{code}/'
yield InstagramPost(
cleanUrl = cleanUrl,
dirtyUrl = f'{cleanUrl}{usernameQuery}',
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
thumbnailUrl = node['node']['thumbnail_src'],
displayUrl = node['node']['display_url'],
)
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
logger.info('Retrieving initial data')
r = self._get(f'https://www.instagram.com/{self._username}/', headers = headers)
r = self._get(self._initialUrl, headers = headers)
if r.status_code == 404:
logger.warning('User does not exist')
logger.warning(f'{self._mode} does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
@@ -33,42 +74,57 @@ class InstagramUserScraper(snscrape.base.Scraper):
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
response = json.loads(jsonData)
rhxGis = response['rhx_gis']
if response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['count'] == 0:
logger.info('User has no posts')
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
logger.info(f'{self._mode} has no posts')
return
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['edges']:
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
logger.warning('Private account')
return
userID = response['entry_data']['ProfilePage'][0]['graphql']['user']['id']
username = response['entry_data']['ProfilePage'][0]['graphql']['user']['username'] # Might have different capitalisation than self._username
yield from self._response_to_items(response['entry_data']['ProfilePage'][0]['graphql'], username)
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
while True:
logger.info(f'Retrieving endCursor = {endCursor!r}')
variables = f'{{"id":"{userID}","first":50,"after":"{endCursor}"}}'
variables = self._variablesFormat.format(**locals())
headers['X-Requested-With'] = 'XMLHttpRequest'
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash=42323d64886122307be10013ad2dcc44&variables={variables}', headers = headers)
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
response = json.loads(r.text)
if not response['data']['user']['edge_owner_to_timeline_media']['edges']:
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
return
yield from self._response_to_items(response['data'], username)
if not response['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
yield from self._response_to_items(response['data'])
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
class InstagramUserScraper(InstagramCommonScraper):
name = 'instagram-user'
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('username', help = 'An Instagram username')
subparser.add_argument('username', help = 'An Instagram username (no leading @)')
@classmethod
def from_args(cls, args):
return cls(args.username, retries = args.retries)
return cls('User', args.username, retries = args.retries)
class InstagramHashtagScraper(InstagramCommonScraper):
name = 'instagram-hashtag'
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('hashtag', help = 'An Instagram hashtag (no leading #)')
@classmethod
def from_args(cls, args):
return cls('Hashtag', args.hashtag, retries = args.retries)

View File

@@ -1,12 +1,24 @@
import bs4
import datetime
import json
import random
import logging
import snscrape.base
import typing
logger = logging.getLogger(__name__)
class Tweet(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
def __str__(self):
return self.url
class TwitterSearchScraper(snscrape.base.Scraper):
name = 'twitter-search'
@@ -23,20 +35,21 @@ class TwitterSearchScraper(snscrape.base.Scraper):
for tweet in feed:
username = tweet.find('span', 'username').find('b').text
tweetID = tweet['data-item-id']
yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}')
date = datetime.datetime.fromtimestamp(int(tweet.find('a', 'tweet-timestamp').find('span', '_timestamp')['data-time']), datetime.timezone.utc)
content = tweet.find('p', 'tweet-text').text
yield Tweet(f'https://twitter.com/{username}/status/{tweetID}', date, content)
def _check_json_callback(self, r):
if r.headers['content-type'] != 'application/json;charset=utf-8':
logger.error(f'Content type of {r.url} is not JSON')
return False
return True
if r.headers.get('content-type') != 'application/json;charset=utf-8':
return False, f'content type is not JSON'
return True, None
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
# First page
logger.info(f'Retrieving search page for {self._query}')
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd'}, headers = headers)
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd', 'qf': 'off'}, headers = headers)
feed = self._get_feed_from_html(r.text)
if not feed:
@@ -57,6 +70,7 @@ class TwitterSearchScraper(snscrape.base.Scraper):
'include_entities': '1',
'reset_error_state': 'false',
'src': 'typd',
'qf': 'off',
'max_position': maxPosition,
},
headers = headers,

View File

@@ -0,0 +1,100 @@
import bs4
import datetime
import itertools
import logging
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class VKontaktePost(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
def __str__(self):
return self.url
class VKontakteUserScraper(snscrape.base.Scraper):
name = 'vkontakte-user'
def __init__(self, username, **kwargs):
super().__init__(**kwargs)
self._username = username
def _soup_to_items(self, soup, baseUrl):
for post in soup.find_all('div', class_ = 'post'):
dateSpan = post.find('div', class_ = 'post_date').find('span', class_ = 'rel_date')
textDiv = post.find('div', class_ = 'wall_post_text')
yield VKontaktePost(
url = urllib.parse.urljoin(baseUrl, post.find('a', class_ = 'post_link')['href']),
date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc) if 'time' in dateSpan else None,
content = textDiv.text if textDiv else None,
)
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
baseUrl = f'https://vk.com/{self._username}'
logger.info('Retrieving initial data')
r = self._get(baseUrl, headers = headers)
if r.status_code == 404:
logger.error('Wall does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
# VK sends windows-1251-encoded data, but Requests's decoding doesn't seem to work correctly and causes lxml to choke, so we need to pass the binary content and the encoding explicitly.
soup = bs4.BeautifulSoup(r.content, 'lxml', from_encoding = r.encoding)
if soup.find('div', class_ = 'profile_closed_wall_dummy'):
logger.error('Private profile')
return
newestPost = soup.find('div', class_ = 'post')
if not newestPost:
logger.info('Wall has no posts')
return
ownerID = newestPost.attrs['data-post-id'].split('_')[0]
# If there is a pinned post, we need its ID for the pagination requests
if 'post_fixed' in newestPost.attrs['class']:
fixedPostID = newestPost.attrs['id'].split('_')[1]
else:
fixedPostID = ''
yield from self._soup_to_items(soup, baseUrl)
headers['X-Requested-With'] = 'XMLHttpRequest'
for offset in itertools.count(start = 10, step = 10):
logger.info('Retrieving next page')
r = self._post(
'https://vk.com/al_wall.php',
data = [('act', 'get_wall'), ('al', 1), ('fixed', fixedPostID), ('offset', offset), ('onlyCache', 'false'), ('owner_id', ownerID), ('type', 'own'), ('wall_start_from', offset)],
headers = headers
)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
fields = r.content.split(b'<!>')
if fields[5].startswith(b'<div class="page_block no_posts">'):
# Reached the end
break
if not fields[5].startswith(b'<div id="post'):
logger.error(f'Got an unknown response: {fields[5][:200]!r}...')
break
soup = bs4.BeautifulSoup(fields[5], 'lxml', from_encoding = r.encoding)
yield from self._soup_to_items(soup, baseUrl)
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('username', help = 'A VK username')
@classmethod
def from_args(cls, args):
return cls(args.username, retries = args.retries)