mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-08 18:48:28 +03:00
Compare commits
76 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b3c7deb28 | ||
|
|
040a11656c | ||
|
|
1459245258 | ||
|
|
dbe4c5ce55 | ||
|
|
80491ecc2c | ||
|
|
1a71b58101 | ||
|
|
0ce37a69d4 | ||
|
|
722bfd5f7c | ||
|
|
b6cc3180d9 | ||
|
|
613395d1c2 | ||
|
|
82a87b7b5a | ||
|
|
9568028bf9 | ||
|
|
6df351772e | ||
|
|
541173b0c8 | ||
|
|
b6772d3778 | ||
|
|
20ea117a2c | ||
|
|
ff54c350bc | ||
|
|
e6aae35304 | ||
|
|
b698a201f5 | ||
|
|
7fe72cf708 | ||
|
|
4651cde447 | ||
|
|
c99cc4b5d3 | ||
|
|
628074d6fc | ||
|
|
64b293bd9e | ||
|
|
180f4dfeb7 | ||
|
|
6d6e3fa16c | ||
|
|
5f7e6936c1 | ||
|
|
e2c05c9e0c | ||
|
|
14e11b28d2 | ||
|
|
1a07b3b7e8 | ||
|
|
4d8cc7bdb9 | ||
|
|
eec83f181e | ||
|
|
fae7432c64 | ||
|
|
757818474d | ||
|
|
e6c934c0b8 | ||
|
|
d2315feec1 | ||
|
|
765ceeeb10 | ||
|
|
731a2e8c8b | ||
|
|
7d1916292c | ||
|
|
0d509c4ba0 | ||
|
|
907a003a59 | ||
|
|
8ada279b57 | ||
|
|
900eae54a6 | ||
|
|
7989af27b5 | ||
|
|
e528ca3f26 | ||
|
|
32a427dac3 | ||
|
|
7001983556 | ||
|
|
64438afc92 | ||
|
|
9e6538556a | ||
|
|
9c8bbf051c | ||
|
|
c6a11298ac | ||
|
|
02cbf6ddf6 | ||
|
|
3817aa59d4 | ||
|
|
46a51008f8 | ||
|
|
f91979eb32 | ||
|
|
85fff319bc | ||
|
|
6b145526b7 | ||
|
|
abf31764b1 | ||
|
|
64693f74bb | ||
|
|
a7d08ed51c | ||
|
|
f48ca7726e | ||
|
|
78c295f7e0 | ||
|
|
a5aca1a14f | ||
|
|
96f7d871c1 | ||
|
|
b5dfd37949 | ||
|
|
b511397791 | ||
|
|
536fcb3303 | ||
|
|
f8d812f799 | ||
|
|
c2cebd9166 | ||
|
|
73bc99596f | ||
|
|
8458c12218 | ||
|
|
b59c7e8d8f | ||
|
|
3ceb849d98 | ||
|
|
f5ee1f7ac5 | ||
|
|
1984110f78 | ||
|
|
c5a5dcb92c |
18
README.md
18
README.md
@@ -2,10 +2,11 @@
|
||||
snscrape is a scraper for social networking services (SNS). It scrapes things like user profiles, hashtags, or searches and returns the discovered items, e.g. the relevant posts.
|
||||
|
||||
The following services are currently supported:
|
||||
* Facebook: user profiles
|
||||
* Google Plus: user profiles
|
||||
* Instagram: user profiles
|
||||
* Twitter: user profiles, hashtags, and searches
|
||||
* Facebook: user profiles and groups
|
||||
* Instagram: user profiles, hashtags, and locations
|
||||
* Telegram: channels
|
||||
* Twitter: user profiles, hashtags, searches, threads, and lists (members as well as posts)
|
||||
* VKontakte: user profiles
|
||||
|
||||
## Requirements
|
||||
snscrape requires Python 3.6 or higher. The Python package dependencies are installed automatically when you install snscrape.
|
||||
@@ -13,6 +14,10 @@ snscrape requires Python 3.6 or higher. The Python package dependencies are inst
|
||||
Note that one of the dependencies, lxml, also requires libxml2 and libxslt to be installed.
|
||||
|
||||
## Installation
|
||||
pip3 install snscrape
|
||||
|
||||
If you want to use the development version:
|
||||
|
||||
pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git
|
||||
|
||||
## Usage
|
||||
@@ -22,7 +27,7 @@ To get all tweets by Jason Scott (@textfiles):
|
||||
|
||||
It's usually useful to redirect the output to a file for further processing, e.g. in bash using the filename `@textfiles-tweets`:
|
||||
```bash
|
||||
snscrape twitter-user textfiles >@textfiles-tweets
|
||||
snscrape twitter-user textfiles >twitter-@textfiles
|
||||
```
|
||||
|
||||
To get the latest 100 tweets with the hashtag #archiveteam:
|
||||
@@ -33,6 +38,9 @@ To get the latest 100 tweets with the hashtag #archiveteam:
|
||||
|
||||
It is also possible to use snscrape as a library in Python, but this is currently undocumented.
|
||||
|
||||
## Issue reporting
|
||||
If you discover an issue with snscrape, please report it at <https://github.com/JustAnotherArchivist/snscrape/issues>. If possible, please run snscrape with `-vv` and `--dump-locals` and include the log output as well as the dump files referenced in the log in the issue. Note that the files may contain sensitive information in some cases and could potentially be used to identify you (e.g. if the service includes your IP address in its response). If you prefer to arrange a file transfer privately, just mention that in the issue.
|
||||
|
||||
## License
|
||||
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
|
||||
|
||||
5
setup.py
5
setup.py
@@ -3,7 +3,6 @@ import setuptools
|
||||
|
||||
setuptools.setup(
|
||||
name = 'snscrape',
|
||||
version = '0.1.3',
|
||||
description = 'A social networking service scraper',
|
||||
author = 'JustAnotherArchivist',
|
||||
url = 'https://github.com/JustAnotherArchivist/snscrape',
|
||||
@@ -13,7 +12,9 @@ setuptools.setup(
|
||||
'Programming Language :: Python :: 3.6',
|
||||
],
|
||||
packages = ['snscrape', 'snscrape.modules'],
|
||||
install_requires = ['requests', 'lxml', 'beautifulsoup4'],
|
||||
setup_requires = ['setuptools_scm'],
|
||||
use_scm_version = True,
|
||||
install_requires = ['requests[socks]', 'lxml', 'beautifulsoup4'],
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
'snscrape = snscrape.cli:main',
|
||||
|
||||
207
snscrape/cli.py
207
snscrape/cli.py
@@ -1,25 +1,181 @@
|
||||
import argparse
|
||||
import contextlib
|
||||
import datetime
|
||||
import inspect
|
||||
import logging
|
||||
import snscrape.base
|
||||
import snscrape.modules
|
||||
import requests.models
|
||||
# Imported in parse_args() after setting up the logger:
|
||||
#import snscrape.base
|
||||
#import snscrape.modules
|
||||
#import snscrape.version
|
||||
import tempfile
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
## Logging
|
||||
dumpLocals = False
|
||||
logger = logging # Replaced below after setting the logger class
|
||||
|
||||
|
||||
class Logger(logging.Logger):
    """Logger that can also dump the caller's stack and locals for serious messages.

    Messages at WARNING level or above are logged as usual. If the module-level
    ``dumpLocals`` flag is set, the calling stack is then passed to
    ``_dump_stack_and_locals`` and the resulting file name is logged at the
    same level.
    """

    def _log_with_stack(self, level, *args, **kwargs):
        """Log the message, then dump stack and locals if ``dumpLocals`` is enabled."""
        super().log(level, *args, **kwargs)
        if not dumpLocals:
            return
        frames = inspect.stack()
        if len(frames) < 3:
            return
        # Drop this frame and the public wrapper's frame, then reverse the
        # remaining caller frames before handing them to the dumper.
        callerFrames = frames[2:]
        callerFrames.reverse()
        dumpName = _dump_stack_and_locals(callerFrames)
        super().log(level, f'Dumped stack and locals to {dumpName}')

    def warning(self, *args, **kwargs):
        """Log at WARNING level, with an optional stack dump."""
        self._log_with_stack(logging.WARNING, *args, **kwargs)

    def error(self, *args, **kwargs):
        """Log at ERROR level, with an optional stack dump."""
        self._log_with_stack(logging.ERROR, *args, **kwargs)

    def critical(self, *args, **kwargs):
        """Log at CRITICAL level, with an optional stack dump."""
        self._log_with_stack(logging.CRITICAL, *args, **kwargs)

    def log(self, level, *args, **kwargs):
        """Log at an arbitrary level; WARNING and above go through the stack-dumping path."""
        if level < logging.WARNING:
            super().log(level, *args, **kwargs)
            return
        self._log_with_stack(level, *args, **kwargs)
|
||||
|
||||
|
||||
def _requests_preparedrequest_repr(name, request):
    """Build a multi-line description of a prepared request.

    The result starts with ``repr(request)``, followed by the method, URL and
    each header on its own line, and the body if the request has a truthy one.
    ``name`` is the variable name used as the prefix of each attribute line.
    """
    pieces = [
        repr(request),
        f'\n {name}.method = {request.method}',
        f'\n {name}.url = {request.url}',
        f'\n {name}.headers = \\',
    ]
    pieces.extend(f'\n {field} = {_repr("_", request.headers[field])}' for field in request.headers)
    if request.body:
        pieces.append(f'\n {name}.body = ')
        # Indent continuation lines of the body representation
        pieces.append(_repr('_', request.body).replace('\n', '\n '))
    return ''.join(pieces)
|
||||
|
||||
|
||||
def _requests_response_repr(name, response, withHistory = True):
    """Build a multi-line description of a response.

    Included are ``repr(response)``, the URL, the originating request, the
    redirect history (only if ``withHistory`` is true and there is any), the
    status code, every header, and the content. ``name`` is the variable name
    used as the prefix of each attribute line.
    """
    pieces = [
        repr(response),
        f'\n {name}.url = {response.url}',
        f'\n {name}.request = ',
        _repr('_', response.request).replace('\n', '\n '),
    ]
    if withHistory and response.history:
        pieces.append(f'\n {name}.history = [')
        for earlier in response.history:
            pieces.append('\n ')
            # Previous responses are rendered without their own history
            earlierRepr = _requests_response_repr('_', earlier, withHistory = False)
            pieces.append(earlierRepr.replace('\n', '\n '))
        pieces.append('\n ]')
    pieces.append(f'\n {name}.status_code = {response.status_code}')
    pieces.append(f'\n {name}.headers = \\')
    pieces.extend(f'\n {field} = {_repr("_", response.headers[field])}' for field in response.headers)
    pieces.append(f'\n {name}.content = {_repr("_", response.content)}')
    return ''.join(pieces)
|
||||
|
||||
|
||||
def _repr(name, value):
    """Return a representation of ``value`` suitable for the locals dump.

    Objects whose type is exactly ``requests.models.Response`` or
    ``requests.models.PreparedRequest`` get the detailed multi-line
    representations; anything else uses ``repr``. A multi-line ``repr`` is
    moved onto its own lines after a backslash-newline, with continuation
    lines indented.
    """
    valueType = type(value)
    if valueType is requests.models.Response:
        return _requests_response_repr(name, value)
    if valueType is requests.models.PreparedRequest:
        return _requests_preparedrequest_repr(name, value)
    text = repr(value)
    if '\n' not in text:
        return text
    return '\\\n ' + text.replace('\n', '\n ')
|
||||
|
||||
|
||||
@contextlib.contextmanager
def _dump_locals_on_exception():
    """Context manager that dumps stack and locals when the body raises.

    If an ``Exception`` escapes the ``with`` body and the traceback has at
    least two entries, everything after the first entry is written out via
    ``_dump_stack_and_locals`` and the dump's file name is logged. The
    exception is always re-raised.
    """
    try:
        yield
    except Exception as exc:
        tracebackRecords = inspect.trace()
        if len(tracebackRecords) > 1:
            dumpName = _dump_stack_and_locals(tracebackRecords[1:], exc = exc)
            logger.fatal(f'Dumped stack and locals to {dumpName}')
        raise
|
||||
|
||||
|
||||
def _dump_stack_and_locals(trace, exc = None):
    """Write a stack trace and the local variables of snscrape frames to a temporary file.

    Args:
        trace: sequence of frame records as returned by ``inspect.stack()`` or
            ``inspect.trace()``.
        exc: optional exception; if given, its type, message and args are
            written at the top of the dump.

    Returns:
        The name of the file that was written. The file is created with
        ``delete = False`` and is therefore left on disk.
    """
    with tempfile.NamedTemporaryFile('w', prefix = 'snscrape_locals_', delete = False) as fp:
        if exc is not None:
            fp.write('Exception:\n')
            fp.write(f' {type(exc).__module__}.{type(exc).__name__}: {exc!s}\n')
            fp.write(f' args: {exc.args!r}\n')
            fp.write('\n')

        fp.write('Stack:\n')
        for frameRecord in trace:
            fp.write(f' File "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}\n')
            # code_context is None when the source of the frame is not available
            for line in frameRecord.code_context or ():
                fp.write(f' {line.strip()}\n')
        fp.write('\n')

        for frameRecord in trace:
            # inspect.getmodule may return None (e.g. for dynamically executed code);
            # such frames cannot be attributed to snscrape and are skipped.
            module = inspect.getmodule(frameRecord[0])
            moduleName = getattr(module, '__name__', None)
            if moduleName is None or not (moduleName == 'snscrape' or moduleName.startswith('snscrape.')):
                continue
            locals_ = frameRecord[0].f_locals
            fp.write(f'Locals from file "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}:\n')
            for variableName in locals_:
                variable = locals_[variableName]
                varRepr = _repr(variableName, variable)
                fp.write(f' {variableName} {type(variable)} = ')
                fp.write(varRepr.replace('\n', '\n '))
                fp.write('\n')
            fp.write('\n')
            if 'self' in locals_ and hasattr(locals_['self'], '__dict__'):
                fp.write('Object dict:\n')
                fp.write(repr(locals_['self'].__dict__))
                fp.write('\n\n')
        name = fp.name
    return name
|
||||
|
||||
|
||||
def parse_datetime_arg(arg):
    """Parse a command-line argument into a timezone-aware datetime.

    Accepted inputs are ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD``, each
    optionally followed by a space and a UTC offset (``%z``), or an integer
    Unix timestamp. Values without an offset are interpreted as UTC.

    Raises:
        argparse.ArgumentTypeError: if ``arg`` matches none of the formats or
            is a timestamp outside the supported range.
    """
    for fmt in ('%Y-%m-%d %H:%M:%S %z', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %z', '%Y-%m-%d'):
        try:
            d = datetime.datetime.strptime(arg, fmt)
        except ValueError:
            continue
        if d.tzinfo is None:
            return d.replace(tzinfo = datetime.timezone.utc)
        return d
    # Try treating it as a unix timestamp
    try:
        return datetime.datetime.fromtimestamp(int(arg), datetime.timezone.utc)
    except (ValueError, OverflowError, OSError):
        # ValueError: not an integer or year out of range;
        # OverflowError/OSError: timestamp outside what the platform supports.
        pass
    raise argparse.ArgumentTypeError(f'Cannot parse {arg!r} into a datetime object')
|
||||
|
||||
|
||||
def parse_args():
|
||||
import snscrape.base
|
||||
import snscrape.modules
|
||||
import snscrape.version
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('--version', action = 'version', version = f'snscrape {snscrape.version.__version__}')
|
||||
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
|
||||
parser.add_argument('--dump-locals', dest = 'dumpLocals', action = 'store_true', default = False, help = 'Dump local variables on serious log messages (warnings or higher)')
|
||||
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
|
||||
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
|
||||
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
|
||||
parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format')
|
||||
parser.add_argument('--since', type = parse_datetime_arg, metavar = 'DATETIME', help = 'Only return results newer than DATETIME')
|
||||
|
||||
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
|
||||
classes = snscrape.base.Scraper.__subclasses__()
|
||||
for cls in classes:
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
if cls.name is not None:
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
classes.extend(cls.__subclasses__())
|
||||
|
||||
args = parser.parse_args()
|
||||
@@ -31,7 +187,16 @@ def parse_args():
|
||||
return args
|
||||
|
||||
|
||||
def setup_logging(verbosity):
|
||||
def setup_logging():
|
||||
logging.setLoggerClass(Logger)
|
||||
global logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def configure_logging(verbosity, dumpLocals_):
|
||||
global dumpLocals
|
||||
dumpLocals = dumpLocals_
|
||||
|
||||
rootLogger = logging.getLogger()
|
||||
|
||||
# Set level
|
||||
@@ -44,6 +209,10 @@ def setup_logging(verbosity):
|
||||
# Create formatter
|
||||
formatter = logging.Formatter('{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
|
||||
|
||||
# Remove existing handlers
|
||||
for handler in rootLogger.handlers:
|
||||
rootLogger.removeHandler(handler)
|
||||
|
||||
# Add stream handler
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
@@ -51,15 +220,23 @@ def setup_logging(verbosity):
|
||||
|
||||
|
||||
def main():
|
||||
setup_logging()
|
||||
args = parse_args()
|
||||
setup_logging(args.verbosity)
|
||||
configure_logging(args.verbosity, args.dumpLocals)
|
||||
scraper = args.cls.from_args(args)
|
||||
|
||||
i = 0
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
else:
|
||||
logger.info(f'Done, found {i} results')
|
||||
with _dump_locals_on_exception():
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
if args.since is not None and item.date < args.since:
|
||||
logger.info(f'Exiting due to reaching older results than {args.since}')
|
||||
break
|
||||
if args.format is not None:
|
||||
print(args.format.format(**item._asdict()))
|
||||
else:
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
else:
|
||||
logger.info(f'Done, found {i} results')
|
||||
|
||||
@@ -1,33 +1,135 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FacebookUserScraper(snscrape.base.Scraper):
|
||||
class FacebookPost(typing.NamedTuple, snscrape.base.Item):
    """A single scraped Facebook post; ``str()`` of an instance is its clean URL."""

    # URL of the post after normalisation
    cleanUrl: str
    # URL of the post as found in the page, made absolute
    dirtyUrl: str
    # Timestamp of the post
    date: datetime.datetime
    # Text of the post, or None if there is none
    content: typing.Optional[str]
    # External links found in the post
    outlinks: list
    # The same external links joined by single spaces
    outlinksss: str

    def __str__(self):
        return self.cleanUrl
|
||||
|
||||
|
||||
class FacebookCommonScraper(snscrape.base.Scraper):
    """Shared URL normalisation and entry parsing for the Facebook scrapers."""

    def _clean_url(self, dirtyUrl):
        """Return a normalised version of a post URL.

        Recognised URL shapes are reduced to the parts needed to identify the
        post (selected query parameters or just the path). Unrecognised URLs
        are returned unchanged.
        """
        u = urllib.parse.urlparse(dirtyUrl)
        pathComponents = u.path.split('/')
        # Component after "/<user or ID>/"; None for paths that are too short,
        # which then fall through to the unrecognised case instead of raising.
        section = pathComponents[2] if len(pathComponents) > 2 else None
        if u.path == '/permalink.php':
            # Retain only story_fbid and id parameters
            q = urllib.parse.parse_qs(u.query)
            clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
        elif u.path == '/photo.php':
            # Retain only the fbid parameter
            q = urllib.parse.parse_qs(u.query)
            clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('fbid', q['fbid'][0]),)), '')
        elif u.path == '/media/set/':
            # Retain only the set parameter and try to shorten it to the minimum
            q = urllib.parse.parse_qs(u.query)
            setVal = q['set'][0]
            if setVal.rstrip('0123456789').endswith('.a.'):
                setVal = f'a.{setVal.rsplit(".", 1)[1]}'
            clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('set', setVal),)), '')
        elif section == 'posts' or u.path.startswith(('/events/', '/notes/')):
            # No manipulation of the path needed, but strip the query string
            clean = (u.scheme, u.netloc, u.path, '', '')
        elif section in ('photos', 'videos'):
            # Path: "/" username or ID "/" photos or videos "/" extra component "/" ID of photo or video "/"
            # But to be safe, also handle URLs that don't have the extra component.
            if u.path.count('/') == 4:
                clean = (u.scheme, u.netloc, u.path, '', '')
            elif u.path.count('/') == 5:
                # Strip out the third path component
                pathComponents.pop(3) # Don't forget about the empty string at the beginning!
                clean = (u.scheme, u.netloc, '/'.join(pathComponents), '', '')
            else:
                return dirtyUrl
        else:
            # If we don't recognise the URL, just return the original one.
            return dirtyUrl
        return urllib.parse.urlunsplit(clean)

    def _is_odd_link(self, href, entryText, mode):
        """Decide whether an entry link should be skipped.

        Returns a tuple ``(isOddLink, warn)``; ``warn`` is None when the link
        is fine, otherwise it says whether the caller should log a warning.

        Raises:
            ValueError: if ``mode`` is neither ``'user'`` nor ``'group'``.
        """
        if mode == 'user':
            if not any(x in href for x in ('/posts/', '/photos/', '/videos/', '/permalink.php?', '/events/', '/notes/', '/photo.php?', '/media/set/')):
                if href == '#' and 'new photo' in entryText and 'to the album' in entryText:
                    # Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
                    return True, False
                elif href.startswith('/business/help/788160621327601/?'):
                    # Skip the help article about branded content
                    return True, False
                else:
                    return True, True
            return False, None
        elif mode == 'group':
            if not re.match(r'^/groups/[^/]+/permalink/\d+/(\?|$)', href):
                return True, True
            return False, None
        # Previously fell through and returned None, which made the caller's
        # tuple unpacking fail with an unrelated TypeError.
        raise ValueError(f'Invalid mode: {mode!r}')

    def _soup_to_items(self, soup, baseUrl, mode):
        """Yield a ``FacebookPost`` for every usable entry found in ``soup``.

        ``baseUrl`` is used to make entry links absolute; ``mode`` (``'user'``
        or ``'group'``) selects the link filter applied by ``_is_odd_link``.
        """
        cleanUrl = None # Value from previous iteration is used for warning on link-less entries
        for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
            entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
            mediaSetA = entry.find('a', class_ = '_17z-')
            if not mediaSetA and not entryA:
                logger.warning(f'Ignoring link-less entry after {cleanUrl}: {entry.text!r}')
                continue
            if mediaSetA and (not entryA or entryA['href'] == '#'):
                href = mediaSetA['href']
            elif entryA:
                href = entryA['href']
            oddLink, warn = self._is_odd_link(href, entry.text, mode)
            if oddLink:
                if warn:
                    logger.warning(f'Ignoring odd link: {href}')
                continue
            dirtyUrl = urllib.parse.urljoin(baseUrl, href)
            cleanUrl = self._clean_url(dirtyUrl)
            date = datetime.datetime.fromtimestamp(int(entry.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
            contentDiv = entry.find('div', class_ = '_5pbx')
            if contentDiv:
                content = contentDiv.text
            else:
                content = None
            outlinks = []
            for a in entry.find_all('a'):
                if not a.has_attr('href'):
                    continue
                outHref = a.get('href')
                if not outHref.startswith('https://l.facebook.com/l.php?'):
                    continue
                query = urllib.parse.parse_qs(urllib.parse.urlparse(outHref).query)
                if 'u' not in query or len(query['u']) != 1:
                    logger.warning(f'Ignoring odd outlink: {outHref}')
                    continue
                outlink = query['u'][0]
                # The scheme check and the duplicate check must both hold; without
                # the grouping, "and" bound tighter than "or" and http:// links
                # were appended even when already present.
                if outlink.startswith(('http://', 'https://')) and outlink not in outlinks:
                    outlinks.append(outlink)
            yield FacebookPost(cleanUrl = cleanUrl, dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
|
||||
|
||||
|
||||
class FacebookUserScraper(FacebookCommonScraper):
|
||||
name = 'facebook-user'
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
def _soup_to_items(self, soup, username, baseUrl):
|
||||
yielded = set()
|
||||
for a in soup.find_all('a', href = re.compile(r'^/[^/]+/(posts|photos|videos)/[^/]*\d')):
|
||||
href = a.get('href')
|
||||
if href.startswith(f'/{username}/'):
|
||||
link = urllib.parse.urljoin(baseUrl, href)
|
||||
if link not in yielded:
|
||||
yield snscrape.base.URLItem(link)
|
||||
yielded.add(link)
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
|
||||
|
||||
nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=')
|
||||
spuriousForLoopPattern = re.compile(r'^for \(;;\);')
|
||||
@@ -39,12 +141,9 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
logger.warning('User does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
logger.error('Got status code {r.status_code}')
|
||||
return
|
||||
raise snscrape.base.ScraperException('Got status code {r.status_code}')
|
||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||
username = re.sub(r'^https://www\.facebook\.com/([^/]+)/$', r'\1', soup.find('link').get('href')) # Canonical capitalisation
|
||||
baseUrl = f'https://www.facebook.com/{username}/'
|
||||
yield from self._soup_to_items(soup, username, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl, 'user')
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
while nextPageLink:
|
||||
@@ -54,8 +153,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
# Reproducing that would be difficult to get right, especially as Facebook's codebase evolves, so it's just not sent at all here.
|
||||
r = self._get(urllib.parse.urljoin(baseUrl, nextPageLink.get('ajaxify')) + '&__a=1', headers = headers)
|
||||
if r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||
response = json.loads(spuriousForLoopPattern.sub('', r.text))
|
||||
assert 'domops' in response
|
||||
assert len(response['domops']) == 1
|
||||
@@ -65,7 +163,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
assert response['domops'][0][2] == False
|
||||
assert '__html' in response['domops'][0][3]
|
||||
soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml')
|
||||
yield from self._soup_to_items(soup, username, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl, 'user')
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
@classmethod
|
||||
@@ -75,3 +173,70 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
|
||||
|
||||
class FacebookGroupScraper(FacebookCommonScraper):
    """Scraper for the posts of a Facebook group."""

    name = 'facebook-group'

    def __init__(self, group, **kwargs):
        """Store the group name or ID; remaining keyword arguments go to the base class."""
        super().__init__(**kwargs)
        self._group = group

    def get_items(self):
        """Yield the posts of the group, following pagination until an empty payload is returned.

        Logs a warning and yields nothing if the group page returns 404.

        Raises:
            snscrape.base.ScraperException: on an unexpected status code or
                when expected markers are missing from a response.
        """
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}

        pageletDataPattern = re.compile(r'"GroupEntstreamPagelet",\{.*?\}(?=,\{)')
        pageletDataPrefixLength = len('"GroupEntstreamPagelet",')
        spuriousForLoopPattern = re.compile(r'^for \(;;\);')

        def extract_pagelet_data(text):
            # Returns the data object that follows the pagelet name in the response text
            match = pageletDataPattern.search(text)
            if not match:
                raise snscrape.base.ScraperException('Pagelet data not found')
            return match.group(0)[pageletDataPrefixLength:]

        baseUrl = f'https://www.facebook.com/groups/{self._group}/'
        r = self._get(baseUrl, headers = headers)
        if r.status_code == 404:
            logger.warning('Group does not exist')
            return
        elif r.status_code != 200:
            # The f prefix was missing here, so the placeholder was emitted literally
            raise snscrape.base.ScraperException(f'Got status code {r.status_code}')

        if 'content:{pagelet_group_mall:{container_id:"' not in r.text:
            raise snscrape.base.ScraperException('Code container ID marker not found (does the group exist?)')

        soup = bs4.BeautifulSoup(r.text, 'lxml')

        # Posts are inside an HTML comment in two code tags with IDs listed in JS...
        for codeContainerIdStart in ('content:{pagelet_group_mall:{container_id:"', 'content:{group_mall_after_tti:{container_id:"'):
            codeContainerIdPos = r.text.index(codeContainerIdStart) + len(codeContainerIdStart)
            codeContainerId = r.text[codeContainerIdPos : r.text.index('"', codeContainerIdPos)]
            codeContainer = soup.find('code', id = codeContainerId)
            if not codeContainer:
                raise snscrape.base.ScraperException('Code container not found')
            if type(codeContainer.string) is not bs4.element.Comment:
                raise snscrape.base.ScraperException('Code container does not contain a comment')
            codeSoup = bs4.BeautifulSoup(codeContainer.string, 'lxml')
            yield from self._soup_to_items(codeSoup, baseUrl, 'group')

        # Pagination
        data = extract_pagelet_data(r.text)
        while True:
            # As on the user profile pages, the web app sends a lot of additional parameters, but those all seem to be unnecessary (although some change the response format, e.g. from JSON to HTML)
            r = self._get(
                'https://www.facebook.com/ajax/pagelet/generic.php/GroupEntstreamPagelet',
                params = {'data': data, '__a': 1},
                headers = headers,
            )
            if r.status_code != 200:
                raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
            obj = json.loads(spuriousForLoopPattern.sub('', r.text))
            if obj['payload'] == '':
                # End of pagination
                break
            soup = bs4.BeautifulSoup(obj['payload'], 'lxml')
            yield from self._soup_to_items(soup, baseUrl, 'group')
            data = extract_pagelet_data(r.text)

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional ``group`` argument on the subparser."""
        subparser.add_argument('group', help = 'A group name or ID')

    @classmethod
    def from_args(cls, args):
        """Create a scraper from parsed command-line arguments."""
        return cls(args.group, retries = args.retries)
|
||||
|
||||
@@ -1,102 +0,0 @@
|
||||
import datetime
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import snscrape.base
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GooglePlusUserScraper(snscrape.base.Scraper):
|
||||
name = 'googleplus-user'
|
||||
|
||||
def __init__(self, user, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._user = user
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
logger.info('Retrieving initial data')
|
||||
r = self._get(f'https://plus.google.com/{self._user}', headers = headers)
|
||||
if r.status_code == 404:
|
||||
logger.warning('User does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
|
||||
# Global data; only needed for the session ID
|
||||
#TODO: Make this more robust somehow
|
||||
match = re.search(r'''(['"])FdrFJe\1\s*:\s*(['"])(?P<sid>.*?)\2''', r.text)
|
||||
if not match:
|
||||
logger.error('Unable to find session ID')
|
||||
return
|
||||
sid = match.group('sid')
|
||||
|
||||
# Page data
|
||||
# As of 2018-05-18, the much simpler regex r'''<script[^>]*>AF_initDataCallback\(\{key: 'ds:6',.*?return (.*?)\}\}\);</script>''' would work also, but this is more generic and less likely to break:
|
||||
match = re.search(r'''<script[^>]*>\s*(?:.*?)\s*\(\s*\{(?:|.*?,)\s*key\s*:\s*(['"])ds:6\1\s*,.*?,\s*data\s*:\s*function\s*\(\s*\)\s*\{\s*return\s*(?P<data>.*?)\}\s*\}\s*\)\s*;\s*</script>''', r.text, re.DOTALL)
|
||||
if not match:
|
||||
logger.error('Unable to extract data')
|
||||
return
|
||||
jsonData = match.group('data')
|
||||
response = json.loads(jsonData)
|
||||
if response[0][7] is None:
|
||||
logger.info('User has no posts')
|
||||
return
|
||||
for postObj in response[0][7]:
|
||||
yield snscrape.base.URLItem(f'https://plus.google.com/{postObj[6]["33558957"][21]}')
|
||||
cursor = response[0][1] # 'ADSJ_x'
|
||||
if cursor is None:
|
||||
# No further pages
|
||||
return
|
||||
baseDate = datetime.datetime.utcnow()
|
||||
baseSeconds = baseDate.hour * 3600 + baseDate.minute * 60 + baseDate.second
|
||||
userid = response[1] # Alternatively and more ugly: response[0][7][0][6]['33558957'][16]
|
||||
|
||||
for counter in itertools.count(start = 2):
|
||||
logger.info('Retrieving next page')
|
||||
reqid = 1 + baseSeconds + int(1e5) * counter
|
||||
r = self._post(
|
||||
f'https://plus.google.com/_/PlusAppUi/data?ds.extension=74333095&f.sid={sid}&hl=en-US&soc-app=199&soc-platform=1&soc-device=1&_reqid={reqid}&rt=c',
|
||||
data = [('f.req', '[[[74333095,[{"74333095":["' + cursor + '","' + userid + '"]}],null,null,0]]]'), ('', '')],
|
||||
headers = headers
|
||||
)
|
||||
if r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
|
||||
# As if everything up to here wasn't terrible already, this is where it gets *really* bad.
|
||||
# The API contains a few junk characters at the beginning, apparently as an anti-CSRF measure.
|
||||
# The remainder is effectively a self-made chunked transfer encoding but with decimal digits and including everything except the digits themselves in the chunk size.
|
||||
# It sucks.
|
||||
# Each chunk is actually one JSON object; you'd think that we can just read the first one and parse that, but there are some quirks that make this difficult.
|
||||
# I was unable to figure out what the "chunk size" actually covers exactly; the response is UTF-8 encoded, but the chunk size matches neither the binary nor the decoded length.
|
||||
# Enter the awful workaround: strip away the initial chunk size, then parse the beginning of the remaining data using a parser that doesn't care if there's junk after the JSON.
|
||||
|
||||
garbage = r.text
|
||||
assert garbage[:6] == ")]}'\n\n" # anti-CSRF and two newlines
|
||||
data = []
|
||||
pos = 6
|
||||
while garbage[pos].isdigit() or garbage[pos].isspace(): # Also strip leading whitespace
|
||||
pos += 1
|
||||
response = json.JSONDecoder().raw_decode(''.join(garbage[pos:]))[0] # Parses only the first structure in the data stream without throwing an error about the extra data at the end
|
||||
|
||||
for postObj in response[0][2]['74333095'][0][7]:
|
||||
yield snscrape.base.URLItem(f'https://plus.google.com/{postObj[6]["33558957"][21]}')
|
||||
|
||||
cursor = response[0][2]['74333095'][0][1]
|
||||
|
||||
if cursor is None:
|
||||
break
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('user', help = 'A Google Plus username (with leading "+") or numeric ID')
|
||||
|
||||
@classmethod
def from_args(cls, args):
    """Build a scraper from parsed command-line arguments.

    Passes ``args.user`` positionally and ``args.retries`` as the
    ``retries`` keyword to the class constructor.
    """
    user = args.user
    return cls(user, retries = args.retries)
|
||||
@@ -1,74 +1,181 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InstagramUserScraper(snscrape.base.Scraper):
|
||||
name = 'instagram-user'
|
||||
class InstagramPost(typing.NamedTuple, snscrape.base.Item):
|
||||
cleanUrl: str
|
||||
dirtyUrl: str
|
||||
date: datetime.datetime
|
||||
content: str
|
||||
thumbnailUrl: str
|
||||
displayUrl: str
|
||||
username: str
|
||||
likes: int
|
||||
comments: int
|
||||
commentsDisabled: bool
|
||||
isVideo: bool
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
def __str__(self):
|
||||
return self.cleanUrl
|
||||
|
||||
|
||||
class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
def __init__(self, mode, name, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
if mode not in ('User', 'Hashtag', 'Location'):
|
||||
raise ValueError('Invalid mode')
|
||||
self._mode = mode
|
||||
self._name = name
|
||||
|
||||
def _response_to_items(self, response, username):
|
||||
for node in response['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if self._mode == 'User':
|
||||
self._initialUrl = f'https://www.instagram.com/{self._name}/'
|
||||
self._pageName = 'ProfilePage'
|
||||
self._responseContainer = 'user'
|
||||
self._edgeXToMedia = 'edge_owner_to_timeline_media'
|
||||
self._pageIDKey = 'id'
|
||||
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
|
||||
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
elif self._mode == 'Hashtag':
|
||||
self._initialUrl = f'https://www.instagram.com/explore/tags/{self._name}/'
|
||||
self._pageName = 'TagPage'
|
||||
self._responseContainer = 'hashtag'
|
||||
self._edgeXToMedia = 'edge_hashtag_to_media'
|
||||
self._pageIDKey = 'name'
|
||||
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
|
||||
self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
elif self._mode == 'Location':
|
||||
self._initialUrl = f'https://www.instagram.com/explore/locations/{self._name}/'
|
||||
self._pageName = 'LocationsPage'
|
||||
self._responseContainer = 'location'
|
||||
self._edgeXToMedia = 'edge_location_to_media'
|
||||
self._pageIDKey = 'id'
|
||||
self._queryHash = '1b84447a4d8b6d6d0426fefb34514485'
|
||||
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
|
||||
def _response_to_items(self, response):
|
||||
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
code = node['node']['shortcode']
|
||||
yield snscrape.base.URLItem(f'https://www.instagram.com/p/{code}/?taken-by={username}') #TODO: Do we want the taken-by parameter in here?
|
||||
username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
|
||||
usernameQuery = '?taken-by=' + username
|
||||
cleanUrl = f'https://www.instagram.com/p/{code}/'
|
||||
yield InstagramPost(
|
||||
cleanUrl = cleanUrl,
|
||||
dirtyUrl = f'{cleanUrl}{usernameQuery}',
|
||||
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
|
||||
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
|
||||
thumbnailUrl = node['node']['thumbnail_src'],
|
||||
displayUrl = node['node']['display_url'],
|
||||
username = username,
|
||||
likes = node['node']['edge_media_preview_like']['count'],
|
||||
comments = node['node']['edge_media_to_comment']['count'],
|
||||
commentsDisabled = node['node']['comments_disabled'],
|
||||
isVideo = node['node']['is_video'],
|
||||
)
|
||||
|
||||
def _check_initial_page_callback(self, r):
|
||||
if r.status_code != 200:
|
||||
return True, None
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
try:
|
||||
obj = json.loads(jsonData)
|
||||
except json.JSONDecodeError:
|
||||
return False, 'invalid JSON'
|
||||
r._snscrape_json_obj = obj
|
||||
return True, None
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.status_code != 200:
|
||||
return False, f'status code {r.status_code}'
|
||||
try:
|
||||
obj = json.loads(r.text)
|
||||
except json.JSONDecodeError as e:
|
||||
return False, f'invalid JSON ({e!r})'
|
||||
r._snscrape_json_obj = obj
|
||||
return True, None
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
logger.info('Retrieving initial data')
|
||||
r = self._get(f'https://www.instagram.com/{self._username}/', headers = headers)
|
||||
r = self._get(self._initialUrl, headers = headers, responseOkCallback = self._check_initial_page_callback)
|
||||
if r.status_code == 404:
|
||||
logger.warning('User does not exist')
|
||||
logger.warning(f'{self._mode} does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||
elif r.url.startswith('https://www.instagram.com/accounts/login/'):
|
||||
raise snscrape.base.ScraperException('Redirected to login page')
|
||||
response = r._snscrape_json_obj
|
||||
rhxGis = response['rhx_gis'] if 'rhx_gis' in response else ''
|
||||
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
|
||||
logger.info(f'{self._mode} has no posts')
|
||||
return
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
response = json.loads(jsonData)
|
||||
rhxGis = response['rhx_gis']
|
||||
if response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['count'] == 0:
|
||||
logger.info('User has no posts')
|
||||
return
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
logger.warning('Private account')
|
||||
return
|
||||
userID = response['entry_data']['ProfilePage'][0]['graphql']['user']['id']
|
||||
username = response['entry_data']['ProfilePage'][0]['graphql']['user']['username'] # Might have different capitalisation than self._username
|
||||
yield from self._response_to_items(response['entry_data']['ProfilePage'][0]['graphql'], username)
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
|
||||
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
while True:
|
||||
logger.info(f'Retrieving endCursor = {endCursor!r}')
|
||||
variables = f'{{"id":"{userID}","first":50,"after":"{endCursor}"}}'
|
||||
variables = self._variablesFormat.format(**locals())
|
||||
headers['X-Requested-With'] = 'XMLHttpRequest'
|
||||
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash=42323d64886122307be10013ad2dcc44&variables={variables}', headers = headers)
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback)
|
||||
|
||||
if r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||
|
||||
response = json.loads(r.text)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
response = r._snscrape_json_obj
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
return
|
||||
yield from self._response_to_items(response['data'], username)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
yield from self._response_to_items(response['data'])
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
|
||||
class InstagramUserScraper(InstagramCommonScraper):
|
||||
name = 'instagram-user'
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('username', help = 'An Instagram username')
|
||||
subparser.add_argument('username', help = 'An Instagram username (no leading @)')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
return cls('User', args.username, retries = args.retries)
|
||||
|
||||
|
||||
class InstagramHashtagScraper(InstagramCommonScraper):
    """Command-line entry point that runs the common Instagram scraper in 'Hashtag' mode."""

    name = 'instagram-hashtag'

    @classmethod
    def setup_parser(cls, subparser):
        """Add the positional ``hashtag`` argument to this scraper's subparser."""
        hashtagHelp = 'An Instagram hashtag (no leading #)'
        subparser.add_argument('hashtag', help = hashtagHelp)

    @classmethod
    def from_args(cls, args):
        """Create the scraper from parsed arguments (``args.hashtag`` and ``args.retries``)."""
        hashtag = args.hashtag
        return cls('Hashtag', hashtag, retries = args.retries)
|
||||
|
||||
|
||||
class InstagramLocationScraper(InstagramCommonScraper):
    """Command-line entry point that runs the common Instagram scraper in 'Location' mode."""

    name = 'instagram-location'

    @classmethod
    def setup_parser(cls, subparser):
        """Add the positional ``locationid`` argument, converted to ``int`` by argparse."""
        locationHelp = 'An Instagram location ID'
        subparser.add_argument('locationid', help = locationHelp, type = int)

    @classmethod
    def from_args(cls, args):
        """Create the scraper from parsed arguments (``args.locationid`` and ``args.retries``)."""
        locationID = args.locationid
        return cls('Location', locationID, retries = args.retries)
|
||||
|
||||
66
snscrape/modules/telegram.py
Normal file
66
snscrape/modules/telegram.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TelegramPost(typing.NamedTuple, snscrape.base.Item):
    """One post from a Telegram channel.

    The field order defines the tuple layout and must stay as it is.
    """

    # Address of the post
    url: str
    # Timestamp of the post
    date: datetime.datetime
    # Message text
    content: str
    # Links found in the post
    outlinks: list
    # String counterpart of outlinks
    outlinksss: str

    def __str__(self):
        """Use the post URL as the string form of the item."""
        return self.url
|
||||
|
||||
|
||||
class TelegramChannelScraper(snscrape.base.Scraper):
    """Scraper that walks the ``https://t.me/s/<channel>`` pages of a Telegram channel."""

    name = 'telegram-channel'

    def __init__(self, name, **kwargs):
        """Store the channel name; all keyword arguments go to the base scraper."""
        super().__init__(**kwargs)
        self._name = name

    def _soup_to_items(self, soup, pageUrl):
        """Yield a TelegramPost for each message element in ``soup``.

        Messages are visited in reverse document order. ``pageUrl`` is the
        address the page was fetched from and is used to resolve link targets.
        """
        posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True})
        for post in reversed(posts):
            timeTag = post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date').find('time', datetime = True)
            # Strip the first two dashes and every colon so the value fits the compact
            # strptime pattern below; an input like '2020-05-01T12:00:00+00:00'
            # becomes '20200501T120000+0000'.
            compactTimestamp = timeTag['datetime'].replace('-', '', 2).replace(':', '')
            date = datetime.datetime.strptime(compactTimestamp, '%Y%m%dT%H%M%S%z')
            message = post.find('div', class_ = 'tgme_widget_message_text')
            if message:
                content = message.text
                outlinks = []
                for link in post.find_all('a'):
                    # Anchors without an href used to raise KeyError and abort the whole
                    # scrape; they cannot be links, so skip them.
                    href = link.get('href')
                    if href is None:
                        continue
                    # NOTE(review): this keeps only non-"@" links that point at https://t.me/;
                    # possibly `not (A and B)` was intended - confirm before changing.
                    if not link.text.startswith('@') and href.startswith('https://t.me/'):
                        outlinks.append(urllib.parse.urljoin(pageUrl, href))
                outlinksss = ' '.join(outlinks)
            else:
                content = None
                outlinks = []
                outlinksss = ''
            yield TelegramPost(url = f'https://t.me/s/{post["data-post"]}', date = date, content = content, outlinks = outlinks, outlinksss = outlinksss)

    def get_items(self):
        """Yield every post of the channel, following the "more messages" link page by page.

        Raises:
            snscrape.base.ScraperException: if a page is answered with a non-200 status.
        """
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'}

        nextPageUrl = f'https://t.me/s/{self._name}'
        while True:
            r = self._get(nextPageUrl, headers = headers)
            if r.status_code != 200:
                raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
            soup = bs4.BeautifulSoup(r.text, 'lxml')
            yield from self._soup_to_items(soup, nextPageUrl)
            # The pagination anchor carries a data-before attribute; without it we are done.
            pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
            if not pageLink:
                break
            nextPageUrl = urllib.parse.urljoin(nextPageUrl, pageLink['href'])

    @classmethod
    def setup_parser(cls, subparser):
        """Add the positional ``channel`` argument to this scraper's subparser."""
        subparser.add_argument('channel', help = 'A channel name')

    @classmethod
    def from_args(cls, args):
        """Create the scraper from parsed arguments (``args.channel`` and ``args.retries``)."""
        return cls(args.channel, retries = args.retries)
|
||||
@@ -1,80 +1,213 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import random
|
||||
import logging
|
||||
import re
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
name = 'twitter-search'
|
||||
class Tweet(typing.NamedTuple, snscrape.base.Item):
|
||||
url: str
|
||||
date: datetime.datetime
|
||||
content: str
|
||||
id: int
|
||||
username: str
|
||||
outlinks: list
|
||||
outlinksss: str
|
||||
tcooutlinks: list
|
||||
tcooutlinksss: str
|
||||
|
||||
def __init__(self, query, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._query = query
|
||||
def __str__(self):
|
||||
return self.url
|
||||
|
||||
def _get_feed_from_html(self, html):
|
||||
soup = bs4.BeautifulSoup(html, 'lxml')
|
||||
feed = soup.find_all('li', 'js-stream-item')
|
||||
return feed
|
||||
|
||||
class Account(typing.NamedTuple, snscrape.base.Item):
    """A Twitter account identified by its screen name."""

    # Screen name without a leading "@"
    username: str

    @property
    def url(self):
        """Profile URL derived from the username."""
        return 'https://twitter.com/{}'.format(self.username)

    def __str__(self):
        """Use the profile URL as the string form of the item."""
        return self.url
|
||||
|
||||
|
||||
class TwitterCommonScraper(snscrape.base.Scraper):
|
||||
def _feed_to_items(self, feed):
|
||||
for tweet in feed:
|
||||
username = tweet.find('span', 'username').find('b').text
|
||||
tweetID = tweet['data-item-id']
|
||||
yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}')
|
||||
url = f'https://twitter.com/{username}/status/{tweetID}'
|
||||
|
||||
date = None
|
||||
timestampA = tweet.find('a', 'tweet-timestamp')
|
||||
if timestampA:
|
||||
timestampSpan = timestampA.find('span', '_timestamp')
|
||||
if timestampSpan and timestampSpan.has_attr('data-time'):
|
||||
date = datetime.datetime.fromtimestamp(int(timestampSpan['data-time']), datetime.timezone.utc)
|
||||
if not date:
|
||||
logger.warning(f'Failed to extract date for {url}')
|
||||
|
||||
contentP = tweet.find('p', 'tweet-text')
|
||||
content = None
|
||||
outlinks = []
|
||||
tcooutlinks = []
|
||||
if contentP:
|
||||
content = contentP.text
|
||||
for a in contentP.find_all('a'):
|
||||
if a.has_attr('href') and not a['href'].startswith('/') and (not a.has_attr('class') or 'u-hidden' not in a['class']):
|
||||
if a.has_attr('data-expanded-url'):
|
||||
outlinks.append(a['data-expanded-url'])
|
||||
else:
|
||||
logger.warning(f'Ignoring link without expanded URL on {url}: {a["href"]}')
|
||||
tcooutlinks.append(a['href'])
|
||||
else:
|
||||
logger.warning(f'Failed to extract content for {url}')
|
||||
card = tweet.find('div', 'card2')
|
||||
if card and 'has-autoplayable-media' not in card['class']:
|
||||
for div in card.find_all('div'):
|
||||
if div.has_attr('data-card-url'):
|
||||
outlinks.append(div['data-card-url'])
|
||||
tcooutlinks.append(div['data-card-url'])
|
||||
outlinks = list(dict.fromkeys(outlinks)) # Deduplicate in case the same link was shared more than once within this tweet; may change order on Python 3.6 or older
|
||||
tcooutlinks = list(dict.fromkeys(tcooutlinks))
|
||||
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.headers['content-type'] != 'application/json;charset=utf-8':
|
||||
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||
return False, f'content type is not JSON'
|
||||
return True, None
|
||||
|
||||
|
||||
class TwitterSearchScraper(TwitterCommonScraper):
|
||||
name = 'twitter-search'
|
||||
|
||||
def __init__(self, query, cursor = None, **kwargs):
    """Set up a search scraper.

    Args:
        query: The Twitter search string.
        cursor: Optional pagination cursor to start from.
        **kwargs: Passed on to the base scraper.

    Also fixes, for the lifetime of the instance, a user agent with two
    random version components and the search page URL for ``query``.
    """
    super().__init__(**kwargs)
    self._query = query
    self._cursor = cursor
    # Draw the two random components in the same order as they appear in the string.
    chromeBuild = random.randint(0, 9999)
    safariMinor = random.randint(0, 99)
    self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{chromeBuild} Safari/537.{safariMinor}'
    searchParams = {'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'}
    self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode(searchParams)
|
||||
|
||||
def _get_guest_token(self):
    """Fetch the search page and extract the numeric guest token from it.

    The token is read from the ``gt=<digits>;`` cookie assignment in the page.

    Raises:
        snscrape.base.ScraperException: if no token is found in the page.
    """
    logger.info('Retrieving guest token from search page')
    response = self._get(self._baseUrl, headers = {'User-Agent': self._userAgent})
    found = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', response.text)
    if found:
        return found.group(1)
    raise snscrape.base.ScraperException('Unable to find guest token')
|
||||
|
||||
def _check_scroll_response(self, r):
|
||||
if r.status_code == 429:
|
||||
# Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
|
||||
return True, None
|
||||
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||
return False, f'content type is not JSON'
|
||||
if r.status_code != 200:
|
||||
return False, f'non-200 status code'
|
||||
return True, None
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
# First page
|
||||
logger.info(f'Retrieving search page for {self._query}')
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd', 'qf': 'off'}, headers = headers)
|
||||
|
||||
feed = self._get_feed_from_html(r.text)
|
||||
if not feed:
|
||||
return
|
||||
newestID = feed[0]['data-item-id']
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
|
||||
headers = {
|
||||
'User-Agent': self._userAgent,
|
||||
'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
|
||||
'Referer': self._baseUrl,
|
||||
}
|
||||
guestToken = None
|
||||
cursor = self._cursor
|
||||
while True:
|
||||
logger.info(f'Retrieving scroll page {maxPosition}')
|
||||
r = self._get('https://twitter.com/i/search/timeline',
|
||||
params = {
|
||||
'f': 'tweets',
|
||||
'vertical': 'default',
|
||||
'lang': 'en',
|
||||
'q': self._query,
|
||||
'include_available_features': '1',
|
||||
'include_entities': '1',
|
||||
'reset_error_state': 'false',
|
||||
'src': 'typd',
|
||||
'qf': 'off',
|
||||
'max_position': maxPosition,
|
||||
},
|
||||
headers = headers,
|
||||
responseOkCallback = self._check_json_callback)
|
||||
if not guestToken:
|
||||
guestToken = self._get_guest_token()
|
||||
headers['x-guest-token'] = guestToken
|
||||
|
||||
feed = self._get_feed_from_html(json.loads(r.text)['items_html'])
|
||||
if not feed:
|
||||
return
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
logger.info(f'Retrieving scroll page {cursor}')
|
||||
params = {
|
||||
'include_profile_interstitial_type': '1',
|
||||
'include_blocking': '1',
|
||||
'include_blocked_by': '1',
|
||||
'include_followed_by': '1',
|
||||
'include_want_retweets': '1',
|
||||
'include_mute_edge': '1',
|
||||
'include_can_dm': '1',
|
||||
'include_can_media_tag': '1',
|
||||
'skip_status': '1',
|
||||
'cards_platform': 'Web-12',
|
||||
'include_cards': '1',
|
||||
'include_composer_source': 'true',
|
||||
'include_ext_alt_text': 'true',
|
||||
'include_reply_count': '1',
|
||||
'tweet_mode': 'extended',
|
||||
'include_entities': 'true',
|
||||
'include_user_entities': 'true',
|
||||
'include_ext_media_color': 'true',
|
||||
'include_ext_media_availability': 'true',
|
||||
'send_error_codes': 'true',
|
||||
'simple_quoted_tweets': 'true',
|
||||
'q': self._query,
|
||||
'tweet_search_mode': 'live',
|
||||
'count': '100',
|
||||
'query_source': 'spelling_expansion_revert_click',
|
||||
}
|
||||
if cursor:
|
||||
params['cursor'] = cursor
|
||||
params['pc'] = '1'
|
||||
params['spelling_corrections'] = '1'
|
||||
params['ext'] = 'mediaStats%2CcameraMoment'
|
||||
r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
|
||||
if r.status_code == 429:
|
||||
guestToken = None
|
||||
continue
|
||||
try:
|
||||
obj = r.json()
|
||||
except json.JSONDecodeError as e:
|
||||
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
|
||||
|
||||
# No data format test, just a hard and loud crash if anything's wrong :-)
|
||||
newCursor = None
|
||||
for instruction in obj['timeline']['instructions']:
|
||||
if 'addEntries' in instruction:
|
||||
entries = instruction['addEntries']['entries']
|
||||
elif 'replaceEntry' in instruction:
|
||||
entries = [instruction['replaceEntry']['entry']]
|
||||
else:
|
||||
continue
|
||||
for entry in entries:
|
||||
if entry['entryId'].startswith('sq-I-t-'):
|
||||
if 'tweet' in entry['content']['item']['content']:
|
||||
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
|
||||
elif 'tombstone' in entry['content']['item']['content'] and 'tweet' in entry['content']['item']['content']['tombstone']:
|
||||
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tombstone']['tweet']['id']]
|
||||
else:
|
||||
raise snscrape.base.ScraperException(f'Unable to handle entry {entry["entryId"]!r}')
|
||||
tweetID = tweet['id']
|
||||
content = tweet['full_text']
|
||||
username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
|
||||
date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
|
||||
outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
|
||||
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
|
||||
url = f'https://twitter.com/{username}/status/{tweetID}'
|
||||
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
||||
elif entry['entryId'] == 'sq-cursor-bottom':
|
||||
newCursor = entry['content']['operation']['cursor']['value']
|
||||
if not newCursor or newCursor == cursor:
|
||||
# End of pagination
|
||||
break
|
||||
cursor = newCursor
|
||||
|
||||
@classmethod
def setup_parser(cls, subparser):
    """Register the optional ``--cursor`` option and the positional ``query`` argument."""
    argumentSpecs = (
        (('--cursor',), {'metavar': 'CURSOR'}),
        (('query',), {'help': 'A Twitter search string'}),
    )
    for flags, options in argumentSpecs:
        subparser.add_argument(*flags, **options)
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.query, retries = args.retries)
|
||||
return cls(args.query, cursor = args.cursor, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterUserScraper(TwitterSearchScraper):
|
||||
@@ -106,3 +239,136 @@ class TwitterHashtagScraper(TwitterSearchScraper):
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.hashtag, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterThreadScraper(TwitterCommonScraper):
    """Scraper that walks a thread backwards, starting from its last tweet."""

    name = 'twitter-thread'

    def __init__(self, tweetID = None, **kwargs):
        """Store the ID of the last tweet in the thread.

        Raises:
            ValueError: if ``tweetID`` is given and contains anything but digits.
        """
        if tweetID is not None and tweetID.strip('0123456789'):
            raise ValueError('Invalid tweet ID, must be numeric')
        super().__init__(**kwargs)
        self._tweetID = tweetID

    def get_items(self):
        """Yield the supplied tweet first, then its ancestors, paginating as needed."""
        headers = {'User-Agent': 'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}

        # Fetch the page of the last tweet in the thread
        permalinkUrl = f'https://twitter.com/user/status/{self._tweetID}'
        r = self._get(permalinkUrl, headers = headers)
        soup = bs4.BeautifulSoup(r.text, 'lxml')

        # Extract tweets on that page in the correct order; first, the tweet that was supplied, then the ancestors with pagination if necessary
        tweet = soup.find('div', 'ThreadedConversation--permalinkTweetWithAncestors')
        if tweet:
            tweet = tweet.find('div', 'tweet')
        if not tweet:
            logger.warning('Tweet does not exist, is not a thread, or does not have ancestors')
            return
        items = list(self._feed_to_items([tweet]))
        assert len(items) == 1
        firstItem = items[0]
        yield firstItem
        username = firstItem.username

        ancestors = soup.find('div', 'ThreadedConversation--ancestors')
        if not ancestors:
            logger.warning('Tweet does not have ancestors despite claiming to')
            return
        yield from self._feed_to_items(reversed(ancestors.find_all('li', 'js-stream-item')))

        # If necessary, iterate through pagination until reaching the initial tweet
        streamContainer = ancestors.find('div', 'stream-container')
        minPosition = streamContainer.get('data-max-position', '')
        if minPosition == '':
            return
        while True:
            r = self._get(
                f'https://twitter.com/i/{username}/conversation/{self._tweetID}?include_available_features=1&include_entities=1&min_position={minPosition}',
                headers = headers,
                responseOkCallback = self._check_json_callback
            )

            obj = json.loads(r.text)
            pageSoup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
            yield from self._feed_to_items(reversed(pageSoup.find_all('li', 'js-stream-item')))
            if not obj['has_more_items']:
                return
            minPosition = obj['max_position']

    @classmethod
    def setup_parser(cls, subparser):
        """Add the positional ``tweetID`` argument to this scraper's subparser."""
        tweetHelp = 'A tweet ID of the last tweet in a thread'
        subparser.add_argument('tweetID', help = tweetHelp)

    @classmethod
    def from_args(cls, args):
        """Create the scraper from parsed arguments (``args.tweetID`` and ``args.retries``)."""
        tweetID = args.tweetID
        return cls(tweetID = tweetID, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterListPostsScraper(TwitterSearchScraper):
    """Scraper for the posts of a Twitter list, implemented as a ``list:<name>`` search."""

    name = 'twitter-list-posts'

    def __init__(self, listName, **kwargs):
        """Start a search for ``list:<listName>`` and remember the list name."""
        searchQuery = 'list:{}'.format(listName)
        super().__init__(searchQuery, **kwargs)
        self._listName = listName

    @classmethod
    def setup_parser(cls, subparser):
        """Add the positional ``list`` argument to this scraper's subparser."""
        listHelp = 'A Twitter list, formatted as "username/listname"'
        subparser.add_argument('list', help = listHelp)

    @classmethod
    def from_args(cls, args):
        """Create the scraper from parsed arguments (``args.list`` and ``args.retries``)."""
        listName = args.list
        return cls(listName, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterListMembersScraper(TwitterCommonScraper):
    """Scraper that yields the member accounts of a Twitter list."""

    name = 'twitter-list-members'

    def __init__(self, listName, **kwargs):
        """Split ``listName`` ("username/listname") into its owner and list parts."""
        super().__init__(**kwargs)
        self._user, self._list = listName.split('/')

    def get_items(self):
        """Yield an Account per list member, following the timeline pagination.

        Raises:
            snscrape.base.ScraperException: if the members page has no stream container.
        """
        headers = {'User-Agent': 'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}

        baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'
        r = self._get(baseUrl, headers = headers)
        if r.status_code != 200:
            logger.warning('List not found')
            return
        soup = bs4.BeautifulSoup(r.text, 'lxml')
        container = soup.find('div', 'stream-container')
        if not container:
            raise snscrape.base.ScraperException('Unable to find container')
        members = container.find_all('li', 'js-stream-item')
        if not members:
            logger.warning('Empty list')
            return
        yield from (Account(username = member.find('div', 'account')['data-screen-name']) for member in members)

        # A missing or empty data-min-position means there are no further pages.
        maxPosition = container.get('data-min-position', '')
        if maxPosition == '':
            return
        while True:
            r = self._get(
                f'{baseUrl}/timeline?include_available_features=1&include_entities=1&max_position={maxPosition}&reset_error_state=false',
                headers = headers,
                responseOkCallback = self._check_json_callback
            )
            obj = json.loads(r.text)
            pageSoup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
            for member in pageSoup.find_all('li', 'js-stream-item'):
                yield Account(username = member.find('div', 'account')['data-screen-name'])
            if not obj['has_more_items']:
                return
            maxPosition = obj['min_position']

    @classmethod
    def setup_parser(cls, subparser):
        """Add the positional ``list`` argument to this scraper's subparser."""
        listHelp = 'A Twitter list, formatted as "username/listname"'
        subparser.add_argument('list', help = listHelp)

    @classmethod
    def from_args(cls, args):
        """Create the scraper from parsed arguments (``args.list`` and ``args.retries``)."""
        listName = args.list
        return cls(listName, retries = args.retries)
|
||||
|
||||
104
snscrape/modules/vkontakte.py
Normal file
104
snscrape/modules/vkontakte.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import itertools
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VKontaktePost(typing.NamedTuple, snscrape.base.Item):
    """One wall post from VKontakte.

    The field order defines the tuple layout and must stay as it is.
    """

    # Address of the post
    url: str
    # Timestamp of the post
    date: datetime.datetime
    # Text of the post
    content: str

    def __str__(self):
        """Use the post URL as the string form of the item."""
        return self.url
|
||||
|
||||
|
||||
class VKontakteUserScraper(snscrape.base.Scraper):
|
||||
name = 'vkontakte-user'
|
||||
|
||||
def __init__(self, username, **kwargs):
    """Remember the VK username; all keyword arguments go to the base scraper."""
    super().__init__(**kwargs)
    # Stored for get_items, which builds the wall URL from it.
    self._username = username
|
||||
|
||||
def _soup_to_items(self, soup, baseUrl):
    """Yield a VKontaktePost for every ``div.post`` element in ``soup``.

    Args:
        soup: Parsed page (or page fragment) to search for posts.
        baseUrl: URL used to resolve each post's link.

    Each item carries the resolved post link, the UTC timestamp taken from the
    ``time`` attribute of the post's ``rel_date`` span (None if the span has no
    such attribute) and the text of the ``wall_post_text`` div (None if absent).
    """
    for post in soup.find_all('div', class_ = 'post'):
        dateSpan = post.find('div', class_ = 'post_date').find('span', class_ = 'rel_date')
        textDiv = post.find('div', class_ = 'wall_post_text')
        # `'time' in dateSpan` searches the tag's children rather than its attributes,
        # which made the date come out as None; has_attr checks the attribute itself.
        if dateSpan.has_attr('time'):
            date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc)
        else:
            date = None
        yield VKontaktePost(
            url = urllib.parse.urljoin(baseUrl, post.find('a', class_ = 'post_link')['href']),
            date = date,
            content = textDiv.text if textDiv else None,
        )
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
|
||||
baseUrl = f'https://vk.com/{self._username}'
|
||||
|
||||
logger.info('Retrieving initial data')
|
||||
r = self._get(baseUrl, headers = headers)
|
||||
if r.status_code == 404:
|
||||
logger.warning('Wall does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||
|
||||
# VK sends windows-1251-encoded data, but Requests's decoding doesn't seem to work correctly and causes lxml to choke, so we need to pass the binary content and the encoding explicitly.
|
||||
soup = bs4.BeautifulSoup(r.content, 'lxml', from_encoding = r.encoding)
|
||||
|
||||
if soup.find('div', class_ = 'profile_closed_wall_dummy'):
|
||||
logger.warning('Private profile')
|
||||
return
|
||||
|
||||
profileDeleted = soup.find('h5', class_ = 'profile_deleted_text')
|
||||
if profileDeleted:
|
||||
# Unclear what this state represents, so just log website text.
|
||||
logger.warning(profileDeleted.text)
|
||||
return
|
||||
|
||||
newestPost = soup.find('div', class_ = 'post')
|
||||
if not newestPost:
|
||||
logger.info('Wall has no posts')
|
||||
return
|
||||
ownerID = newestPost.attrs['data-post-id'].split('_')[0]
|
||||
# If there is a pinned post, we need its ID for the pagination requests
|
||||
if 'post_fixed' in newestPost.attrs['class']:
|
||||
fixedPostID = newestPost.attrs['id'].split('_')[1]
|
||||
else:
|
||||
fixedPostID = ''
|
||||
|
||||
yield from self._soup_to_items(soup, baseUrl)
|
||||
|
||||
headers['X-Requested-With'] = 'XMLHttpRequest'
|
||||
for offset in itertools.count(start = 10, step = 10):
|
||||
logger.info('Retrieving next page')
|
||||
r = self._post(
|
||||
'https://vk.com/al_wall.php',
|
||||
data = [('act', 'get_wall'), ('al', 1), ('fixed', fixedPostID), ('offset', offset), ('onlyCache', 'false'), ('owner_id', ownerID), ('type', 'own'), ('wall_start_from', offset)],
|
||||
headers = headers
|
||||
)
|
||||
if r.status_code != 200:
|
||||
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
|
||||
# Convert to JSON and read the HTML payload. Note that this implicitly converts the data to a Python string (i.e., Unicode), away from a windows-1251-encoded bytes.
|
||||
posts = r.json()['payload'][1][0]
|
||||
if posts.startswith('<div class="page_block no_posts">'):
|
||||
# Reached the end
|
||||
break
|
||||
if not posts.startswith('<div id="post'):
|
||||
raise snscrape.base.ScraperException(f'Got an unknown response: {posts[:200]!r}...')
|
||||
soup = bs4.BeautifulSoup(posts, 'lxml')
|
||||
yield from self._soup_to_items(soup, baseUrl)
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('username', help = 'A VK username')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
|
||||
7
snscrape/version.py
Normal file
7
snscrape/version.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import pkg_resources
|
||||
|
||||
|
||||
# Expose the installed snscrape distribution's version as __version__.
# It stays None when no 'snscrape' distribution is found (pkg_resources.DistributionNotFound).
__version__ = None
try:
    __version__ = pkg_resources.get_distribution('snscrape').version
except pkg_resources.DistributionNotFound:
    pass
|
||||
Reference in New Issue
Block a user