mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-09 10:58:28 +03:00
Compare commits
40 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e6aae35304 | ||
|
|
b698a201f5 | ||
|
|
7fe72cf708 | ||
|
|
4651cde447 | ||
|
|
c99cc4b5d3 | ||
|
|
628074d6fc | ||
|
|
64b293bd9e | ||
|
|
180f4dfeb7 | ||
|
|
6d6e3fa16c | ||
|
|
5f7e6936c1 | ||
|
|
e2c05c9e0c | ||
|
|
14e11b28d2 | ||
|
|
1a07b3b7e8 | ||
|
|
4d8cc7bdb9 | ||
|
|
eec83f181e | ||
|
|
fae7432c64 | ||
|
|
757818474d | ||
|
|
e6c934c0b8 | ||
|
|
d2315feec1 | ||
|
|
765ceeeb10 | ||
|
|
731a2e8c8b | ||
|
|
7d1916292c | ||
|
|
0d509c4ba0 | ||
|
|
907a003a59 | ||
|
|
8ada279b57 | ||
|
|
900eae54a6 | ||
|
|
7989af27b5 | ||
|
|
e528ca3f26 | ||
|
|
32a427dac3 | ||
|
|
7001983556 | ||
|
|
64438afc92 | ||
|
|
9e6538556a | ||
|
|
9c8bbf051c | ||
|
|
c6a11298ac | ||
|
|
02cbf6ddf6 | ||
|
|
3817aa59d4 | ||
|
|
46a51008f8 | ||
|
|
f91979eb32 | ||
|
|
85fff319bc | ||
|
|
6b145526b7 |
15
README.md
15
README.md
@@ -2,10 +2,12 @@
|
||||
snscrape is a scraper for social networking services (SNS). It scrapes things like user profiles, hashtags, or searches and returns the discovered items, e.g. the relevant posts.
|
||||
|
||||
The following services are currently supported:
|
||||
* Facebook: user profiles
|
||||
* Google Plus: user profiles
|
||||
* Instagram: user profiles
|
||||
* Twitter: user profiles, hashtags, and searches
|
||||
* Facebook: user profiles and groups
|
||||
* Gab: user profile posts, media, and comments
|
||||
* Google+: user profiles
|
||||
* Instagram: user profiles, hashtags, and locations
|
||||
* Twitter: user profiles, hashtags, searches, threads, and lists (members as well as posts)
|
||||
* VKontakte: user profiles
|
||||
|
||||
## Requirements
|
||||
snscrape requires Python 3.6 or higher. The Python package dependencies are installed automatically when you install snscrape.
|
||||
@@ -26,7 +28,7 @@ To get all tweets by Jason Scott (@textfiles):
|
||||
|
||||
It's usually useful to redirect the output to a file for further processing, e.g. in bash using the filename `@textfiles-tweets`:
|
||||
```bash
|
||||
snscrape twitter-user textfiles >@textfiles-tweets
|
||||
snscrape twitter-user textfiles >twitter-@textfiles
|
||||
```
|
||||
|
||||
To get the latest 100 tweets with the hashtag #archiveteam:
|
||||
@@ -37,6 +39,9 @@ To get the latest 100 tweets with the hashtag #archiveteam:
|
||||
|
||||
It is also possible to use snscrape as a library in Python, but this is currently undocumented.
|
||||
|
||||
## Issue reporting
|
||||
If you discover an issue with snscrape, please report it at <https://github.com/JustAnotherArchivist/snscrape/issues>. If possible, please run snscrape with `-vv` and `--dump-locals` and include the log output as well as the dump files referenced in the log in the issue. Note that the files may contain sensitive information in some cases and could potentially be used to identify you (e.g. if the service includes your IP address in its response). If you prefer to arrange a file transfer privately, just mention that in the issue.
|
||||
|
||||
## License
|
||||
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
|
||||
|
||||
3
setup.py
3
setup.py
@@ -3,7 +3,6 @@ import setuptools
|
||||
|
||||
setuptools.setup(
|
||||
name = 'snscrape',
|
||||
version = '0.2.0',
|
||||
description = 'A social networking service scraper',
|
||||
author = 'JustAnotherArchivist',
|
||||
url = 'https://github.com/JustAnotherArchivist/snscrape',
|
||||
@@ -13,6 +12,8 @@ setuptools.setup(
|
||||
'Programming Language :: Python :: 3.6',
|
||||
],
|
||||
packages = ['snscrape', 'snscrape.modules'],
|
||||
setup_requires = ['setuptools_scm'],
|
||||
use_scm_version = True,
|
||||
install_requires = ['requests[socks]', 'lxml', 'beautifulsoup4'],
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
|
||||
175
snscrape/cli.py
175
snscrape/cli.py
@@ -1,11 +1,131 @@
|
||||
import argparse
|
||||
import contextlib
|
||||
import datetime
|
||||
import inspect
|
||||
import logging
|
||||
import snscrape.base
|
||||
import snscrape.modules
|
||||
import requests.models
|
||||
# Imported in parse_args() after setting up the logger:
|
||||
#import snscrape.base
|
||||
#import snscrape.modules
|
||||
#import snscrape.version
|
||||
import tempfile
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
## Logging
|
||||
dumpLocals = False
|
||||
logger = logging # Replaced below after setting the logger class
|
||||
|
||||
|
||||
class Logger(logging.Logger):
    """Logger that can also dump the calling stack and locals for serious messages.

    Messages at WARNING level or above are logged normally. When the module-level
    ``dumpLocals`` flag is set, the stack of the code that issued the message is
    additionally passed to ``_dump_stack_and_locals`` and a follow-up message at the
    same level names the resulting dump.
    """

    def _log_with_stack(self, level, *args, **kwargs):
        """Log the message, then dump stack and locals if ``dumpLocals`` is enabled."""
        super().log(level, *args, **kwargs)
        if not dumpLocals:
            return
        frames = inspect.stack()
        if len(frames) < 3:
            return
        # Drop this method and the public wrapper that called it, then reverse the
        # remaining records so that the outermost frame comes first.
        callerFrames = frames[2:]
        callerFrames.reverse()
        dumpName = _dump_stack_and_locals(callerFrames)
        super().log(level, f'Dumped stack and locals to {dumpName}')

    def warning(self, *args, **kwargs):
        """Log at WARNING level, with an optional stack/locals dump."""
        self._log_with_stack(logging.WARNING, *args, **kwargs)

    def error(self, *args, **kwargs):
        """Log at ERROR level, with an optional stack/locals dump."""
        self._log_with_stack(logging.ERROR, *args, **kwargs)

    def critical(self, *args, **kwargs):
        """Log at CRITICAL level, with an optional stack/locals dump."""
        self._log_with_stack(logging.CRITICAL, *args, **kwargs)

    def log(self, level, *args, **kwargs):
        """Log at an arbitrary level; only WARNING and above can trigger a dump."""
        if level < logging.WARNING:
            super().log(level, *args, **kwargs)
            return
        self._log_with_stack(level, *args, **kwargs)
|
||||
|
||||
|
||||
def _requests_preparedrequest_repr(name, request):
|
||||
ret = []
|
||||
ret.append(repr(request))
|
||||
ret.append(f'\n {name}.method = {request.method}')
|
||||
ret.append(f'\n {name}.url = {request.url}')
|
||||
ret.append(f'\n {name}.headers = \\')
|
||||
for field in request.headers:
|
||||
ret.append(f'\n {field} = {_repr("_", request.headers[field])}')
|
||||
if request.body:
|
||||
ret.append(f'\n {name}.body = ')
|
||||
ret.append(_repr('_', request.body).replace('\n', '\n '))
|
||||
return ''.join(ret)
|
||||
|
||||
|
||||
def _requests_response_repr(name, response, withHistory = True):
    """Build a multi-line textual dump of a response.

    name: label used as the prefix of each attribute line.
    response: object providing ``url``, ``request``, ``history``, ``status_code``,
        ``headers`` and ``content``.
    withHistory: whether to include the entries of ``response.history``; the nested
        dumps of those entries are produced with this set to False.

    Returns the response's repr followed by its URL, request, optional history,
    status code, headers and content.
    """
    pieces = [
        repr(response),
        f'\n {name}.url = {response.url}',
        f'\n {name}.request = ',
    ]
    pieces.append(_repr('_', response.request).replace('\n', '\n '))
    if withHistory and response.history:
        pieces.append(f'\n {name}.history = [')
        for earlierResponse in response.history:
            earlierDump = _requests_response_repr('_', earlierResponse, withHistory = False)
            pieces.append('\n ')
            pieces.append(earlierDump.replace('\n', '\n '))
        pieces.append('\n ]')
    pieces.append(f'\n {name}.status_code = {response.status_code}')
    pieces.append(f'\n {name}.headers = \\')
    pieces.extend(f'\n {field} = {_repr("_", response.headers[field])}' for field in response.headers)
    pieces.append(f'\n {name}.content = {_repr("_", response.content)}')
    return ''.join(pieces)
|
||||
|
||||
|
||||
def _repr(name, value):
    """Return a textual representation of a value for the locals dump.

    name: label forwarded to the requests-specific dump helpers; unused for other values.
    value: the object to represent.

    Responses and prepared requests from ``requests`` get a detailed attribute dump.
    Everything else uses ``repr``; a multi-line repr is moved onto its own lines, with
    a leading backslash-newline and indented continuation lines.
    """
    # isinstance rather than an exact type comparison so that subclasses of the
    # requests types get the detailed dump as well.
    if isinstance(value, requests.models.Response):
        return _requests_response_repr(name, value)
    if isinstance(value, requests.models.PreparedRequest):
        return _requests_preparedrequest_repr(name, value)
    valueRepr = repr(value)
    if '\n' in valueRepr:
        return '\\\n ' + valueRepr.replace('\n', '\n ')
    return valueRepr
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _dump_locals_on_exception():
|
||||
try:
|
||||
yield
|
||||
except Exception as e:
|
||||
trace = inspect.trace()
|
||||
if len(trace) >= 2:
|
||||
name = _dump_stack_and_locals(trace[1:])
|
||||
logger.fatal(f'Dumped stack and locals to {name}')
|
||||
raise
|
||||
|
||||
|
||||
def _dump_stack_and_locals(trace):
|
||||
with tempfile.NamedTemporaryFile('w', prefix = 'snscrape_locals_', delete = False) as fp:
|
||||
fp.write('Stack:\n')
|
||||
for frameRecord in trace:
|
||||
fp.write(f' File "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}\n')
|
||||
for line in frameRecord.code_context:
|
||||
fp.write(f' {line.strip()}\n')
|
||||
fp.write('\n')
|
||||
|
||||
for frameRecord in trace:
|
||||
module = inspect.getmodule(frameRecord[0])
|
||||
if not module.__name__.startswith('snscrape.') and module.__name__ != 'snscrape':
|
||||
continue
|
||||
locals_ = frameRecord[0].f_locals
|
||||
fp.write(f'Locals from file "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}:\n')
|
||||
for variableName in locals_:
|
||||
variable = locals_[variableName]
|
||||
varRepr = _repr(variableName, variable)
|
||||
fp.write(f' {variableName} {type(variable)} = ')
|
||||
fp.write(varRepr.replace('\n', '\n '))
|
||||
fp.write('\n')
|
||||
fp.write('\n')
|
||||
if 'self' in locals_ and hasattr(locals_['self'], '__dict__'):
|
||||
fp.write(f'Object dict:\n')
|
||||
fp.write(repr(locals_['self'].__dict__))
|
||||
fp.write('\n\n')
|
||||
name = fp.name
|
||||
return name
|
||||
|
||||
|
||||
def parse_datetime_arg(arg):
|
||||
@@ -29,8 +149,14 @@ def parse_datetime_arg(arg):
|
||||
|
||||
|
||||
def parse_args():
|
||||
import snscrape.base
|
||||
import snscrape.modules
|
||||
import snscrape.version
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('--version', action = 'version', version = f'snscrape {snscrape.version.__version__}')
|
||||
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
|
||||
parser.add_argument('--dump-locals', dest = 'dumpLocals', action = 'store_true', default = False, help = 'Dump local variables on serious log messages (warnings or higher)')
|
||||
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
|
||||
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
|
||||
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
|
||||
@@ -55,7 +181,16 @@ def parse_args():
|
||||
return args
|
||||
|
||||
|
||||
def setup_logging(verbosity):
|
||||
def setup_logging():
    """Install ``Logger`` as the logger class and rebind the module-level ``logger``.

    Loggers created after this call via ``logging.getLogger`` are instances of
    ``Logger``; the module-level ``logger`` is replaced by one named after this module.
    """
    global logger
    logging.setLoggerClass(Logger)
    logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def configure_logging(verbosity, dumpLocals_):
|
||||
global dumpLocals
|
||||
dumpLocals = dumpLocals_
|
||||
|
||||
rootLogger = logging.getLogger()
|
||||
|
||||
# Set level
|
||||
@@ -68,6 +203,10 @@ def setup_logging(verbosity):
|
||||
# Create formatter
|
||||
formatter = logging.Formatter('{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
|
||||
|
||||
# Remove existing handlers
|
||||
for handler in rootLogger.handlers:
|
||||
rootLogger.removeHandler(handler)
|
||||
|
||||
# Add stream handler
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
@@ -75,21 +214,23 @@ def setup_logging(verbosity):
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: parse arguments, run the selected scraper, print its items.

    Items are printed either via ``args.format`` (formatted with the item's fields) or
    as ``str(item)``. Iteration stops early once an item is older than ``args.since``
    or once ``args.maxResults`` items have been printed; otherwise the total number of
    results is logged at the end.
    """
    # The logger class has to be installed before parse_args() imports the scraper modules.
    setup_logging()
    args = parse_args()
    configure_logging(args.verbosity, args.dumpLocals)
    scraper = args.cls.from_args(args)

    i = 0
    with _dump_locals_on_exception():
        for i, item in enumerate(scraper.get_items(), start = 1):
            # An item's date may be None if it could not be extracted; such items
            # cannot be compared against --since and are passed through.
            if args.since is not None and item.date is not None and item.date < args.since:
                logger.info(f'Exiting due to reaching older results than {args.since}')
                break
            if args.format is not None:
                print(args.format.format(**item._asdict()))
            else:
                print(item)
            if args.maxResults and i >= args.maxResults:
                logger.info(f'Exiting after {i} results')
                break
        else:
            # Only reached if the loop was not left via break
            logger.info(f'Done, found {i} results')
|
||||
|
||||
@@ -16,24 +16,31 @@ class FacebookPost(typing.NamedTuple, snscrape.base.Item):
|
||||
dirtyUrl: str
|
||||
date: datetime.datetime
|
||||
content: typing.Optional[str]
|
||||
outlinks: list
|
||||
outlinksss: str
|
||||
|
||||
def __str__(self):
|
||||
return self.cleanUrl
|
||||
|
||||
|
||||
class FacebookUserScraper(snscrape.base.Scraper):
|
||||
name = 'facebook-user'
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
class FacebookCommonScraper(snscrape.base.Scraper):
|
||||
def _clean_url(self, dirtyUrl):
|
||||
u = urllib.parse.urlparse(dirtyUrl)
|
||||
if u.path == '/permalink.php':
|
||||
# Retain only story_fbid and id parameters
|
||||
q = urllib.parse.parse_qs(u.query)
|
||||
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
|
||||
elif u.path == '/photo.php':
|
||||
# Retain only the fbid parameter
|
||||
q = urllib.parse.parse_qs(u.query)
|
||||
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('fbid', q['fbid'][0]),)), '')
|
||||
elif u.path == '/media/set/':
|
||||
# Retain only the set parameter and try to shorten it to the minimum
|
||||
q = urllib.parse.parse_qs(u.query)
|
||||
setVal = q['set'][0]
|
||||
if setVal.rstrip('0123456789').endswith('.a.'):
|
||||
setVal = f'a.{setVal.rsplit(".", 1)[1]}'
|
||||
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('set', setVal),)), '')
|
||||
elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'):
|
||||
# No manipulation of the path needed, but strip the query string
|
||||
clean = (u.scheme, u.netloc, u.path, '', '')
|
||||
@@ -54,23 +61,70 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
return dirtyUrl
|
||||
return urllib.parse.urlunsplit(clean)
|
||||
|
||||
def _soup_to_items(self, soup, baseUrl):
|
||||
def _is_odd_link(self, href, entryText, mode):
|
||||
# Returns (isOddLink: bool, warn: bool|None)
|
||||
if mode == 'user':
|
||||
if not any(x in href for x in ('/posts/', '/photos/', '/videos/', '/permalink.php?', '/events/', '/notes/', '/photo.php?', '/media/set/')):
|
||||
if href == '#' and 'new photo' in entryText and 'to the album' in entryText:
|
||||
# Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
|
||||
return True, False
|
||||
elif href.startswith('/business/help/788160621327601/?'):
|
||||
# Skip the help article about branded content
|
||||
return True, False
|
||||
else:
|
||||
return True, True
|
||||
return False, None
|
||||
elif mode == 'group':
|
||||
if not re.match(r'^/groups/[^/]+/permalink/\d+/(\?|$)', href):
|
||||
return True, True
|
||||
return False, None
|
||||
|
||||
def _soup_to_items(self, soup, baseUrl, mode):
    """Yield a FacebookPost for every post entry found in the parsed page.

    soup: parsed HTML to search for entries.
    baseUrl: URL against which relative entry links are resolved.
    mode: 'user' or 'group'; forwarded to ``_is_odd_link``.

    Entries without any link and entries whose link ``_is_odd_link`` rejects are
    skipped. For each remaining entry, the outgoing links wrapped by Facebook's
    ``l.php`` redirector are collected without duplicates.
    """
    # Clean URL of the most recently yielded entry; only used in the log message for
    # link-less entries. It is None until the first entry has been processed.
    cleanUrl = None
    for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
        entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
        mediaSetA = entry.find('a', class_ = '_17z-')
        if not mediaSetA and not entryA:
            logger.warning(f'Ignoring link-less entry after {cleanUrl}: {entry.text!r}')
            continue
        if mediaSetA and (not entryA or entryA['href'] == '#'):
            href = mediaSetA['href']
        else:
            # entryA must exist here, otherwise one of the branches above was taken
            href = entryA['href']
        oddLink, warn = self._is_odd_link(href, entry.text, mode)
        if oddLink:
            if warn:
                logger.warning(f'Ignoring odd link: {href}')
            continue
        dirtyUrl = urllib.parse.urljoin(baseUrl, href)
        cleanUrl = self._clean_url(dirtyUrl)
        date = datetime.datetime.fromtimestamp(int(entry.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
        contentDiv = entry.find('div', class_ = '_5pbx')
        content = contentDiv.text if contentDiv else None
        outlinks = []
        for a in entry.find_all('a'):
            if not a.has_attr('href'):
                continue
            redirectHref = a.get('href')
            if not redirectHref.startswith('https://l.facebook.com/l.php?'):
                continue
            query = urllib.parse.parse_qs(urllib.parse.urlparse(redirectHref).query)
            if 'u' not in query or len(query['u']) != 1:
                logger.warning(f'Ignoring odd outlink: {redirectHref}')
                continue
            outlink = query['u'][0]
            # The scheme test must be grouped: `a or b and c` parses as `a or (b and c)`,
            # which let duplicate http:// links through.
            if outlink.startswith(('http://', 'https://')) and outlink not in outlinks:
                outlinks.append(outlink)
        yield FacebookPost(cleanUrl = cleanUrl, dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
|
||||
|
||||
|
||||
class FacebookUserScraper(FacebookCommonScraper):
|
||||
name = 'facebook-user'
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
|
||||
@@ -88,7 +142,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
logger.error('Got status code {r.status_code}')
|
||||
return
|
||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||
yield from self._soup_to_items(soup, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl, 'user')
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
while nextPageLink:
|
||||
@@ -109,7 +163,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
assert response['domops'][0][2] == False
|
||||
assert '__html' in response['domops'][0][3]
|
||||
soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml')
|
||||
yield from self._soup_to_items(soup, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl, 'user')
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
@classmethod
|
||||
@@ -119,3 +173,72 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
|
||||
|
||||
class FacebookGroupScraper(FacebookCommonScraper):
    """Scraper for the posts of a Facebook group, registered as 'facebook-group'."""

    name = 'facebook-group'

    def __init__(self, group, **kwargs):
        """group: a group name or ID; remaining keyword arguments go to the base scraper."""
        super().__init__(**kwargs)
        self._group = group

    def get_items(self):
        """Yield the posts of the group, first from the group page, then from pagination requests.

        Returns without yielding anything if the group page answers with a non-200
        status or lacks the code container marker.
        Raises RuntimeError if a code container is missing or malformed, or if a
        pagination request does not return status 200.
        """
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}

        pageletDataPattern = re.compile(r'"GroupEntstreamPagelet",\{.*?\}(?=,\{)')
        pageletDataPrefixLength = len('"GroupEntstreamPagelet",')
        spuriousForLoopPattern = re.compile(r'^for \(;;\);')

        baseUrl = f'https://www.facebook.com/groups/{self._group}/'
        r = self._get(baseUrl, headers = headers)
        if r.status_code == 404:
            logger.warning('Group does not exist')
            return
        elif r.status_code != 200:
            # This message was missing its f prefix and logged the placeholder literally
            logger.error(f'Got status code {r.status_code}')
            return

        if 'content:{pagelet_group_mall:{container_id:"' not in r.text:
            logger.error('Code container ID marker not found (does the group exist?)')
            return

        soup = bs4.BeautifulSoup(r.text, 'lxml')

        # Posts are inside an HTML comment in two code tags with IDs listed in JS...
        for codeContainerIdStart in ('content:{pagelet_group_mall:{container_id:"', 'content:{group_mall_after_tti:{container_id:"'):
            codeContainerIdPos = r.text.index(codeContainerIdStart) + len(codeContainerIdStart)
            codeContainerId = r.text[codeContainerIdPos : r.text.index('"', codeContainerIdPos)]
            codeContainer = soup.find('code', id = codeContainerId)
            if not codeContainer:
                raise RuntimeError('Code container not found')
            if type(codeContainer.string) is not bs4.element.Comment:
                raise RuntimeError('Code container does not contain a comment')
            codeSoup = bs4.BeautifulSoup(codeContainer.string, 'lxml')
            yield from self._soup_to_items(codeSoup, baseUrl, 'group')

        # Pagination
        # The request data for the next page is embedded in the current response text.
        data = pageletDataPattern.search(r.text).group(0)[pageletDataPrefixLength:]
        while True:
            # As on the user profile pages, the web app sends a lot of additional parameters, but those all seem to be unnecessary (although some change the response format, e.g. from JSON to HTML)
            r = self._get(
                'https://www.facebook.com/ajax/pagelet/generic.php/GroupEntstreamPagelet',
                params = {'data': data, '__a': 1},
                headers = headers,
            )
            if r.status_code != 200:
                raise RuntimeError(f'Got status code {r.status_code}')
            # The JSON body is prefixed with a 'for (;;);' statement that has to be removed before parsing
            obj = json.loads(spuriousForLoopPattern.sub('', r.text))
            if obj['payload'] == '':
                # End of pagination
                break
            soup = bs4.BeautifulSoup(obj['payload'], 'lxml')
            yield from self._soup_to_items(soup, baseUrl, 'group')
            data = pageletDataPattern.search(r.text).group(0)[pageletDataPrefixLength:]

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional 'group' argument on the given subparser."""
        subparser.add_argument('group', help = 'A group name or ID')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from parsed arguments (uses ``args.group`` and ``args.retries``)."""
        return cls(args.group, retries = args.retries)
|
||||
|
||||
@@ -24,7 +24,7 @@ class InstagramPost(typing.NamedTuple, snscrape.base.Item):
|
||||
class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
def __init__(self, mode, name, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
if mode not in ('User', 'Hashtag'):
|
||||
if mode not in ('User', 'Hashtag', 'Location'):
|
||||
raise ValueError('Invalid mode')
|
||||
self._mode = mode
|
||||
self._name = name
|
||||
@@ -44,7 +44,15 @@ class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
self._edgeXToMedia = 'edge_hashtag_to_media'
|
||||
self._pageIDKey = 'name'
|
||||
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
|
||||
self._variablesFormat = '{{"tag_name":"{pageID}","show_ranked":false,"first":10,"after":"{endCursor}"}}'
|
||||
self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
elif self._mode == 'Location':
|
||||
self._initialUrl = f'https://www.instagram.com/explore/locations/{self._name}/'
|
||||
self._pageName = 'LocationsPage'
|
||||
self._responseContainer = 'location'
|
||||
self._edgeXToMedia = 'edge_location_to_media'
|
||||
self._pageIDKey = 'id'
|
||||
self._queryHash = '1b84447a4d8b6d6d0426fefb34514485'
|
||||
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
|
||||
def _response_to_items(self, response):
|
||||
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
@@ -60,20 +68,40 @@ class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
displayUrl = node['node']['display_url'],
|
||||
)
|
||||
|
||||
def _check_initial_page_callback(self, r):
|
||||
if r.status_code != 200:
|
||||
return True, None
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
try:
|
||||
obj = json.loads(jsonData)
|
||||
except json.JSONDecodeError:
|
||||
return False, 'invalid JSON'
|
||||
r._snscrape_json_obj = obj
|
||||
return True, None
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.status_code != 200:
|
||||
return False, f'status code {r.status_code}'
|
||||
try:
|
||||
obj = json.loads(r.text)
|
||||
except json.JSONDecodeError as e:
|
||||
return False, f'invalid JSON ({e!r})'
|
||||
r._snscrape_json_obj = obj
|
||||
return True, None
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
logger.info('Retrieving initial data')
|
||||
r = self._get(self._initialUrl, headers = headers)
|
||||
r = self._get(self._initialUrl, headers = headers, responseOkCallback = self._check_initial_page_callback)
|
||||
if r.status_code == 404:
|
||||
logger.warning(f'{self._mode} does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
response = json.loads(jsonData)
|
||||
rhxGis = response['rhx_gis']
|
||||
response = r._snscrape_json_obj
|
||||
rhxGis = response['rhx_gis'] if 'rhx_gis' in response else ''
|
||||
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
|
||||
logger.info(f'{self._mode} has no posts')
|
||||
return
|
||||
@@ -91,13 +119,13 @@ class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
variables = self._variablesFormat.format(**locals())
|
||||
headers['X-Requested-With'] = 'XMLHttpRequest'
|
||||
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers)
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback)
|
||||
|
||||
if r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
|
||||
response = json.loads(r.text)
|
||||
response = r._snscrape_json_obj
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
return
|
||||
yield from self._response_to_items(response['data'])
|
||||
@@ -128,3 +156,15 @@ class InstagramHashtagScraper(InstagramCommonScraper):
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls('Hashtag', args.hashtag, retries = args.retries)
|
||||
|
||||
|
||||
class InstagramLocationScraper(InstagramCommonScraper):
    """Scraper for an Instagram location, registered as 'instagram-location'."""

    name = 'instagram-location'

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional integer 'locationid' argument on the given subparser."""
        subparser.add_argument('locationid', type = int, help = 'An Instagram location ID')

    @classmethod
    def from_args(cls, args):
        """Create a 'Location'-mode scraper from ``args.locationid`` and ``args.retries``."""
        locationScraper = cls('Location', args.locationid, retries = args.retries)
        return locationScraper
|
||||
|
||||
@@ -14,49 +14,119 @@ class Tweet(typing.NamedTuple, snscrape.base.Item):
|
||||
url: str
|
||||
date: datetime.datetime
|
||||
content: str
|
||||
id: int
|
||||
username: str
|
||||
outlinks: list
|
||||
outlinksss: str
|
||||
tcooutlinks: list
|
||||
tcooutlinksss: str
|
||||
|
||||
def __str__(self):
|
||||
return self.url
|
||||
|
||||
|
||||
class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
name = 'twitter-search'
|
||||
class Account(typing.NamedTuple, snscrape.base.Item):
|
||||
username: str
|
||||
|
||||
def __init__(self, query, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._query = query
|
||||
@property
|
||||
def url(self):
|
||||
return f'https://twitter.com/{self.username}'
|
||||
|
||||
def _get_feed_from_html(self, html):
|
||||
soup = bs4.BeautifulSoup(html, 'lxml')
|
||||
feed = soup.find_all('li', 'js-stream-item')
|
||||
return feed
|
||||
def __str__(self):
|
||||
return self.url
|
||||
|
||||
|
||||
class TwitterCommonScraper(snscrape.base.Scraper):
|
||||
def _feed_to_items(self, feed):
|
||||
for tweet in feed:
|
||||
username = tweet.find('span', 'username').find('b').text
|
||||
tweetID = tweet['data-item-id']
|
||||
date = datetime.datetime.fromtimestamp(int(tweet.find('a', 'tweet-timestamp').find('span', '_timestamp')['data-time']), datetime.timezone.utc)
|
||||
content = tweet.find('p', 'tweet-text').text
|
||||
yield Tweet(f'https://twitter.com/{username}/status/{tweetID}', date, content)
|
||||
url = f'https://twitter.com/{username}/status/{tweetID}'
|
||||
|
||||
date = None
|
||||
timestampA = tweet.find('a', 'tweet-timestamp')
|
||||
if timestampA:
|
||||
timestampSpan = timestampA.find('span', '_timestamp')
|
||||
if timestampSpan and timestampSpan.has_attr('data-time'):
|
||||
date = datetime.datetime.fromtimestamp(int(timestampSpan['data-time']), datetime.timezone.utc)
|
||||
if not date:
|
||||
logger.warning(f'Failed to extract date for {url}')
|
||||
|
||||
contentP = tweet.find('p', 'tweet-text')
|
||||
content = None
|
||||
outlinks = []
|
||||
tcooutlinks = []
|
||||
if contentP:
|
||||
content = contentP.text
|
||||
for a in contentP.find_all('a'):
|
||||
if a.has_attr('href') and not a['href'].startswith('/') and (not a.has_attr('class') or 'u-hidden' not in a['class']):
|
||||
if a.has_attr('data-expanded-url'):
|
||||
outlinks.append(a['data-expanded-url'])
|
||||
else:
|
||||
logger.warning(f'Ignoring link without expanded URL on {url}: {a["href"]}')
|
||||
tcooutlinks.append(a['href'])
|
||||
else:
|
||||
logger.warning(f'Failed to extract content for {url}')
|
||||
card = tweet.find('div', 'card2')
|
||||
if card and 'has-autoplayable-media' not in card['class']:
|
||||
for div in card.find_all('div'):
|
||||
if div.has_attr('data-card-url'):
|
||||
outlinks.append(div['data-card-url'])
|
||||
tcooutlinks.append(div['data-card-url'])
|
||||
outlinks = list(dict.fromkeys(outlinks)) # Deduplicate in case the same link was shared more than once within this tweet; may change order on Python 3.6 or older
|
||||
tcooutlinks = list(dict.fromkeys(tcooutlinks))
|
||||
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||
return False, f'content type is not JSON'
|
||||
return True, None
|
||||
|
||||
|
||||
class TwitterSearchScraper(TwitterCommonScraper):
|
||||
name = 'twitter-search'
|
||||
|
||||
def __init__(self, query, maxPosition = None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._query = query
|
||||
self._maxPosition = maxPosition
|
||||
|
||||
def _get_feed_from_html(self, html, withMinPosition):
    """Parse HTML into the list of stream items and, optionally, the min position.

    html: markup to parse.
    withMinPosition: whether to also look up the 'data-min-position' attribute of
        the stream container.

    Returns ``(feed, minPosition)``. ``minPosition`` is None when it was not
    requested or when the page contains the empty-timeline marker.
    Raises RuntimeError if the position was requested but is missing on a page that
    does not contain the empty-timeline marker.
    """
    soup = bs4.BeautifulSoup(html, 'lxml')
    feed = soup.find_all('li', 'js-stream-item')
    if not withMinPosition:
        return feed, None
    streamContainer = soup.find('div', 'stream-container')
    if streamContainer and streamContainer.has_attr('data-min-position'):
        return feed, streamContainer['data-min-position']
    if soup.find('div', 'SearchEmptyTimeline'):
        # No results found
        return feed, None
    # Unknown error condition
    raise RuntimeError('Unable to find min-position')
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
|
||||
|
||||
# First page
|
||||
logger.info(f'Retrieving search page for {self._query}')
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd', 'qf': 'off'}, headers = headers)
|
||||
if self._maxPosition is None:
|
||||
logger.info(f'Retrieving search page for {self._query}')
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'spxr', 'qf': 'off'}, headers = headers)
|
||||
|
||||
feed = self._get_feed_from_html(r.text)
|
||||
if not feed:
|
||||
feed, maxPosition = self._get_feed_from_html(r.text, True)
|
||||
if not feed:
|
||||
logger.warning(f'No results for {self._query}')
|
||||
return
|
||||
yield from self._feed_to_items(feed)
|
||||
else:
|
||||
maxPosition = self._maxPosition
|
||||
|
||||
if not maxPosition:
|
||||
return
|
||||
newestID = feed[0]['data-item-id']
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
|
||||
while True:
|
||||
logger.info(f'Retrieving scroll page {maxPosition}')
|
||||
@@ -69,26 +139,29 @@ class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
'include_available_features': '1',
|
||||
'include_entities': '1',
|
||||
'reset_error_state': 'false',
|
||||
'src': 'typd',
|
||||
'src': 'spxr',
|
||||
'qf': 'off',
|
||||
'max_position': maxPosition,
|
||||
},
|
||||
headers = headers,
|
||||
responseOkCallback = self._check_json_callback)
|
||||
|
||||
feed = self._get_feed_from_html(json.loads(r.text)['items_html'])
|
||||
if not feed:
|
||||
obj = json.loads(r.text)
|
||||
feed, _ = self._get_feed_from_html(obj['items_html'], False)
|
||||
if feed:
|
||||
yield from self._feed_to_items(feed)
|
||||
if obj['min_position'] == maxPosition:
|
||||
return
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
maxPosition = obj['min_position']
|
||||
|
||||
@classmethod
def setup_parser(cls, subparser):
    """Register the search scraper's command-line arguments on ``subparser``."""
    positionOptions = {'metavar': 'POSITION', 'dest': 'maxPosition'}
    subparser.add_argument('--max-position', **positionOptions)
    subparser.add_argument('query', help = 'A Twitter search string')
|
||||
|
||||
@classmethod
def from_args(cls, args):
    """Build a scraper from parsed command-line arguments.

    Reads ``args.query``, ``args.maxPosition`` (the ``--max-position``
    option registered by setup_parser) and ``args.retries``.
    """
    # A stale first return that omitted maxPosition used to sit here; it made
    # this statement unreachable, so --max-position was silently ignored.
    return cls(args.query, maxPosition = args.maxPosition, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterUserScraper(TwitterSearchScraper):
|
||||
@@ -120,3 +193,136 @@ class TwitterHashtagScraper(TwitterSearchScraper):
|
||||
@classmethod
def from_args(cls, args):
    """Build a scraper from the parsed ``hashtag`` and ``retries`` arguments."""
    hashtag, retries = args.hashtag, args.retries
    return cls(hashtag, retries = retries)
|
||||
|
||||
|
||||
class TwitterThreadScraper(TwitterCommonScraper):
    """Scrape a Twitter thread, starting from its last tweet and walking back through its ancestors."""

    name = 'twitter-thread'

    def __init__(self, tweetID = None, **kwargs):
        """Create a thread scraper.

        tweetID -- ID of the last tweet in the thread, as a string of ASCII digits (or None).
        kwargs -- passed through unchanged to the parent constructor.

        Raises ValueError if tweetID is given but is empty or contains a non-digit character.
        """
        # strip() of an empty string is also empty, so emptiness has to be rejected explicitly
        if tweetID is not None and (tweetID == '' or tweetID.strip('0123456789') != ''):
            raise ValueError('Invalid tweet ID, must be numeric')
        super().__init__(**kwargs)
        self._tweetID = tweetID

    def get_items(self):
        """Yield the supplied tweet first, then its ancestors, following pagination as needed.

        Returns early (after logging a warning) when the tweet or its ancestors cannot be found
        on the page. Raises RuntimeError if the permalink tweet does not convert to exactly one item.
        """
        headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}

        # Fetch the page of the last tweet in the thread
        r = self._get(f'https://twitter.com/user/status/{self._tweetID}', headers = headers)
        soup = bs4.BeautifulSoup(r.text, 'lxml')

        # Extract tweets on that page in the correct order; first, the tweet that was supplied, then the ancestors with pagination if necessary
        tweet = soup.find('div', 'ThreadedConversation--permalinkTweetWithAncestors')
        if tweet:
            tweet = tweet.find('div', 'tweet')
        if not tweet:
            logger.warning('Tweet does not exist, is not a thread, or does not have ancestors')
            return
        items = list(self._feed_to_items([tweet]))
        # Explicit check instead of an assert, which would be stripped under -O
        if len(items) != 1:
            raise RuntimeError(f'Expected exactly one item for the permalink tweet, got {len(items)}')
        yield items[0]
        # The pagination URL below needs the author's username
        username = items[0].username

        ancestors = soup.find('div', 'ThreadedConversation--ancestors')
        if not ancestors:
            logger.warning('Tweet does not have ancestors despite claiming to')
            return
        feed = reversed(ancestors.find_all('li', 'js-stream-item'))
        yield from self._feed_to_items(feed)

        # If necessary, iterate through pagination until reaching the initial tweet
        streamContainer = ancestors.find('div', 'stream-container')
        # A missing container is treated like a missing position: nothing further to paginate
        if not streamContainer or not streamContainer.has_attr('data-max-position') or streamContainer['data-max-position'] == '':
            return
        minPosition = streamContainer['data-max-position']
        while True:
            r = self._get(
                f'https://twitter.com/i/{username}/conversation/{self._tweetID}?include_available_features=1&include_entities=1&min_position={minPosition}',
                headers = headers,
                responseOkCallback = self._check_json_callback
            )

            obj = json.loads(r.text)
            soup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
            feed = reversed(soup.find_all('li', 'js-stream-item'))
            yield from self._feed_to_items(feed)
            if not obj['has_more_items']:
                break
            minPosition = obj['max_position']

    @classmethod
    def setup_parser(cls, subparser):
        """Register the ``tweetID`` positional argument on ``subparser``."""
        subparser.add_argument('tweetID', help = 'A tweet ID of the last tweet in a thread')

    @classmethod
    def from_args(cls, args):
        """Build a scraper from the parsed ``tweetID`` and ``retries`` arguments."""
        return cls(tweetID = args.tweetID, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterListPostsScraper(TwitterSearchScraper):
    """Scrape the posts of a Twitter list by running a ``list:`` search query."""

    name = 'twitter-list-posts'

    def __init__(self, listName, **kwargs):
        """Create a list posts scraper.

        listName -- the list, formatted as "username/listname".
        kwargs -- passed through unchanged to the search scraper's constructor.
        """
        searchQuery = 'list:{}'.format(listName)
        super().__init__(searchQuery, **kwargs)
        self._listName = listName

    @classmethod
    def setup_parser(cls, subparser):
        """Register the ``list`` positional argument on ``subparser``."""
        listHelp = 'A Twitter list, formatted as "username/listname"'
        subparser.add_argument('list', help = listHelp)

    @classmethod
    def from_args(cls, args):
        """Build a scraper from the parsed ``list`` and ``retries`` arguments."""
        listName, retries = args.list, args.retries
        return cls(listName, retries = retries)
|
||||
|
||||
|
||||
class TwitterListMembersScraper(TwitterCommonScraper):
    """Scrape the member accounts of a Twitter list."""

    name = 'twitter-list-members'

    def __init__(self, listName, **kwargs):
        """Create a list members scraper.

        listName -- the list, formatted as "username/listname".
        kwargs -- passed through unchanged to the parent constructor.

        Raises ValueError if listName is not exactly two non-empty parts separated by one slash.
        """
        user, separator, listSlug = listName.partition('/')
        # Fail with a readable message instead of an opaque tuple-unpacking error
        if not separator or not user or not listSlug or '/' in listSlug:
            raise ValueError('Invalid list name, must be formatted as "username/listname"')
        super().__init__(**kwargs)
        self._user, self._list = user, listSlug

    def _stream_items_to_accounts(self, items):
        """Yield one Account per stream item, using the screen name of its ``div.account`` element."""
        for item in items:
            yield Account(username = item.find('div', 'account')['data-screen-name'])

    def get_items(self):
        """Yield an Account for every member of the list, following pagination as needed.

        Returns early (after logging a warning) when the list page does not return status 200
        or contains no members. Raises RuntimeError if the page has no stream container.
        """
        headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}

        baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'
        r = self._get(baseUrl, headers = headers)
        if r.status_code != 200:
            logger.warning('List not found')
            return
        soup = bs4.BeautifulSoup(r.text, 'lxml')
        container = soup.find('div', 'stream-container')
        if not container:
            raise RuntimeError('Unable to find container')
        items = container.find_all('li', 'js-stream-item')
        if not items:
            logger.warning('Empty list')
            return
        yield from self._stream_items_to_accounts(items)

        # No (or empty) position on the first page means there are no further pages
        if not container.has_attr('data-min-position') or container['data-min-position'] == '':
            return
        maxPosition = container['data-min-position']
        while True:
            r = self._get(
                f'{baseUrl}/timeline?include_available_features=1&include_entities=1&max_position={maxPosition}&reset_error_state=false',
                headers = headers,
                responseOkCallback = self._check_json_callback
            )
            obj = json.loads(r.text)
            soup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
            yield from self._stream_items_to_accounts(soup.find_all('li', 'js-stream-item'))
            if not obj['has_more_items']:
                break
            # The response's min_position is the cursor for the next page
            maxPosition = obj['min_position']

    @classmethod
    def setup_parser(cls, subparser):
        """Register the ``list`` positional argument on ``subparser``."""
        subparser.add_argument('list', help = 'A Twitter list, formatted as "username/listname"')

    @classmethod
    def from_args(cls, args):
        """Build a scraper from the parsed ``list`` and ``retries`` arguments."""
        return cls(args.list, retries = args.retries)
|
||||
|
||||
7
snscrape/version.py
Normal file
7
snscrape/version.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import pkg_resources
|
||||
|
||||
|
||||
# Expose the installed snscrape version, or None when no such distribution is found.
try:
    _distribution = pkg_resources.get_distribution('snscrape')
except pkg_resources.DistributionNotFound:
    __version__ = None
else:
    __version__ = _distribution.version
|
||||
Reference in New Issue
Block a user