mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-08 10:38:28 +03:00
Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
abf31764b1 | ||
|
|
64693f74bb | ||
|
|
a7d08ed51c | ||
|
|
f48ca7726e | ||
|
|
78c295f7e0 | ||
|
|
a5aca1a14f | ||
|
|
96f7d871c1 | ||
|
|
b5dfd37949 | ||
|
|
b511397791 | ||
|
|
536fcb3303 | ||
|
|
f8d812f799 | ||
|
|
c2cebd9166 | ||
|
|
73bc99596f | ||
|
|
8458c12218 | ||
|
|
b59c7e8d8f | ||
|
|
3ceb849d98 | ||
|
|
f5ee1f7ac5 | ||
|
|
1984110f78 | ||
|
|
c5a5dcb92c | ||
|
|
cfb1c9a2aa | ||
|
|
d0d3c8b2a6 | ||
|
|
4d0350e541 | ||
|
|
d17aa15bcb | ||
|
|
d1ef280d6e |
@@ -13,6 +13,10 @@ snscrape requires Python 3.6 or higher. The Python package dependencies are inst
|
||||
Note that one of the dependencies, lxml, also requires libxml2 and libxslt to be installed.
|
||||
|
||||
## Installation
|
||||
pip3 install snscrape
|
||||
|
||||
If you want to use the development version:
|
||||
|
||||
pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git
|
||||
|
||||
## Usage
|
||||
|
||||
6
setup.py
6
setup.py
@@ -3,7 +3,7 @@ import setuptools
|
||||
|
||||
setuptools.setup(
|
||||
name = 'snscrape',
|
||||
version = '0.1.1',
|
||||
version = '0.2.0',
|
||||
description = 'A social networking service scraper',
|
||||
author = 'JustAnotherArchivist',
|
||||
url = 'https://github.com/JustAnotherArchivist/snscrape',
|
||||
@@ -12,8 +12,8 @@ setuptools.setup(
|
||||
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
],
|
||||
packages = ['snscrape'],
|
||||
install_requires = ['requests', 'lxml', 'beautifulsoup4'],
|
||||
packages = ['snscrape', 'snscrape.modules'],
|
||||
install_requires = ['requests[socks]', 'lxml', 'beautifulsoup4'],
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
'snscrape = snscrape.cli:main',
|
||||
|
||||
@@ -59,11 +59,32 @@ class Scraper:
|
||||
logger.debug(f'... with data: {data!r}')
|
||||
try:
|
||||
r = self._session.send(req, timeout = timeout)
|
||||
if responseOkCallback is None or responseOkCallback(r):
|
||||
logger.debug(f'{req.url} retrieved successfully')
|
||||
return r
|
||||
except requests.exceptions.RequestException as exc:
|
||||
logger.error(f'Error retrieving {url}: {exc!r}')
|
||||
if attempt < self._retries:
|
||||
retrying = ', retrying'
|
||||
level = logging.WARNING
|
||||
else:
|
||||
retrying = ''
|
||||
level = logging.ERROR
|
||||
logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
|
||||
else:
|
||||
if responseOkCallback is not None:
|
||||
success, msg = responseOkCallback(r)
|
||||
else:
|
||||
success, msg = (True, None)
|
||||
msg = f': {msg}' if msg else ''
|
||||
|
||||
if success:
|
||||
logger.debug(f'{req.url} retrieved successfully{msg}')
|
||||
return r
|
||||
else:
|
||||
if attempt < self._retries:
|
||||
retrying = ', retrying'
|
||||
level = logging.WARNING
|
||||
else:
|
||||
retrying = ''
|
||||
level = logging.ERROR
|
||||
logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
|
||||
if attempt < self._retries:
|
||||
sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
|
||||
logger.info(f'Waiting {sleepTime:.0f} seconds')
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import argparse
|
||||
import datetime
|
||||
import logging
|
||||
import snscrape.base
|
||||
import snscrape.modules
|
||||
@@ -7,19 +8,42 @@ import snscrape.modules
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_datetime_arg(arg):
    '''Convert a command-line argument into a timezone-aware datetime.

    The following forms are tried in order:
      * 'YYYY-MM-DD HH:MM:SS +ZZZZ'
      * 'YYYY-MM-DD HH:MM:SS'
      * 'YYYY-MM-DD +ZZZZ'
      * 'YYYY-MM-DD'
      * an integer Unix timestamp

    Values without an explicit UTC offset are interpreted as UTC.

    Raises argparse.ArgumentTypeError if arg matches none of the forms, which
    lets argparse report a clean usage error when this is used as a type callback.
    '''
    formats = ('%Y-%m-%d %H:%M:%S %z', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %z', '%Y-%m-%d')
    for fmt in formats:
        try:
            d = datetime.datetime.strptime(arg, fmt)
        except ValueError:
            continue
        if d.tzinfo is None:
            d = d.replace(tzinfo = datetime.timezone.utc)
        return d

    # Try treating it as a unix timestamp.
    # int() raises ValueError for non-numeric input; fromtimestamp() raises ValueError,
    # OverflowError or OSError (depending on the platform) for values out of range.
    try:
        return datetime.datetime.fromtimestamp(int(arg), datetime.timezone.utc)
    except (ValueError, OverflowError, OSError):
        pass

    raise argparse.ArgumentTypeError(f'Cannot parse {arg!r} into a datetime object')
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
|
||||
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
|
||||
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
|
||||
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
|
||||
parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format')
|
||||
parser.add_argument('--since', type = parse_datetime_arg, metavar = 'DATETIME', help = 'Only return results newer than DATETIME')
|
||||
|
||||
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
|
||||
classes = snscrape.base.Scraper.__subclasses__()
|
||||
for cls in classes:
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
if cls.name is not None:
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
classes.extend(cls.__subclasses__())
|
||||
|
||||
args = parser.parse_args()
|
||||
@@ -57,7 +81,13 @@ def main():
|
||||
|
||||
i = 0
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
print(item)
|
||||
if args.since is not None and item.date < args.since:
|
||||
logger.info(f'Exiting due to reaching older results than {args.since}')
|
||||
break
|
||||
if args.format is not None:
|
||||
print(args.format.format(**item._asdict()))
|
||||
else:
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
|
||||
@@ -1,14 +1,26 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FacebookPost(typing.NamedTuple, snscrape.base.Item):
    '''One timeline entry scraped from Facebook.

    Converting an instance to a string yields its cleaned-up URL.
    '''

    # Permalink after normalisation by FacebookUserScraper._clean_url
    cleanUrl: str
    # Absolute permalink exactly as it was linked from the page
    dirtyUrl: str
    # Timestamp read from the entry's data-utime attribute, as an aware UTC datetime
    date: datetime.datetime
    # Text of the entry, or None when the entry has no text block
    content: typing.Optional[str]

    def __str__(self):
        return self.cleanUrl
|
||||
|
||||
|
||||
class FacebookUserScraper(snscrape.base.Scraper):
|
||||
name = 'facebook-user'
|
||||
|
||||
@@ -16,18 +28,52 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
def _soup_to_items(self, soup, username, baseUrl):
|
||||
yielded = set()
|
||||
for a in soup.find_all('a', href = re.compile(r'^/[^/]+/(posts|photos|videos)/[^/]*\d')):
|
||||
href = a.get('href')
|
||||
if href.startswith(f'/{username}/'):
|
||||
link = urllib.parse.urljoin(baseUrl, href)
|
||||
if link not in yielded:
|
||||
yield snscrape.base.URLItem(link)
|
||||
yielded.add(link)
|
||||
def _clean_url(self, dirtyUrl):
|
||||
u = urllib.parse.urlparse(dirtyUrl)
|
||||
if u.path == '/permalink.php':
|
||||
# Retain only story_fbid and id parameters
|
||||
q = urllib.parse.parse_qs(u.query)
|
||||
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
|
||||
elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'):
|
||||
# No manipulation of the path needed, but strip the query string
|
||||
clean = (u.scheme, u.netloc, u.path, '', '')
|
||||
elif u.path.split('/')[2] in ('photos', 'videos'):
|
||||
# Path: "/" username or ID "/" photos or videos "/" crap "/" ID of photo or video "/"
|
||||
# But to be safe, also handle URLs that don't have that crap correctly.
|
||||
if u.path.count('/') == 4:
|
||||
clean = (u.scheme, u.netloc, u.path, '', '')
|
||||
elif u.path.count('/') == 5:
|
||||
# Strip out the third path component
|
||||
pathcomps = u.path.split('/')
|
||||
pathcomps.pop(3) # Don't forget about the empty string at the beginning!
|
||||
clean = (u.scheme, u.netloc, '/'.join(pathcomps), '', '')
|
||||
else:
|
||||
return dirtyUrl
|
||||
else:
|
||||
# If we don't recognise the URL, just return the original one.
|
||||
return dirtyUrl
|
||||
return urllib.parse.urlunsplit(clean)
|
||||
|
||||
def _soup_to_items(self, soup, baseUrl):
    '''Yield a FacebookPost for each timeline entry in soup that links to a recognised permalink.

    soup is searched for div elements with class '_5pcr'; baseUrl is used to turn
    each entry's relative link into an absolute URL. Entries whose link contains
    none of the known permalink markers are skipped, with a warning unless the
    entry looks like a "new photos added to the album" notice.
    '''
    permalinkMarkers = ('/posts/', '/photos/', '/videos/', '/permalink.php?', '/events/', '/notes/')
    for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
        # There can be more than one such link, e.g. when a post is shared by another user,
        # but the first one is always the one of this entry.
        entryA = entry.find('a', class_ = '_5pcq')
        href = entryA.get('href')

        if all(marker not in href for marker in permalinkMarkers):
            # Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
            isAlbumNotice = href == '#' and 'new photo' in entry.text and 'to the album' in entry.text
            if not isAlbumNotice:
                logger.warning(f'Ignoring odd link: {href}')
            continue

        dirtyUrl = urllib.parse.urljoin(baseUrl, href)
        utime = entryA.find('abbr', class_ = '_5ptz')['data-utime']
        date = datetime.datetime.fromtimestamp(int(utime), datetime.timezone.utc)
        contentDiv = entry.find('div', class_ = '_5pbx')
        content = contentDiv.text if contentDiv else None
        yield FacebookPost(cleanUrl = self._clean_url(dirtyUrl), dirtyUrl = dirtyUrl, date = date, content = content)
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
|
||||
|
||||
nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=')
|
||||
spuriousForLoopPattern = re.compile(r'^for \(;;\);')
|
||||
@@ -42,9 +88,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
logger.error('Got status code {r.status_code}')
|
||||
return
|
||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||
username = re.sub(r'^https://www\.facebook\.com/([^/]+)/$', r'\1', soup.find('link').get('href')) # Canonical capitalisation
|
||||
baseUrl = f'https://www.facebook.com/{username}/'
|
||||
yield from self._soup_to_items(soup, username, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl)
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
while nextPageLink:
|
||||
@@ -65,7 +109,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
assert response['domops'][0][2] == False
|
||||
assert '__html' in response['domops'][0][3]
|
||||
soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml')
|
||||
yield from self._soup_to_items(soup, username, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl)
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
@classmethod
|
||||
|
||||
115
snscrape/modules/gab.py
Normal file
115
snscrape/modules/gab.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import snscrape.base
|
||||
import time
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GabPost(typing.NamedTuple, snscrape.base.Item):
    '''One post retrieved from a Gab feed.

    Converting an instance to a string yields its URL.
    '''

    # https://gab.com/<username>/posts/<id> link to the post
    url: str
    # Parsed value of the post's created_at field
    date: datetime.datetime
    # The post's body field
    content: str

    def __str__(self):
        return self.url
|
||||
|
||||
|
||||
class GabUserCommonScraper(snscrape.base.Scraper):
    '''Shared implementation of the Gab per-user scrapers.

    mode selects which feed of the user is retrieved: 'posts', 'comments' or 'media'.
    Any other value raises ValueError. Remaining keyword arguments are passed on to
    snscrape.base.Scraper.
    '''

    def __init__(self, mode, username, **kwargs):
        super().__init__(**kwargs)
        if mode not in ('posts', 'comments', 'media'):
            raise ValueError('Invalid mode')

        # Per mode: the feed URL, and the character that joins the "before"
        # pagination parameter onto it ('&' when the URL already has a query string).
        endpoints = {
            'posts': (f'https://gab.com/api/feed/{username}', '?'),
            'comments': (f'https://gab.com/api/feed/{username}/comments?includes=post.conversation_parent', '&'),
            'media': (f'https://gab.com/api/feed/{username}/media', '?'),
        }
        self._mode = mode
        self._username = username
        self._baseUrl, self._beforeGlue = endpoints[mode]

    def _response_to_items(self, response):
        '''Yield a GabPost for each entry of response['data'], skipping IDs repeated within this response.'''
        seenIDs = set()
        for post in response['data']:
            postID = post['post']['id']
            if postID in seenIDs:
                continue
            yield GabPost(
                url = f'https://gab.com/{post["post"]["user"]["username"]}/posts/{post["post"]["id"]}',
                # Drop the first two hyphens (the date separators) and all colons so that
                # the timestamp, including its UTC offset, matches the strptime format below.
                date = datetime.datetime.strptime(post['post']['created_at'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z'),
                content = post['post']['body'],
            )
            seenIDs.add(postID)

    def get_items(self):
        '''Yield the user's items page by page until the API reports the end or a request fails.'''
        headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0',
            'Accept-Language': 'en-US,en;q=0.5',
        }

        logger.info('Retrieving initial data')
        reply = self._get(self._baseUrl, headers = headers)
        if reply.status_code == 404:
            logger.error('User does not exist')
            return
        if reply.status_code != 200:
            logger.error(f'Got status code {reply.status_code}')
            return

        payload = json.loads(reply.text)
        if not payload['data']:
            logger.error('User has no posts')
            return
        yield from self._response_to_items(payload)

        # The posts feed is paginated by the published_at value of the last entry;
        # the comments and media feeds are paginated by a numeric offset in steps of 30.
        if self._mode == 'posts':
            before = payload['data'][-1]['published_at']
        elif self._mode in ('comments', 'media'):
            before = 30

        while True:
            logger.info('Retrieving next page')
            reply = self._get(f'{self._baseUrl}{self._beforeGlue}before={before}', headers = headers)
            if reply.status_code != 200:
                logger.error(f'Got status code {reply.status_code}')
                return
            payload = json.loads(reply.text)
            yield from self._response_to_items(payload)
            if payload['no-more'] or not payload['data']:
                # Last page
                return
            if self._mode == 'posts':
                before = payload['data'][-1]['published_at']
            elif self._mode in ('comments', 'media'):
                before += 30
            time.sleep(1) # Gab's API is pretty quick but doesn't like being hammered...

    @classmethod
    def setup_parser(cls, subparser):
        '''Register the positional username argument on the given argparse subparser.'''
        subparser.add_argument('username', help = 'A Gab username')
|
||||
|
||||
|
||||
class GabUserPostsScraper(GabUserCommonScraper):
    '''Scraper for a Gab user's posts feed, exposed under the name 'gab-user'.'''

    name = 'gab-user'

    @classmethod
    def from_args(cls, args):
        '''Build the scraper from parsed command-line arguments (username, retries).'''
        return cls(mode = 'posts', username = args.username, retries = args.retries)
|
||||
|
||||
|
||||
class GabUserCommentsScraper(GabUserCommonScraper):
    '''Scraper for a Gab user's comments feed, exposed under the name 'gab-user-comments'.'''

    name = 'gab-user-comments'

    @classmethod
    def from_args(cls, args):
        '''Build the scraper from parsed command-line arguments (username, retries).'''
        return cls(mode = 'comments', username = args.username, retries = args.retries)
|
||||
|
||||
|
||||
class GabUserMediaScraper(GabUserCommonScraper):
    '''Scraper for a Gab user's media feed, exposed under the name 'gab-user-media'.'''

    name = 'gab-user-media'

    @classmethod
    def from_args(cls, args):
        '''Build the scraper from parsed command-line arguments (username, retries).'''
        return cls(mode = 'media', username = args.username, retries = args.retries)
|
||||
@@ -1,31 +1,72 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InstagramUserScraper(snscrape.base.Scraper):
|
||||
name = 'instagram-user'
|
||||
class InstagramPost(typing.NamedTuple, snscrape.base.Item):
|
||||
cleanUrl: str
|
||||
dirtyUrl: str
|
||||
date: datetime.datetime
|
||||
content: str
|
||||
thumbnailUrl: str
|
||||
displayUrl: str
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
def __str__(self):
|
||||
return self.cleanUrl
|
||||
|
||||
|
||||
class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
def __init__(self, mode, name, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
if mode not in ('User', 'Hashtag'):
|
||||
raise ValueError('Invalid mode')
|
||||
self._mode = mode
|
||||
self._name = name
|
||||
|
||||
def _response_to_items(self, response, username):
|
||||
for node in response['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if self._mode == 'User':
|
||||
self._initialUrl = f'https://www.instagram.com/{self._name}/'
|
||||
self._pageName = 'ProfilePage'
|
||||
self._responseContainer = 'user'
|
||||
self._edgeXToMedia = 'edge_owner_to_timeline_media'
|
||||
self._pageIDKey = 'id'
|
||||
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
|
||||
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
elif self._mode == 'Hashtag':
|
||||
self._initialUrl = f'https://www.instagram.com/explore/tags/{self._name}/'
|
||||
self._pageName = 'TagPage'
|
||||
self._responseContainer = 'hashtag'
|
||||
self._edgeXToMedia = 'edge_hashtag_to_media'
|
||||
self._pageIDKey = 'name'
|
||||
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
|
||||
self._variablesFormat = '{{"tag_name":"{pageID}","show_ranked":false,"first":10,"after":"{endCursor}"}}'
|
||||
|
||||
def _response_to_items(self, response):
|
||||
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
code = node['node']['shortcode']
|
||||
yield snscrape.base.URLItem(f'https://www.instagram.com/p/{code}/?taken-by={username}') #TODO: Do we want the taken-by parameter in here?
|
||||
usernameQuery = '?taken-by=' + node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
|
||||
cleanUrl = f'https://www.instagram.com/p/{code}/'
|
||||
yield InstagramPost(
|
||||
cleanUrl = cleanUrl,
|
||||
dirtyUrl = f'{cleanUrl}{usernameQuery}',
|
||||
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
|
||||
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
|
||||
thumbnailUrl = node['node']['thumbnail_src'],
|
||||
displayUrl = node['node']['display_url'],
|
||||
)
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
logger.info('Retrieving initial data')
|
||||
r = self._get(f'https://www.instagram.com/{self._username}/', headers = headers)
|
||||
r = self._get(self._initialUrl, headers = headers)
|
||||
if r.status_code == 404:
|
||||
logger.warning('User does not exist')
|
||||
logger.warning(f'{self._mode} does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
@@ -33,42 +74,57 @@ class InstagramUserScraper(snscrape.base.Scraper):
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
response = json.loads(jsonData)
|
||||
rhxGis = response['rhx_gis']
|
||||
if response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['count'] == 0:
|
||||
logger.info('User has no posts')
|
||||
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
|
||||
logger.info(f'{self._mode} has no posts')
|
||||
return
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
logger.warning('Private account')
|
||||
return
|
||||
userID = response['entry_data']['ProfilePage'][0]['graphql']['user']['id']
|
||||
username = response['entry_data']['ProfilePage'][0]['graphql']['user']['username'] # Might have different capitalisation than self._username
|
||||
yield from self._response_to_items(response['entry_data']['ProfilePage'][0]['graphql'], username)
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
|
||||
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
while True:
|
||||
logger.info(f'Retrieving endCursor = {endCursor!r}')
|
||||
variables = f'{{"id":"{userID}","first":50,"after":"{endCursor}"}}'
|
||||
variables = self._variablesFormat.format(**locals())
|
||||
headers['X-Requested-With'] = 'XMLHttpRequest'
|
||||
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash=42323d64886122307be10013ad2dcc44&variables={variables}', headers = headers)
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers)
|
||||
|
||||
if r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
|
||||
response = json.loads(r.text)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
return
|
||||
yield from self._response_to_items(response['data'], username)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
yield from self._response_to_items(response['data'])
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
|
||||
class InstagramUserScraper(InstagramCommonScraper):
|
||||
name = 'instagram-user'
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('username', help = 'An Instagram username')
|
||||
subparser.add_argument('username', help = 'An Instagram username (no leading @)')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
return cls('User', args.username, retries = args.retries)
|
||||
|
||||
|
||||
class InstagramHashtagScraper(InstagramCommonScraper):
    '''Scraper for the posts of an Instagram hashtag, exposed under the name 'instagram-hashtag'.'''

    name = 'instagram-hashtag'

    @classmethod
    def setup_parser(cls, subparser):
        '''Register the positional hashtag argument on the given argparse subparser.'''
        subparser.add_argument('hashtag', help = 'An Instagram hashtag (no leading #)')

    @classmethod
    def from_args(cls, args):
        '''Build the scraper from parsed command-line arguments (hashtag, retries).'''
        return cls(mode = 'Hashtag', name = args.hashtag, retries = args.retries)
|
||||
|
||||
@@ -1,12 +1,24 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import random
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Tweet(typing.NamedTuple, snscrape.base.Item):
    '''One tweet found by a Twitter scraper.

    Converting an instance to a string yields its URL.
    '''

    # https://twitter.com/<username>/status/<id> link to the tweet
    url: str
    # Timestamp read from the tweet's data-time attribute, as an aware UTC datetime
    date: datetime.datetime
    # Text of the tweet's 'tweet-text' paragraph
    content: str

    def __str__(self):
        return self.url
|
||||
|
||||
|
||||
class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
name = 'twitter-search'
|
||||
|
||||
@@ -23,20 +35,21 @@ class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
for tweet in feed:
|
||||
username = tweet.find('span', 'username').find('b').text
|
||||
tweetID = tweet['data-item-id']
|
||||
yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}')
|
||||
date = datetime.datetime.fromtimestamp(int(tweet.find('a', 'tweet-timestamp').find('span', '_timestamp')['data-time']), datetime.timezone.utc)
|
||||
content = tweet.find('p', 'tweet-text').text
|
||||
yield Tweet(f'https://twitter.com/{username}/status/{tweetID}', date, content)
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.headers['content-type'] != 'application/json;charset=utf-8':
|
||||
logger.error(f'Content type of {r.url} is not JSON')
|
||||
return False
|
||||
return True
|
||||
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||
return False, f'content type is not JSON'
|
||||
return True, None
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
|
||||
|
||||
# First page
|
||||
logger.info(f'Retrieving search page for {self._query}')
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd'}, headers = headers)
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd', 'qf': 'off'}, headers = headers)
|
||||
|
||||
feed = self._get_feed_from_html(r.text)
|
||||
if not feed:
|
||||
@@ -57,6 +70,7 @@ class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
'include_entities': '1',
|
||||
'reset_error_state': 'false',
|
||||
'src': 'typd',
|
||||
'qf': 'off',
|
||||
'max_position': maxPosition,
|
||||
},
|
||||
headers = headers,
|
||||
|
||||
100
snscrape/modules/vkontakte.py
Normal file
100
snscrape/modules/vkontakte.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import itertools
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VKontaktePost(typing.NamedTuple, snscrape.base.Item):
    '''One wall post scraped from VK.

    Converting an instance to a string yields its URL.
    '''

    # Absolute link to the post
    url: str
    # Aware UTC timestamp of the post, or None when the page does not expose one
    date: typing.Optional[datetime.datetime]
    # Text of the post, or None when the post has no text block
    content: typing.Optional[str]

    def __str__(self):
        return self.url
|
||||
|
||||
|
||||
class VKontakteUserScraper(snscrape.base.Scraper):
    '''Scraper for the wall of a VK user, exposed under the name 'vkontakte-user'.

    The first page is read from the user's profile page; further pages are
    requested from al_wall.php in steps of ten posts.
    '''

    name = 'vkontakte-user'

    def __init__(self, username, **kwargs):
        super().__init__(**kwargs)
        self._username = username

    def _soup_to_items(self, soup, baseUrl):
        '''Yield a VKontaktePost for each div of class 'post' in soup.

        baseUrl is used to resolve the post's relative link. The date is None
        when the post has no 'rel_date' span carrying a 'time' attribute, and
        the content is None when the post has no 'wall_post_text' block.
        '''
        for post in soup.find_all('div', class_ = 'post'):
            dateSpan = post.find('div', class_ = 'post_date').find('span', class_ = 'rel_date')
            textDiv = post.find('div', class_ = 'wall_post_text')
            # The attribute has to be tested with has_attr: `'time' in dateSpan` would search
            # the tag's children rather than its attributes and therefore never match.
            if dateSpan is not None and dateSpan.has_attr('time'):
                date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc)
            else:
                date = None
            yield VKontaktePost(
                url = urllib.parse.urljoin(baseUrl, post.find('a', class_ = 'post_link')['href']),
                date = date,
                content = textDiv.text if textDiv else None,
            )

    def get_items(self):
        '''Yield the posts on the user's wall, newest page first, until the end of the wall or an error.'''
        headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
        baseUrl = f'https://vk.com/{self._username}'

        logger.info('Retrieving initial data')
        r = self._get(baseUrl, headers = headers)
        if r.status_code == 404:
            logger.error('Wall does not exist')
            return
        elif r.status_code != 200:
            logger.error(f'Got status code {r.status_code}')
            return

        # VK sends windows-1251-encoded data, but Requests's decoding doesn't seem to work correctly and causes lxml to choke, so we need to pass the binary content and the encoding explicitly.
        soup = bs4.BeautifulSoup(r.content, 'lxml', from_encoding = r.encoding)

        if soup.find('div', class_ = 'profile_closed_wall_dummy'):
            logger.error('Private profile')
            return

        newestPost = soup.find('div', class_ = 'post')
        if not newestPost:
            logger.info('Wall has no posts')
            return
        ownerID = newestPost.attrs['data-post-id'].split('_')[0]
        # If there is a pinned post, we need its ID for the pagination requests
        if 'post_fixed' in newestPost.attrs['class']:
            fixedPostID = newestPost.attrs['id'].split('_')[1]
        else:
            fixedPostID = ''

        yield from self._soup_to_items(soup, baseUrl)

        headers['X-Requested-With'] = 'XMLHttpRequest'
        for offset in itertools.count(start = 10, step = 10):
            logger.info('Retrieving next page')
            r = self._post(
                'https://vk.com/al_wall.php',
                data = [('act', 'get_wall'), ('al', 1), ('fixed', fixedPostID), ('offset', offset), ('onlyCache', 'false'), ('owner_id', ownerID), ('type', 'own'), ('wall_start_from', offset)],
                headers = headers
            )
            if r.status_code != 200:
                logger.error(f'Got status code {r.status_code}')
                return
            # The response is a list of fields separated by '<!>'; the posts' HTML is read from the sixth one.
            fields = r.content.split(b'<!>')
            if len(fields) < 6:
                # Without this check, a short or malformed response would raise IndexError below.
                logger.error(f'Got an unknown response: {r.content[:200]!r}...')
                break
            if fields[5].startswith(b'<div class="page_block no_posts">'):
                # Reached the end
                break
            if not fields[5].startswith(b'<div id="post'):
                logger.error(f'Got an unknown response: {fields[5][:200]!r}...')
                break
            soup = bs4.BeautifulSoup(fields[5], 'lxml', from_encoding = r.encoding)
            yield from self._soup_to_items(soup, baseUrl)

    @classmethod
    def setup_parser(cls, subparser):
        '''Register the positional username argument on the given argparse subparser.'''
        subparser.add_argument('username', help = 'A VK username')

    @classmethod
    def from_args(cls, args):
        '''Build the scraper from parsed command-line arguments (username, retries).'''
        return cls(args.username, retries = args.retries)
|
||||
|
||||
Reference in New Issue
Block a user