mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-08 10:38:28 +03:00
Compare commits
73 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6cc3180d9 | ||
|
|
613395d1c2 | ||
|
|
82a87b7b5a | ||
|
|
9568028bf9 | ||
|
|
6df351772e | ||
|
|
541173b0c8 | ||
|
|
b6772d3778 | ||
|
|
20ea117a2c | ||
|
|
ff54c350bc | ||
|
|
e6aae35304 | ||
|
|
b698a201f5 | ||
|
|
7fe72cf708 | ||
|
|
4651cde447 | ||
|
|
c99cc4b5d3 | ||
|
|
628074d6fc | ||
|
|
64b293bd9e | ||
|
|
180f4dfeb7 | ||
|
|
6d6e3fa16c | ||
|
|
5f7e6936c1 | ||
|
|
e2c05c9e0c | ||
|
|
14e11b28d2 | ||
|
|
1a07b3b7e8 | ||
|
|
4d8cc7bdb9 | ||
|
|
eec83f181e | ||
|
|
fae7432c64 | ||
|
|
757818474d | ||
|
|
e6c934c0b8 | ||
|
|
d2315feec1 | ||
|
|
765ceeeb10 | ||
|
|
731a2e8c8b | ||
|
|
7d1916292c | ||
|
|
0d509c4ba0 | ||
|
|
907a003a59 | ||
|
|
8ada279b57 | ||
|
|
900eae54a6 | ||
|
|
7989af27b5 | ||
|
|
e528ca3f26 | ||
|
|
32a427dac3 | ||
|
|
7001983556 | ||
|
|
64438afc92 | ||
|
|
9e6538556a | ||
|
|
9c8bbf051c | ||
|
|
c6a11298ac | ||
|
|
02cbf6ddf6 | ||
|
|
3817aa59d4 | ||
|
|
46a51008f8 | ||
|
|
f91979eb32 | ||
|
|
85fff319bc | ||
|
|
6b145526b7 | ||
|
|
abf31764b1 | ||
|
|
64693f74bb | ||
|
|
a7d08ed51c | ||
|
|
f48ca7726e | ||
|
|
78c295f7e0 | ||
|
|
a5aca1a14f | ||
|
|
96f7d871c1 | ||
|
|
b5dfd37949 | ||
|
|
b511397791 | ||
|
|
536fcb3303 | ||
|
|
f8d812f799 | ||
|
|
c2cebd9166 | ||
|
|
73bc99596f | ||
|
|
8458c12218 | ||
|
|
b59c7e8d8f | ||
|
|
3ceb849d98 | ||
|
|
f5ee1f7ac5 | ||
|
|
1984110f78 | ||
|
|
c5a5dcb92c | ||
|
|
cfb1c9a2aa | ||
|
|
d0d3c8b2a6 | ||
|
|
4d0350e541 | ||
|
|
d17aa15bcb | ||
|
|
d1ef280d6e |
19
README.md
19
README.md
@@ -2,10 +2,12 @@
|
||||
snscrape is a scraper for social networking services (SNS). It scrapes things like user profiles, hashtags, or searches and returns the discovered items, e.g. the relevant posts.
|
||||
|
||||
The following services are currently supported:
|
||||
* Facebook: user profiles
|
||||
* Google Plus: user profiles
|
||||
* Instagram: user profiles
|
||||
* Twitter: user profiles, hashtags, and searches
|
||||
* Facebook: user profiles and groups
|
||||
* Gab: user profile posts, media, and comments
|
||||
* Google+: user profiles
|
||||
* Instagram: user profiles, hashtags, and locations
|
||||
* Twitter: user profiles, hashtags, searches, threads, and lists (members as well as posts)
|
||||
* VKontakte: user profiles
|
||||
|
||||
## Requirements
|
||||
snscrape requires Python 3.6 or higher. The Python package dependencies are installed automatically when you install snscrape.
|
||||
@@ -13,6 +15,10 @@ snscrape requires Python 3.6 or higher. The Python package dependencies are inst
|
||||
Note that one of the dependencies, lxml, also requires libxml2 and libxslt to be installed.
|
||||
|
||||
## Installation
|
||||
pip3 install snscrape
|
||||
|
||||
If you want to use the development version:
|
||||
|
||||
pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git
|
||||
|
||||
## Usage
|
||||
@@ -22,7 +28,7 @@ To get all tweets by Jason Scott (@textfiles):
|
||||
|
||||
It's usually useful to redirect the output to a file for further processing, e.g. in bash using the filename `@textfiles-tweets`:
|
||||
```bash
|
||||
snscrape twitter-user textfiles >@textfiles-tweets
|
||||
snscrape twitter-user textfiles >twitter-@textfiles
|
||||
```
|
||||
|
||||
To get the latest 100 tweets with the hashtag #archiveteam:
|
||||
@@ -33,6 +39,9 @@ To get the latest 100 tweets with the hashtag #archiveteam:
|
||||
|
||||
It is also possible to use snscrape as a library in Python, but this is currently undocumented.
|
||||
|
||||
## Issue reporting
|
||||
If you discover an issue with snscrape, please report it at <https://github.com/JustAnotherArchivist/snscrape/issues>. If possible please run snscrape with `-vv` and `--dump-locals` and include the log output as well as the dump files referenced in the log in the issue. Note that the files may contain sensitive information in some cases and could potentially be used to identify you (e.g. if the service includes your IP address in its response). If you prefer to arrange a file transfer privately, just mention that in the issue.
|
||||
|
||||
## License
|
||||
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
|
||||
|
||||
7
setup.py
7
setup.py
@@ -3,7 +3,6 @@ import setuptools
|
||||
|
||||
setuptools.setup(
|
||||
name = 'snscrape',
|
||||
version = '0.1.1',
|
||||
description = 'A social networking service scraper',
|
||||
author = 'JustAnotherArchivist',
|
||||
url = 'https://github.com/JustAnotherArchivist/snscrape',
|
||||
@@ -12,8 +11,10 @@ setuptools.setup(
|
||||
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
],
|
||||
packages = ['snscrape'],
|
||||
install_requires = ['requests', 'lxml', 'beautifulsoup4'],
|
||||
packages = ['snscrape', 'snscrape.modules'],
|
||||
setup_requires = ['setuptools_scm'],
|
||||
use_scm_version = True,
|
||||
install_requires = ['requests[socks]', 'lxml', 'beautifulsoup4'],
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
'snscrape = snscrape.cli:main',
|
||||
|
||||
@@ -59,11 +59,32 @@ class Scraper:
|
||||
logger.debug(f'... with data: {data!r}')
|
||||
try:
|
||||
r = self._session.send(req, timeout = timeout)
|
||||
if responseOkCallback is None or responseOkCallback(r):
|
||||
logger.debug(f'{req.url} retrieved successfully')
|
||||
return r
|
||||
except requests.exceptions.RequestException as exc:
|
||||
logger.error(f'Error retrieving {url}: {exc!r}')
|
||||
if attempt < self._retries:
|
||||
retrying = ', retrying'
|
||||
level = logging.WARNING
|
||||
else:
|
||||
retrying = ''
|
||||
level = logging.ERROR
|
||||
logger.log(level, f'Error retrieving {req.url}: {exc!r}{retrying}')
|
||||
else:
|
||||
if responseOkCallback is not None:
|
||||
success, msg = responseOkCallback(r)
|
||||
else:
|
||||
success, msg = (True, None)
|
||||
msg = f': {msg}' if msg else ''
|
||||
|
||||
if success:
|
||||
logger.debug(f'{req.url} retrieved successfully{msg}')
|
||||
return r
|
||||
else:
|
||||
if attempt < self._retries:
|
||||
retrying = ', retrying'
|
||||
level = logging.WARNING
|
||||
else:
|
||||
retrying = ''
|
||||
level = logging.ERROR
|
||||
logger.log(level, f'Error retrieving {req.url}{msg}{retrying}')
|
||||
if attempt < self._retries:
|
||||
sleepTime = 1.0 * 2**attempt # exponential backoff: sleep 1 second after first attempt, 2 after second, 4 after third, etc.
|
||||
logger.info(f'Waiting {sleepTime:.0f} seconds')
|
||||
|
||||
201
snscrape/cli.py
201
snscrape/cli.py
@@ -1,25 +1,175 @@
|
||||
import argparse
|
||||
import contextlib
|
||||
import datetime
|
||||
import inspect
|
||||
import logging
|
||||
import snscrape.base
|
||||
import snscrape.modules
|
||||
import requests.models
|
||||
# Imported in parse_args() after setting up the logger:
|
||||
#import snscrape.base
|
||||
#import snscrape.modules
|
||||
#import snscrape.version
|
||||
import tempfile
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
## Logging
|
||||
dumpLocals = False
|
||||
logger = logging # Replaced below after setting the logger class
|
||||
|
||||
|
||||
class Logger(logging.Logger):
    """Logger subclass that can dump the call stack and locals for serious messages.

    Every message logged at WARNING level or above goes through `_log_with_stack`. When the
    module-level `dumpLocals` flag is set, that method writes a dump file via
    `_dump_stack_and_locals` and logs the file's name at the same level as the message.
    """

    def _log_with_stack(self, level, *args, **kwargs):
        """Log the message, then (if `dumpLocals` is set) dump the stack and log the dump's name."""
        super().log(level, *args, **kwargs)
        if not dumpLocals:
            return
        frames = inspect.stack()
        if len(frames) < 3:
            return
        # frames[0] is this method and frames[1] is the logging method that called it;
        # drop both and reverse the rest so the outermost call comes first in the dump.
        dumpName = _dump_stack_and_locals(frames[2:][::-1])
        super().log(level, f'Dumped stack and locals to {dumpName}')

    def warning(self, *args, **kwargs):
        """Log at WARNING level, with an optional stack dump."""
        self._log_with_stack(logging.WARNING, *args, **kwargs)

    def error(self, *args, **kwargs):
        """Log at ERROR level, with an optional stack dump."""
        self._log_with_stack(logging.ERROR, *args, **kwargs)

    def critical(self, *args, **kwargs):
        """Log at CRITICAL level, with an optional stack dump."""
        self._log_with_stack(logging.CRITICAL, *args, **kwargs)

    def log(self, level, *args, **kwargs):
        """Log at an arbitrary level; only WARNING and above take the stack-dumping path."""
        if level < logging.WARNING:
            super().log(level, *args, **kwargs)
            return
        self._log_with_stack(level, *args, **kwargs)
|
||||
|
||||
|
||||
def _requests_preparedrequest_repr(name, request):
    """Return a multi-line textual dump of a prepared request.

    The dump starts with `repr(request)` and is followed by the method, URL, each header and,
    if the request has a truthy body, the body. `name` is the label the fields are prefixed with.
    """
    parts = [
        repr(request),
        f'\n {name}.method = {request.method}',
        f'\n {name}.url = {request.url}',
        f'\n {name}.headers = \\',
    ]
    parts.extend(f'\n {field} = {_repr("_", request.headers[field])}' for field in request.headers)
    if request.body:
        parts.append(f'\n {name}.body = ')
        # Indent continuation lines of a multi-line body representation
        parts.append(_repr('_', request.body).replace('\n', '\n '))
    return ''.join(parts)
|
||||
|
||||
|
||||
def _requests_response_repr(name, response, withHistory = True):
    """Return a multi-line textual dump of a response.

    The dump contains `repr(response)`, the URL, the originating request, optionally the
    redirect history, the status code, all headers and the content. Entries in the history
    are rendered with `withHistory = False`, so their own history is not expanded.
    """
    parts = [
        repr(response),
        f'\n {name}.url = {response.url}',
        f'\n {name}.request = ',
        _repr('_', response.request).replace('\n', '\n '),
    ]
    if withHistory and response.history:
        parts.append(f'\n {name}.history = [')
        for earlierResponse in response.history:
            parts.append('\n ')
            parts.append(_requests_response_repr('_', earlierResponse, withHistory = False).replace('\n', '\n '))
        parts.append('\n ]')
    parts.append(f'\n {name}.status_code = {response.status_code}')
    parts.append(f'\n {name}.headers = \\')
    parts.extend(f'\n {field} = {_repr("_", response.headers[field])}' for field in response.headers)
    parts.append(f'\n {name}.content = {_repr("_", response.content)}')
    return ''.join(parts)
|
||||
|
||||
|
||||
def _repr(name, value):
    """Return a representation of `value` suitable for the locals dump.

    Exact `requests` Response and PreparedRequest instances get the detailed dumps above.
    Anything else uses `repr()`; a multi-line result is moved onto its own line, introduced by
    a backslash, with continuation lines indented.
    """
    valueType = type(value)
    if valueType is requests.models.Response:
        return _requests_response_repr(name, value)
    if valueType is requests.models.PreparedRequest:
        return _requests_preparedrequest_repr(name, value)
    text = repr(value)
    if '\n' not in text:
        return text
    return '\\\n ' + text.replace('\n', '\n ')
|
||||
|
||||
|
||||
@contextlib.contextmanager
def _dump_locals_on_exception():
    """Context manager that dumps the stack and locals when its body raises.

    The exception is always re-raised. A dump is only written if the traceback has at least
    one frame beyond this context manager's own frame.
    """
    try:
        yield
    except Exception:
        frames = inspect.trace()
        if len(frames) >= 2:
            # frames[0] is this generator's own frame; dump from the caller's frame onwards.
            dumpName = _dump_stack_and_locals(frames[1:])
            logger.fatal(f'Dumped stack and locals to {dumpName}')
        raise
|
||||
|
||||
|
||||
def _dump_stack_and_locals(trace):
    """Write the given stack and the locals of snscrape frames to a temporary file.

    `trace` is a sequence of frame records as returned by `inspect.stack()` or
    `inspect.trace()`. The file first lists every frame with its source context, then, for
    each frame whose module is `snscrape` or a submodule of it, the local variables and, if
    there is a local `self` with a `__dict__`, that object's dict.

    The file is created with `delete = False`, so it persists after this function returns.

    Returns the name of the file written.
    """
    with tempfile.NamedTemporaryFile('w', prefix = 'snscrape_locals_', delete = False) as fp:
        fp.write('Stack:\n')
        for frameRecord in trace:
            fp.write(f' File "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}\n')
            # code_context is None when the source of the frame is not available;
            # a missing context must not abort the dump.
            for line in frameRecord.code_context or ():
                fp.write(f' {line.strip()}\n')
        fp.write('\n')

        for frameRecord in trace:
            module = inspect.getmodule(frameRecord[0])
            # getmodule() returns None if the module cannot be determined; such a frame
            # cannot be identified as snscrape code, so it is skipped like any foreign frame.
            if module is None:
                continue
            if not module.__name__.startswith('snscrape.') and module.__name__ != 'snscrape':
                continue
            locals_ = frameRecord[0].f_locals
            fp.write(f'Locals from file "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}:\n')
            for variableName in locals_:
                variable = locals_[variableName]
                varRepr = _repr(variableName, variable)
                fp.write(f' {variableName} {type(variable)} = ')
                fp.write(varRepr.replace('\n', '\n '))
                fp.write('\n')
            fp.write('\n')
            if 'self' in locals_ and hasattr(locals_['self'], '__dict__'):
                fp.write('Object dict:\n')
                fp.write(repr(locals_['self'].__dict__))
                fp.write('\n\n')
        name = fp.name
    return name
|
||||
|
||||
|
||||
def parse_datetime_arg(arg):
    """Parse a command-line datetime argument into a timezone-aware datetime.

    Accepted forms, tried in order:
      * 'YYYY-MM-DD HH:MM:SS +ZZZZ'
      * 'YYYY-MM-DD HH:MM:SS'
      * 'YYYY-MM-DD +ZZZZ'
      * 'YYYY-MM-DD'
      * an integer unix timestamp

    Values without a UTC offset are interpreted as UTC.

    Raises argparse.ArgumentTypeError if the argument matches none of the forms or is a
    timestamp outside the range the platform can represent.
    """
    for fmt in ('%Y-%m-%d %H:%M:%S %z', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %z', '%Y-%m-%d'):
        try:
            d = datetime.datetime.strptime(arg, fmt)
        except ValueError:
            continue
        if d.tzinfo is None:
            return d.replace(tzinfo = datetime.timezone.utc)
        return d
    # Try treating it as a unix timestamp
    try:
        return datetime.datetime.fromtimestamp(int(arg), datetime.timezone.utc)
    except (ValueError, OverflowError, OSError):
        # ValueError: not an integer (or year out of range); OverflowError/OSError: timestamp
        # outside the supported range. All of these are user input errors, not crashes.
        pass
    raise argparse.ArgumentTypeError(f'Cannot parse {arg!r} into a datetime object')
|
||||
|
||||
|
||||
def parse_args():
|
||||
import snscrape.base
|
||||
import snscrape.modules
|
||||
import snscrape.version
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('--version', action = 'version', version = f'snscrape {snscrape.version.__version__}')
|
||||
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
|
||||
parser.add_argument('--dump-locals', dest = 'dumpLocals', action = 'store_true', default = False, help = 'Dump local variables on serious log messages (warnings or higher)')
|
||||
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
|
||||
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
|
||||
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
|
||||
parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format')
|
||||
parser.add_argument('--since', type = parse_datetime_arg, metavar = 'DATETIME', help = 'Only return results newer than DATETIME')
|
||||
|
||||
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
|
||||
classes = snscrape.base.Scraper.__subclasses__()
|
||||
for cls in classes:
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
if cls.name is not None:
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
classes.extend(cls.__subclasses__())
|
||||
|
||||
args = parser.parse_args()
|
||||
@@ -31,7 +181,16 @@ def parse_args():
|
||||
return args
|
||||
|
||||
|
||||
def setup_logging(verbosity):
|
||||
def setup_logging():
    """Install `Logger` as the logger class and rebind the module-level `logger`.

    The module-level `logger` is replaced with a logger obtained after the class has been set.
    """
    global logger
    logging.setLoggerClass(Logger)
    logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def configure_logging(verbosity, dumpLocals_):
|
||||
global dumpLocals
|
||||
dumpLocals = dumpLocals_
|
||||
|
||||
rootLogger = logging.getLogger()
|
||||
|
||||
# Set level
|
||||
@@ -44,6 +203,10 @@ def setup_logging(verbosity):
|
||||
# Create formatter
|
||||
formatter = logging.Formatter('{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
|
||||
|
||||
# Remove existing handlers
|
||||
for handler in rootLogger.handlers:
|
||||
rootLogger.removeHandler(handler)
|
||||
|
||||
# Add stream handler
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
@@ -51,15 +214,23 @@ def setup_logging(verbosity):
|
||||
|
||||
|
||||
def main():
|
||||
setup_logging()
|
||||
args = parse_args()
|
||||
setup_logging(args.verbosity)
|
||||
configure_logging(args.verbosity, args.dumpLocals)
|
||||
scraper = args.cls.from_args(args)
|
||||
|
||||
i = 0
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
else:
|
||||
logger.info(f'Done, found {i} results')
|
||||
with _dump_locals_on_exception():
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
if args.since is not None and item.date < args.since:
|
||||
logger.info(f'Exiting due to reaching older results than {args.since}')
|
||||
break
|
||||
if args.format is not None:
|
||||
print(args.format.format(**item._asdict()))
|
||||
else:
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
else:
|
||||
logger.info(f'Done, found {i} results')
|
||||
|
||||
@@ -1,33 +1,135 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FacebookUserScraper(snscrape.base.Scraper):
|
||||
class FacebookPost(typing.NamedTuple, snscrape.base.Item):
    """A single Facebook post; its string form is the cleaned URL."""

    cleanUrl: str  # Result of passing dirtyUrl through the scraper's _clean_url
    dirtyUrl: str  # The entry's link, made absolute against the page's base URL
    date: datetime.datetime  # Built from the entry's data-utime value, in UTC
    content: typing.Optional[str]  # Text of the entry's content div, or None if there is none
    outlinks: list  # External http(s) URLs linked from the entry
    outlinksss: str  # The same outlinks joined into one space-separated string

    def __str__(self):
        return self.cleanUrl
|
||||
|
||||
|
||||
class FacebookCommonScraper(snscrape.base.Scraper):
    """Base class holding the parsing helpers shared by the Facebook scrapers."""

    def _clean_url(self, dirtyUrl):
        """Reduce a Facebook post URL to a canonical form.

        For the recognised URL shapes, only the identifying query parameters are kept (or the
        query string is dropped entirely), and for photo/video URLs a superfluous path
        component is removed. URLs that are not recognised are returned unchanged.
        """
        u = urllib.parse.urlparse(dirtyUrl)
        if u.path == '/permalink.php':
            # Retain only story_fbid and id parameters
            q = urllib.parse.parse_qs(u.query)
            clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
        elif u.path == '/photo.php':
            # Retain only the fbid parameter
            q = urllib.parse.parse_qs(u.query)
            clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('fbid', q['fbid'][0]),)), '')
        elif u.path == '/media/set/':
            # Retain only the set parameter and try to shorten it to the minimum
            q = urllib.parse.parse_qs(u.query)
            setVal = q['set'][0]
            if setVal.rstrip('0123456789').endswith('.a.'):
                setVal = f'a.{setVal.rsplit(".", 1)[1]}'
            clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('set', setVal),)), '')
        elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'):
            # No manipulation of the path needed, but strip the query string
            clean = (u.scheme, u.netloc, u.path, '', '')
        elif u.path.split('/')[2] in ('photos', 'videos'):
            # Path: "/" username or ID "/" photos or videos "/" crap "/" ID of photo or video "/"
            # But to be safe, also handle URLs that don't have that crap correctly.
            if u.path.count('/') == 4:
                clean = (u.scheme, u.netloc, u.path, '', '')
            elif u.path.count('/') == 5:
                # Strip out the third path component
                pathcomps = u.path.split('/')
                pathcomps.pop(3) # Don't forget about the empty string at the beginning!
                clean = (u.scheme, u.netloc, '/'.join(pathcomps), '', '')
            else:
                return dirtyUrl
        else:
            # If we don't recognise the URL, just return the original one.
            return dirtyUrl
        # clean is a 5-tuple (scheme, netloc, path, query, fragment), hence urlunsplit.
        return urllib.parse.urlunsplit(clean)

    def _is_odd_link(self, href, entryText, mode):
        """Decide whether an entry's link is not a regular post permalink.

        `mode` is 'user' or 'group'. Returns a tuple (isOddLink: bool, warn: bool|None), where
        `warn` says whether the caller should log a warning about the skipped link; it is None
        when the link is not odd.
        """
        if mode == 'user':
            if not any(x in href for x in ('/posts/', '/photos/', '/videos/', '/permalink.php?', '/events/', '/notes/', '/photo.php?', '/media/set/')):
                if href == '#' and 'new photo' in entryText and 'to the album' in entryText:
                    # Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
                    return True, False
                elif href.startswith('/business/help/788160621327601/?'):
                    # Skip the help article about branded content
                    return True, False
                else:
                    return True, True
            return False, None
        elif mode == 'group':
            if not re.match(r'^/groups/[^/]+/permalink/\d+/(\?|$)', href):
                return True, True
            return False, None

    def _soup_to_items(self, soup, baseUrl, mode):
        """Yield a FacebookPost for every post entry found in the parsed HTML.

        `soup` is the parsed page or page fragment, `baseUrl` is used to make the entries'
        links absolute, and `mode` ('user' or 'group') selects the link filter applied by
        `_is_odd_link`. Entries without a link or with an odd link are skipped.
        """
        cleanUrl = None # Value from previous iteration is used for warning on link-less entries
        for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
            entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
            mediaSetA = entry.find('a', class_ = '_17z-')
            if not mediaSetA and not entryA:
                logger.warning(f'Ignoring link-less entry after {cleanUrl}: {entry.text!r}')
                continue
            # Prefer the media set link when the entry link is missing or only a '#' placeholder
            if mediaSetA and (not entryA or entryA['href'] == '#'):
                href = mediaSetA['href']
            elif entryA:
                href = entryA['href']
            oddLink, warn = self._is_odd_link(href, entry.text, mode)
            if oddLink:
                if warn:
                    logger.warning(f'Ignoring odd link: {href}')
                continue
            dirtyUrl = urllib.parse.urljoin(baseUrl, href)
            cleanUrl = self._clean_url(dirtyUrl)
            date = datetime.datetime.fromtimestamp(int(entry.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
            contentDiv = entry.find('div', class_ = '_5pbx')
            if contentDiv:
                content = contentDiv.text
            else:
                content = None
            outlinks = []
            for a in entry.find_all('a'):
                if not a.has_attr('href'):
                    continue
                outHref = a.get('href')
                # Only links wrapped in Facebook's l.php redirector are considered; the target is in its 'u' parameter
                if not outHref.startswith('https://l.facebook.com/l.php?'):
                    continue
                query = urllib.parse.parse_qs(urllib.parse.urlparse(outHref).query)
                if 'u' not in query or len(query['u']) != 1:
                    logger.warning(f'Ignoring odd outlink: {outHref}')
                    continue
                outlink = query['u'][0]
                # The scheme test and the duplicate test must both hold. Written as
                # "a or b and c", the duplicate check was skipped for http:// links.
                if outlink.startswith(('http://', 'https://')) and outlink not in outlinks:
                    outlinks.append(outlink)
            yield FacebookPost(cleanUrl = cleanUrl, dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
|
||||
|
||||
|
||||
class FacebookUserScraper(FacebookCommonScraper):
|
||||
name = 'facebook-user'
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
def _soup_to_items(self, soup, username, baseUrl):
|
||||
yielded = set()
|
||||
for a in soup.find_all('a', href = re.compile(r'^/[^/]+/(posts|photos|videos)/[^/]*\d')):
|
||||
href = a.get('href')
|
||||
if href.startswith(f'/{username}/'):
|
||||
link = urllib.parse.urljoin(baseUrl, href)
|
||||
if link not in yielded:
|
||||
yield snscrape.base.URLItem(link)
|
||||
yielded.add(link)
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
|
||||
|
||||
nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=')
|
||||
spuriousForLoopPattern = re.compile(r'^for \(;;\);')
|
||||
@@ -42,9 +144,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
logger.error('Got status code {r.status_code}')
|
||||
return
|
||||
soup = bs4.BeautifulSoup(r.text, 'lxml')
|
||||
username = re.sub(r'^https://www\.facebook\.com/([^/]+)/$', r'\1', soup.find('link').get('href')) # Canonical capitalisation
|
||||
baseUrl = f'https://www.facebook.com/{username}/'
|
||||
yield from self._soup_to_items(soup, username, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl, 'user')
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
while nextPageLink:
|
||||
@@ -65,7 +165,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
assert response['domops'][0][2] == False
|
||||
assert '__html' in response['domops'][0][3]
|
||||
soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml')
|
||||
yield from self._soup_to_items(soup, username, baseUrl)
|
||||
yield from self._soup_to_items(soup, baseUrl, 'user')
|
||||
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
|
||||
|
||||
@classmethod
|
||||
@@ -75,3 +175,72 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
|
||||
|
||||
class FacebookGroupScraper(FacebookCommonScraper):
    """Scraper for the posts of a Facebook group (scraper name 'facebook-group')."""

    name = 'facebook-group'

    def __init__(self, group, **kwargs):
        """Store the group name or ID; remaining keyword arguments go to the base scraper."""
        super().__init__(**kwargs)
        self._group = group

    def get_items(self):
        """Yield the group's posts, first from the group page and then from the pagination endpoint.

        Stops silently (after logging) if the initial page returns a non-200 status or lacks
        the expected container marker. Raises RuntimeError if a code container is missing or
        malformed, or if a pagination request returns a non-200 status.
        """
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}

        pageletDataPattern = re.compile(r'"GroupEntstreamPagelet",\{.*?\}(?=,\{)')
        pageletDataPrefixLength = len('"GroupEntstreamPagelet",')
        spuriousForLoopPattern = re.compile(r'^for \(;;\);')

        baseUrl = f'https://www.facebook.com/groups/{self._group}/'
        r = self._get(baseUrl, headers = headers)
        if r.status_code == 404:
            logger.warning('Group does not exist')
            return
        elif r.status_code != 200:
            # This must be an f-string; otherwise the placeholder text itself is logged.
            logger.error(f'Got status code {r.status_code}')
            return

        if 'content:{pagelet_group_mall:{container_id:"' not in r.text:
            logger.error('Code container ID marker not found (does the group exist?)')
            return

        soup = bs4.BeautifulSoup(r.text, 'lxml')

        # Posts are inside an HTML comment in two code tags with IDs listed in JS...
        for codeContainerIdStart in ('content:{pagelet_group_mall:{container_id:"', 'content:{group_mall_after_tti:{container_id:"'):
            codeContainerIdPos = r.text.index(codeContainerIdStart) + len(codeContainerIdStart)
            codeContainerId = r.text[codeContainerIdPos : r.text.index('"', codeContainerIdPos)]
            codeContainer = soup.find('code', id = codeContainerId)
            if not codeContainer:
                raise RuntimeError('Code container not found')
            if type(codeContainer.string) is not bs4.element.Comment:
                raise RuntimeError('Code container does not contain a comment')
            codeSoup = bs4.BeautifulSoup(codeContainer.string, 'lxml')
            yield from self._soup_to_items(codeSoup, baseUrl, 'group')

        # Pagination
        data = pageletDataPattern.search(r.text).group(0)[pageletDataPrefixLength:]
        while True:
            # As on the user profile pages, the web app sends a lot of additional parameters, but those all seem to be unnecessary (although some change the response format, e.g. from JSON to HTML)
            r = self._get(
                'https://www.facebook.com/ajax/pagelet/generic.php/GroupEntstreamPagelet',
                params = {'data': data, '__a': 1},
                headers = headers,
            )
            if r.status_code != 200:
                raise RuntimeError(f'Got status code {r.status_code}')
            # The response body is JSON prefixed with "for (;;);", which has to be removed first
            obj = json.loads(spuriousForLoopPattern.sub('', r.text))
            if obj['payload'] == '':
                # End of pagination
                break
            soup = bs4.BeautifulSoup(obj['payload'], 'lxml')
            yield from self._soup_to_items(soup, baseUrl, 'group')
            data = pageletDataPattern.search(r.text).group(0)[pageletDataPrefixLength:]

    @classmethod
    def setup_parser(cls, subparser):
        """Register this scraper's positional argument on its subparser."""
        subparser.add_argument('group', help = 'A group name or ID')

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed command-line arguments."""
        return cls(args.group, retries = args.retries)
|
||||
|
||||
115
snscrape/modules/gab.py
Normal file
115
snscrape/modules/gab.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import snscrape.base
|
||||
import time
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GabPost(typing.NamedTuple, snscrape.base.Item):
    """A single Gab post; its string form is the post URL."""

    url: str  # Permalink of the form https://gab.com/<username>/posts/<id>
    date: datetime.datetime  # Parsed from the API's created_at value, including its UTC offset
    content: str  # The API's body value

    def __str__(self):
        return self.url
|
||||
|
||||
|
||||
class GabUserCommonScraper(snscrape.base.Scraper):
    """Common implementation behind the Gab user scrapers (posts, comments, and media)."""

    def __init__(self, mode, username, **kwargs):
        """Select the API endpoint for `mode` ('posts', 'comments', or 'media') and `username`.

        Raises ValueError for any other mode. Remaining keyword arguments go to the base scraper.
        """
        super().__init__(**kwargs)
        if mode not in ('posts', 'comments', 'media'):
            raise ValueError('Invalid mode')
        self._mode = mode
        self._username = username
        # _beforeGlue is the separator used to append the 'before' parameter: '&' if the
        # base URL already carries a query string, '?' otherwise.
        if mode == 'posts':
            self._baseUrl, self._beforeGlue = f'https://gab.com/api/feed/{username}', '?'
        elif mode == 'comments':
            self._baseUrl, self._beforeGlue = f'https://gab.com/api/feed/{username}/comments?includes=post.conversation_parent', '&'
        elif mode == 'media':
            self._baseUrl, self._beforeGlue = f'https://gab.com/api/feed/{username}/media', '?'

    def _response_to_items(self, response):
        """Yield a GabPost for each entry of one API response, skipping IDs repeated within it."""
        seenIds = set()
        for entry in response['data']:
            if entry['post']['id'] in seenIds:
                continue
            post = entry['post']
            yield GabPost(
                url = f'https://gab.com/{post["user"]["username"]}/posts/{post["id"]}',
                # Strip the date dashes and all colons so that the value fits the compact strptime format
                date = datetime.datetime.strptime(post['created_at'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z'),
                content = post['body'],
            )
            seenIds.add(post['id'])

    def get_items(self):
        """Yield the user's items page by page until the API reports there are no more.

        Logs an error and stops if the user does not exist, a request returns a non-200
        status, or the first page is empty.
        """
        headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}

        logger.info('Retrieving initial data')
        r = self._get(self._baseUrl, headers = headers)
        if r.status_code == 404:
            logger.error('User does not exist')
            return
        if r.status_code != 200:
            logger.error(f'Got status code {r.status_code}')
            return

        response = json.loads(r.text)
        if not response['data']:
            logger.error('User has no posts')
            return
        yield from self._response_to_items(response)
        # The posts feed is paged by the published_at value of the last entry;
        # the other feeds are paged by a numeric value that grows in steps of 30.
        if self._mode == 'posts':
            before = response['data'][-1]['published_at']
        elif self._mode in ('comments', 'media'):
            before = 30

        while True:
            logger.info('Retrieving next page')
            r = self._get(f'{self._baseUrl}{self._beforeGlue}before={before}', headers = headers)
            if r.status_code != 200:
                logger.error(f'Got status code {r.status_code}')
                return
            response = json.loads(r.text)
            yield from self._response_to_items(response)
            if response['no-more'] or not response['data']:
                # Last page
                return
            if self._mode == 'posts':
                before = response['data'][-1]['published_at']
            elif self._mode in ('comments', 'media'):
                before += 30
            time.sleep(1) # Gab's API is pretty quick but doesn't like being hammered...

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional username argument on the scraper's subparser."""
        subparser.add_argument('username', help = 'A Gab username')
|
||||
|
||||
|
||||
class GabUserPostsScraper(GabUserCommonScraper):
    """Scraper for a Gab user's posts (scraper name 'gab-user')."""

    name = 'gab-user'

    @classmethod
    def from_args(cls, args):
        """Build the scraper in 'posts' mode from parsed command-line arguments."""
        username = args.username
        return cls('posts', username, retries = args.retries)
|
||||
|
||||
|
||||
class GabUserCommentsScraper(GabUserCommonScraper):
    """Scraper for a Gab user's comments (scraper name 'gab-user-comments')."""

    name = 'gab-user-comments'

    @classmethod
    def from_args(cls, args):
        """Build the scraper in 'comments' mode from parsed command-line arguments."""
        username = args.username
        return cls('comments', username, retries = args.retries)
|
||||
|
||||
|
||||
class GabUserMediaScraper(GabUserCommonScraper):
    """Scraper for a Gab user's media posts (scraper name 'gab-user-media')."""

    name = 'gab-user-media'

    @classmethod
    def from_args(cls, args):
        """Build the scraper in 'media' mode from parsed command-line arguments."""
        username = args.username
        return cls('media', username, retries = args.retries)
|
||||
@@ -1,74 +1,181 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InstagramUserScraper(snscrape.base.Scraper):
|
||||
name = 'instagram-user'
|
||||
class InstagramPost(typing.NamedTuple, snscrape.base.Item):
|
||||
cleanUrl: str
|
||||
dirtyUrl: str
|
||||
date: datetime.datetime
|
||||
content: str
|
||||
thumbnailUrl: str
|
||||
displayUrl: str
|
||||
username: str
|
||||
likes: int
|
||||
comments: int
|
||||
commentsDisabled: bool
|
||||
isVideo: bool
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
def __str__(self):
|
||||
return self.cleanUrl
|
||||
|
||||
|
||||
class InstagramCommonScraper(snscrape.base.Scraper):
|
||||
def __init__(self, mode, name, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
if mode not in ('User', 'Hashtag', 'Location'):
|
||||
raise ValueError('Invalid mode')
|
||||
self._mode = mode
|
||||
self._name = name
|
||||
|
||||
def _response_to_items(self, response, username):
|
||||
for node in response['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if self._mode == 'User':
|
||||
self._initialUrl = f'https://www.instagram.com/{self._name}/'
|
||||
self._pageName = 'ProfilePage'
|
||||
self._responseContainer = 'user'
|
||||
self._edgeXToMedia = 'edge_owner_to_timeline_media'
|
||||
self._pageIDKey = 'id'
|
||||
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
|
||||
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
elif self._mode == 'Hashtag':
|
||||
self._initialUrl = f'https://www.instagram.com/explore/tags/{self._name}/'
|
||||
self._pageName = 'TagPage'
|
||||
self._responseContainer = 'hashtag'
|
||||
self._edgeXToMedia = 'edge_hashtag_to_media'
|
||||
self._pageIDKey = 'name'
|
||||
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
|
||||
self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
elif self._mode == 'Location':
|
||||
self._initialUrl = f'https://www.instagram.com/explore/locations/{self._name}/'
|
||||
self._pageName = 'LocationsPage'
|
||||
self._responseContainer = 'location'
|
||||
self._edgeXToMedia = 'edge_location_to_media'
|
||||
self._pageIDKey = 'id'
|
||||
self._queryHash = '1b84447a4d8b6d6d0426fefb34514485'
|
||||
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
|
||||
|
||||
def _response_to_items(self, response):
|
||||
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
code = node['node']['shortcode']
|
||||
yield snscrape.base.URLItem(f'https://www.instagram.com/p/{code}/?taken-by={username}') #TODO: Do we want the taken-by parameter in here?
|
||||
username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
|
||||
usernameQuery = '?taken-by=' + username
|
||||
cleanUrl = f'https://www.instagram.com/p/{code}/'
|
||||
yield InstagramPost(
|
||||
cleanUrl = cleanUrl,
|
||||
dirtyUrl = f'{cleanUrl}{usernameQuery}',
|
||||
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
|
||||
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
|
||||
thumbnailUrl = node['node']['thumbnail_src'],
|
||||
displayUrl = node['node']['display_url'],
|
||||
username = username,
|
||||
likes = node['node']['edge_media_preview_like']['count'],
|
||||
comments = node['node']['edge_media_to_comment']['count'],
|
||||
commentsDisabled = node['node']['comments_disabled'],
|
||||
isVideo = node['node']['is_video'],
|
||||
)
|
||||
|
||||
def _check_initial_page_callback(self, r):
|
||||
if r.status_code != 200:
|
||||
return True, None
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
try:
|
||||
obj = json.loads(jsonData)
|
||||
except json.JSONDecodeError:
|
||||
return False, 'invalid JSON'
|
||||
r._snscrape_json_obj = obj
|
||||
return True, None
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.status_code != 200:
|
||||
return False, f'status code {r.status_code}'
|
||||
try:
|
||||
obj = json.loads(r.text)
|
||||
except json.JSONDecodeError as e:
|
||||
return False, f'invalid JSON ({e!r})'
|
||||
r._snscrape_json_obj = obj
|
||||
return True, None
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
logger.info('Retrieving initial data')
|
||||
r = self._get(f'https://www.instagram.com/{self._username}/', headers = headers)
|
||||
r = self._get(self._initialUrl, headers = headers, responseOkCallback = self._check_initial_page_callback)
|
||||
if r.status_code == 404:
|
||||
logger.warning('User does not exist')
|
||||
logger.warning(f'{self._mode} does not exist')
|
||||
return
|
||||
elif r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
|
||||
response = json.loads(jsonData)
|
||||
rhxGis = response['rhx_gis']
|
||||
if response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['count'] == 0:
|
||||
logger.info('User has no posts')
|
||||
response = r._snscrape_json_obj
|
||||
rhxGis = response['rhx_gis'] if 'rhx_gis' in response else ''
|
||||
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
|
||||
logger.info(f'{self._mode} has no posts')
|
||||
return
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
logger.warning('Private account')
|
||||
return
|
||||
userID = response['entry_data']['ProfilePage'][0]['graphql']['user']['id']
|
||||
username = response['entry_data']['ProfilePage'][0]['graphql']['user']['username'] # Might have different capitalisation than self._username
|
||||
yield from self._response_to_items(response['entry_data']['ProfilePage'][0]['graphql'], username)
|
||||
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
|
||||
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
|
||||
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
while True:
|
||||
logger.info(f'Retrieving endCursor = {endCursor!r}')
|
||||
variables = f'{{"id":"{userID}","first":50,"after":"{endCursor}"}}'
|
||||
variables = self._variablesFormat.format(**locals())
|
||||
headers['X-Requested-With'] = 'XMLHttpRequest'
|
||||
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash=42323d64886122307be10013ad2dcc44&variables={variables}', headers = headers)
|
||||
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback)
|
||||
|
||||
if r.status_code != 200:
|
||||
logger.error(f'Got status code {r.status_code}')
|
||||
return
|
||||
|
||||
response = json.loads(r.text)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['edges']:
|
||||
response = r._snscrape_json_obj
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
|
||||
return
|
||||
yield from self._response_to_items(response['data'], username)
|
||||
if not response['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
|
||||
yield from self._response_to_items(response['data'])
|
||||
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
|
||||
return
|
||||
endCursor = response['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
|
||||
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
|
||||
|
||||
|
||||
class InstagramUserScraper(InstagramCommonScraper):
|
||||
name = 'instagram-user'
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('username', help = 'An Instagram username')
|
||||
subparser.add_argument('username', help = 'An Instagram username (no leading @)')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
return cls('User', args.username, retries = args.retries)
|
||||
|
||||
|
||||
class InstagramHashtagScraper(InstagramCommonScraper):
    """Instagram scraper fixed to the ``'Hashtag'`` mode; registered under the name ``instagram-hashtag``."""

    name = 'instagram-hashtag'

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional ``hashtag`` argument on the given CLI subparser."""
        subparser.add_argument(
            'hashtag',
            help = 'An Instagram hashtag (no leading #)',
        )

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed CLI arguments (reads ``hashtag`` and ``retries``)."""
        mode = 'Hashtag'
        return cls(mode, args.hashtag, retries = args.retries)
|
||||
|
||||
|
||||
class InstagramLocationScraper(InstagramCommonScraper):
    """Instagram scraper fixed to the ``'Location'`` mode; registered under the name ``instagram-location``."""

    name = 'instagram-location'

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional ``locationid`` argument (parsed as an integer) on the given CLI subparser."""
        subparser.add_argument(
            'locationid',
            help = 'An Instagram location ID',
            type = int,
        )

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed CLI arguments (reads ``locationid`` and ``retries``)."""
        mode = 'Location'
        return cls(mode, args.locationid, retries = args.retries)
|
||||
|
||||
@@ -1,80 +1,209 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import random
|
||||
import logging
|
||||
import re
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
name = 'twitter-search'
|
||||
class Tweet(typing.NamedTuple, snscrape.base.Item):
|
||||
url: str
|
||||
date: datetime.datetime
|
||||
content: str
|
||||
id: int
|
||||
username: str
|
||||
outlinks: list
|
||||
outlinksss: str
|
||||
tcooutlinks: list
|
||||
tcooutlinksss: str
|
||||
|
||||
def __init__(self, query, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._query = query
|
||||
def __str__(self):
|
||||
return self.url
|
||||
|
||||
def _get_feed_from_html(self, html):
|
||||
soup = bs4.BeautifulSoup(html, 'lxml')
|
||||
feed = soup.find_all('li', 'js-stream-item')
|
||||
return feed
|
||||
|
||||
class Account(typing.NamedTuple, snscrape.base.Item):
    """Item describing a Twitter account by its username."""

    username: str

    @property
    def url(self):
        """URL of the form ``https://twitter.com/<username>``."""
        profileUrl = f'https://twitter.com/{self.username}'
        return profileUrl

    def __str__(self):
        """The string form of an account is its URL."""
        return self.url
|
||||
|
||||
|
||||
class TwitterCommonScraper(snscrape.base.Scraper):
|
||||
def _feed_to_items(self, feed):
|
||||
for tweet in feed:
|
||||
username = tweet.find('span', 'username').find('b').text
|
||||
tweetID = tweet['data-item-id']
|
||||
yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}')
|
||||
url = f'https://twitter.com/{username}/status/{tweetID}'
|
||||
|
||||
date = None
|
||||
timestampA = tweet.find('a', 'tweet-timestamp')
|
||||
if timestampA:
|
||||
timestampSpan = timestampA.find('span', '_timestamp')
|
||||
if timestampSpan and timestampSpan.has_attr('data-time'):
|
||||
date = datetime.datetime.fromtimestamp(int(timestampSpan['data-time']), datetime.timezone.utc)
|
||||
if not date:
|
||||
logger.warning(f'Failed to extract date for {url}')
|
||||
|
||||
contentP = tweet.find('p', 'tweet-text')
|
||||
content = None
|
||||
outlinks = []
|
||||
tcooutlinks = []
|
||||
if contentP:
|
||||
content = contentP.text
|
||||
for a in contentP.find_all('a'):
|
||||
if a.has_attr('href') and not a['href'].startswith('/') and (not a.has_attr('class') or 'u-hidden' not in a['class']):
|
||||
if a.has_attr('data-expanded-url'):
|
||||
outlinks.append(a['data-expanded-url'])
|
||||
else:
|
||||
logger.warning(f'Ignoring link without expanded URL on {url}: {a["href"]}')
|
||||
tcooutlinks.append(a['href'])
|
||||
else:
|
||||
logger.warning(f'Failed to extract content for {url}')
|
||||
card = tweet.find('div', 'card2')
|
||||
if card and 'has-autoplayable-media' not in card['class']:
|
||||
for div in card.find_all('div'):
|
||||
if div.has_attr('data-card-url'):
|
||||
outlinks.append(div['data-card-url'])
|
||||
tcooutlinks.append(div['data-card-url'])
|
||||
outlinks = list(dict.fromkeys(outlinks)) # Deduplicate in case the same link was shared more than once within this tweet; may change order on Python 3.6 or older
|
||||
tcooutlinks = list(dict.fromkeys(tcooutlinks))
|
||||
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.headers['content-type'] != 'application/json;charset=utf-8':
|
||||
logger.error(f'Content type of {r.url} is not JSON')
|
||||
return False
|
||||
return True
|
||||
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||
return False, f'content type is not JSON'
|
||||
return True, None
|
||||
|
||||
|
||||
class TwitterSearchScraper(TwitterCommonScraper):
|
||||
name = 'twitter-search'
|
||||
|
||||
def __init__(self, query, cursor = None, **kwargs):
    """Set up a search scraper.

    query: the search string; it is stored and URL-encoded into the search page URL.
    cursor: optional cursor value, stored as ``self._cursor``.
    kwargs: forwarded unchanged to the parent constructor.
    """
    super().__init__(**kwargs)
    self._query = query
    self._cursor = cursor
    # The browser version numbers in the user agent are randomised per scraper instance.
    chromeBuild = random.randint(0, 9999)
    safariMinor = random.randint(0, 99)
    self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{chromeBuild} Safari/537.{safariMinor}'
    searchParams = {
        'f': 'live',
        'lang': 'en',
        'q': self._query,
        'src': 'spelling_expansion_revert_click',
    }
    self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode(searchParams)
|
||||
|
||||
def _get_guest_token(self):
    """Fetch the search page and return the guest token found in it.

    The token is the run of digits in the page's ``gt=`` cookie assignment.
    Raises RuntimeError if the page does not contain one.
    """
    logger.info(f'Retrieving guest token from search page')
    requestHeaders = {'User-Agent': self._userAgent}
    page = self._get(self._baseUrl, headers = requestHeaders)
    found = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', page.text)
    if found is None:
        raise RuntimeError('Unable to find guest token')
    return found.group(1)
|
||||
|
||||
def _check_scroll_response(self, r):
|
||||
if r.status_code == 429:
|
||||
# Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
|
||||
return True, None
|
||||
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||
return False, f'content type is not JSON'
|
||||
if r.status_code != 200:
|
||||
return False, f'non-200 status code'
|
||||
return True, None
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
# First page
|
||||
logger.info(f'Retrieving search page for {self._query}')
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd'}, headers = headers)
|
||||
|
||||
feed = self._get_feed_from_html(r.text)
|
||||
if not feed:
|
||||
return
|
||||
newestID = feed[0]['data-item-id']
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
|
||||
headers = {
|
||||
'User-Agent': self._userAgent,
|
||||
'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
|
||||
'Referer': self._baseUrl,
|
||||
}
|
||||
guestToken = None
|
||||
cursor = self._cursor
|
||||
while True:
|
||||
logger.info(f'Retrieving scroll page {maxPosition}')
|
||||
r = self._get('https://twitter.com/i/search/timeline',
|
||||
params = {
|
||||
'f': 'tweets',
|
||||
'vertical': 'default',
|
||||
'lang': 'en',
|
||||
'q': self._query,
|
||||
'include_available_features': '1',
|
||||
'include_entities': '1',
|
||||
'reset_error_state': 'false',
|
||||
'src': 'typd',
|
||||
'max_position': maxPosition,
|
||||
},
|
||||
headers = headers,
|
||||
responseOkCallback = self._check_json_callback)
|
||||
if not guestToken:
|
||||
guestToken = self._get_guest_token()
|
||||
headers['x-guest-token'] = guestToken
|
||||
|
||||
feed = self._get_feed_from_html(json.loads(r.text)['items_html'])
|
||||
if not feed:
|
||||
return
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
logger.info(f'Retrieving scroll page {cursor}')
|
||||
params = {
|
||||
'include_profile_interstitial_type': '1',
|
||||
'include_blocking': '1',
|
||||
'include_blocked_by': '1',
|
||||
'include_followed_by': '1',
|
||||
'include_want_retweets': '1',
|
||||
'include_mute_edge': '1',
|
||||
'include_can_dm': '1',
|
||||
'include_can_media_tag': '1',
|
||||
'skip_status': '1',
|
||||
'cards_platform': 'Web-12',
|
||||
'include_cards': '1',
|
||||
'include_composer_source': 'true',
|
||||
'include_ext_alt_text': 'true',
|
||||
'include_reply_count': '1',
|
||||
'tweet_mode': 'extended',
|
||||
'include_entities': 'true',
|
||||
'include_user_entities': 'true',
|
||||
'include_ext_media_color': 'true',
|
||||
'include_ext_media_availability': 'true',
|
||||
'send_error_codes': 'true',
|
||||
'simple_quoted_tweets': 'true',
|
||||
'q': self._query,
|
||||
'tweet_search_mode': 'live',
|
||||
'count': '100',
|
||||
'query_source': 'spelling_expansion_revert_click',
|
||||
}
|
||||
if cursor:
|
||||
params['cursor'] = cursor
|
||||
params['pc'] = '1'
|
||||
params['spelling_corrections'] = '1'
|
||||
params['ext'] = 'mediaStats%2CcameraMoment'
|
||||
r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
|
||||
if r.status_code == 429:
|
||||
guestToken = None
|
||||
continue
|
||||
try:
|
||||
obj = r.json()
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f'Received invalid JSON from Twitter: {e!s}')
|
||||
raise RuntimeError('Received invalid JSON from Twitter') from e
|
||||
|
||||
# No data format test, just a hard and loud crash if anything's wrong :-)
|
||||
newCursor = None
|
||||
for instruction in obj['timeline']['instructions']:
|
||||
if 'addEntries' in instruction:
|
||||
entries = instruction['addEntries']['entries']
|
||||
elif 'replaceEntry' in instruction:
|
||||
entries = [instruction['replaceEntry']['entry']]
|
||||
else:
|
||||
continue
|
||||
for entry in entries:
|
||||
if entry['entryId'].startswith('sq-I-t-'):
|
||||
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
|
||||
tweetID = tweet['id']
|
||||
content = tweet['full_text']
|
||||
username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
|
||||
date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
|
||||
outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
|
||||
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
|
||||
url = f'https://twitter.com/{username}/status/{tweetID}'
|
||||
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
||||
elif entry['entryId'] == 'sq-cursor-bottom':
|
||||
newCursor = entry['content']['operation']['cursor']['value']
|
||||
if not newCursor or newCursor == cursor:
|
||||
# End of pagination
|
||||
break
|
||||
cursor = newCursor
|
||||
|
||||
@classmethod
def setup_parser(cls, subparser):
    """Register the optional ``--cursor`` option and the positional ``query`` argument on the given CLI subparser."""
    argumentSpecs = (
        ('--cursor', {'metavar': 'CURSOR'}),
        ('query', {'help': 'A Twitter search string'}),
    )
    for argName, options in argumentSpecs:
        subparser.add_argument(argName, **options)
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.query, retries = args.retries)
|
||||
return cls(args.query, cursor = args.cursor, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterUserScraper(TwitterSearchScraper):
|
||||
@@ -106,3 +235,136 @@ class TwitterHashtagScraper(TwitterSearchScraper):
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.hashtag, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterThreadScraper(TwitterCommonScraper):
    """Scraper for a Twitter thread, starting from the last tweet in the thread.

    Yields the supplied tweet first, then its ancestors, following pagination
    where the page indicates there is more.
    """

    name = 'twitter-thread'

    def __init__(self, tweetID = None, **kwargs):
        """Set up the scraper.

        tweetID: the tweet ID as a string of digits.
        kwargs: forwarded unchanged to the parent constructor.

        Raises ValueError if tweetID contains anything other than digits.
        """
        if tweetID is not None and tweetID.strip('0123456789') != '':
            raise ValueError('Invalid tweet ID, must be numeric')
        super().__init__(**kwargs)
        self._tweetID = tweetID

    def get_items(self):
        """Yield the supplied tweet followed by its ancestor tweets.

        Returns early, after logging a warning, if the permalink tweet or the
        ancestors container cannot be found on the page.
        """
        headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}

        # Fetch the page of the last tweet in the thread
        r = self._get(f'https://twitter.com/user/status/{self._tweetID}', headers = headers)
        soup = bs4.BeautifulSoup(r.text, 'lxml')

        # Extract tweets on that page in the correct order; first, the tweet that was supplied, then the ancestors with pagination if necessary
        tweet = soup.find('div', 'ThreadedConversation--permalinkTweetWithAncestors')
        if tweet:
            tweet = tweet.find('div', 'tweet')
        if not tweet:
            logger.warning('Tweet does not exist, is not a thread, or does not have ancestors')
            return
        items = list(self._feed_to_items([tweet]))
        assert len(items) == 1
        yield items[0]
        username = items[0].username

        ancestors = soup.find('div', 'ThreadedConversation--ancestors')
        if not ancestors:
            logger.warning('Tweet does not have ancestors despite claiming to')
            return
        feed = reversed(ancestors.find_all('li', 'js-stream-item'))
        yield from self._feed_to_items(feed)

        # If necessary, iterate through pagination until reaching the initial tweet
        streamContainer = ancestors.find('div', 'stream-container')
        # find() returns None when there is no stream container; without this check the has_attr call below would raise AttributeError.
        if streamContainer is None:
            return
        if not streamContainer.has_attr('data-max-position') or streamContainer['data-max-position'] == '':
            return
        minPosition = streamContainer['data-max-position']
        while True:
            r = self._get(
                f'https://twitter.com/i/{username}/conversation/{self._tweetID}?include_available_features=1&include_entities=1&min_position={minPosition}',
                headers = headers,
                responseOkCallback = self._check_json_callback
            )

            obj = json.loads(r.text)
            soup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
            feed = reversed(soup.find_all('li', 'js-stream-item'))
            yield from self._feed_to_items(feed)
            if not obj['has_more_items']:
                break
            minPosition = obj['max_position']

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional ``tweetID`` argument on the given CLI subparser."""
        subparser.add_argument('tweetID', help = 'A tweet ID of the last tweet in a thread')

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed CLI arguments (reads ``tweetID`` and ``retries``)."""
        return cls(tweetID = args.tweetID, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterListPostsScraper(TwitterSearchScraper):
    """Search scraper whose query is ``list:<listName>``; registered under the name ``twitter-list-posts``."""

    name = 'twitter-list-posts'

    def __init__(self, listName, **kwargs):
        """Set up a search for ``list:<listName>`` and remember the list name.

        kwargs: forwarded unchanged to the parent constructor.
        """
        listQuery = f'list:{listName}'
        super().__init__(listQuery, **kwargs)
        self._listName = listName

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional ``list`` argument on the given CLI subparser."""
        subparser.add_argument(
            'list',
            help = 'A Twitter list, formatted as "username/listname"',
        )

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed CLI arguments (reads ``list`` and ``retries``)."""
        listName = args.list
        return cls(listName, retries = args.retries)
|
||||
|
||||
|
||||
class TwitterListMembersScraper(TwitterCommonScraper):
    """Scraper yielding an Account for each member of a Twitter list."""

    name = 'twitter-list-members'

    def __init__(self, listName, **kwargs):
        """Set up the scraper.

        listName: the list, formatted as "username/listname".
        kwargs: forwarded unchanged to the parent constructor.

        Raises ValueError if listName does not consist of exactly two '/'-separated parts.
        """
        super().__init__(**kwargs)
        parts = listName.split('/')
        if len(parts) != 2:
            # Same exception type as the bare tuple unpacking would raise, but with a message that names the expected format.
            raise ValueError('Invalid list name, must be formatted as "username/listname"')
        self._user, self._list = parts

    def _items_to_accounts(self, items):
        """Yield an Account for each stream item, using the screen name on the item's account div."""
        for item in items:
            yield Account(username = item.find('div', 'account')['data-screen-name'])

    def get_items(self):
        """Yield the list's members from the members page, then from the paginated timeline endpoint.

        Returns early, after logging a warning, if the page request does not
        return status 200 or the list has no items.

        Raises RuntimeError if the members page has no stream container.
        """
        headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}

        baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'
        r = self._get(baseUrl, headers = headers)
        if r.status_code != 200:
            logger.warning('List not found')
            return
        soup = bs4.BeautifulSoup(r.text, 'lxml')
        container = soup.find('div', 'stream-container')
        if not container:
            raise RuntimeError('Unable to find container')
        items = container.find_all('li', 'js-stream-item')
        if not items:
            logger.warning('Empty list')
            return
        yield from self._items_to_accounts(items)

        # Without a non-empty data-min-position there is nothing to paginate from.
        if not container.has_attr('data-min-position') or container['data-min-position'] == '':
            return
        maxPosition = container['data-min-position']
        while True:
            r = self._get(
                f'{baseUrl}/timeline?include_available_features=1&include_entities=1&max_position={maxPosition}&reset_error_state=false',
                headers = headers,
                responseOkCallback = self._check_json_callback
            )
            obj = json.loads(r.text)
            soup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
            yield from self._items_to_accounts(soup.find_all('li', 'js-stream-item'))
            if not obj['has_more_items']:
                break
            maxPosition = obj['min_position']

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional ``list`` argument on the given CLI subparser."""
        subparser.add_argument('list', help = 'A Twitter list, formatted as "username/listname"')

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed CLI arguments (reads ``list`` and ``retries``)."""
        return cls(args.list, retries = args.retries)
|
||||
|
||||
107
snscrape/modules/vkontakte.py
Normal file
107
snscrape/modules/vkontakte.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import itertools
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VKontaktePost(typing.NamedTuple, snscrape.base.Item):
    """Item describing a single VK wall post."""

    # Absolute URL of the post
    url: str
    # Timestamp of the post; the scraper passes None when it cannot determine one
    date: datetime.datetime
    # Text of the post; the scraper passes None when the post has no text block
    content: str

    def __str__(self):
        """The string form of a post is its URL."""
        postUrl = self.url
        return postUrl
|
||||
|
||||
|
||||
class VKontakteUserScraper(snscrape.base.Scraper):
    """Scraper yielding the posts on a VK user's wall."""

    name = 'vkontakte-user'

    def __init__(self, username, **kwargs):
        """Set up the scraper.

        username: the name appended to ``https://vk.com/`` to form the wall URL.
        kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self._username = username

    def _soup_to_items(self, soup, baseUrl):
        """Yield a VKontaktePost for every ``div.post`` element in soup.

        soup: parsed HTML containing the posts.
        baseUrl: URL against which each post's link is resolved.
        """
        for post in soup.find_all('div', class_ = 'post'):
            dateSpan = post.find('div', class_ = 'post_date').find('span', class_ = 'rel_date')
            textDiv = post.find('div', class_ = 'wall_post_text')
            # The timestamp is an attribute of the span. `'time' in dateSpan` would test the tag's children instead, so check the attribute explicitly.
            hasTimestamp = dateSpan.has_attr('time')
            yield VKontaktePost(
                url = urllib.parse.urljoin(baseUrl, post.find('a', class_ = 'post_link')['href']),
                date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc) if hasTimestamp else None,
                content = textDiv.text if textDiv else None,
            )

    def get_items(self):
        """Yield the posts from the wall page, then from subsequent pages requested in steps of 10.

        Returns early, after logging, on a 404 or other non-200 response, a
        private or deleted profile, or a wall without posts. Pagination stops
        at the "no posts" marker or on an unrecognised response.
        """
        headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
        baseUrl = f'https://vk.com/{self._username}'

        logger.info('Retrieving initial data')
        r = self._get(baseUrl, headers = headers)
        if r.status_code == 404:
            logger.error('Wall does not exist')
            return
        elif r.status_code != 200:
            logger.error(f'Got status code {r.status_code}')
            return

        # VK sends windows-1251-encoded data, but Requests's decoding doesn't seem to work correctly and causes lxml to choke, so we need to pass the binary content and the encoding explicitly.
        soup = bs4.BeautifulSoup(r.content, 'lxml', from_encoding = r.encoding)

        if soup.find('div', class_ = 'profile_closed_wall_dummy'):
            logger.error('Private profile')
            return

        profileDeleted = soup.find('h5', class_ = 'profile_deleted_text')
        if profileDeleted:
            # Unclear what this state represents, so just log website text.
            logger.error(profileDeleted.text)
            return

        newestPost = soup.find('div', class_ = 'post')
        if not newestPost:
            logger.info('Wall has no posts')
            return
        ownerID = newestPost.attrs['data-post-id'].split('_')[0]
        # If there is a pinned post, we need its ID for the pagination requests
        if 'post_fixed' in newestPost.attrs['class']:
            fixedPostID = newestPost.attrs['id'].split('_')[1]
        else:
            fixedPostID = ''

        yield from self._soup_to_items(soup, baseUrl)

        headers['X-Requested-With'] = 'XMLHttpRequest'
        for offset in itertools.count(start = 10, step = 10):
            logger.info('Retrieving next page')
            r = self._post(
                'https://vk.com/al_wall.php',
                data = [('act', 'get_wall'), ('al', 1), ('fixed', fixedPostID), ('offset', offset), ('onlyCache', 'false'), ('owner_id', ownerID), ('type', 'own'), ('wall_start_from', offset)],
                headers = headers
            )
            if r.status_code != 200:
                logger.error(f'Got status code {r.status_code}')
                return
            # Convert to JSON and read the HTML payload. Note that this implicitly converts the data to a Python string (i.e., Unicode), away from a windows-1251-encoded bytes.
            posts = r.json()['payload'][1][0]
            if posts.startswith('<div class="page_block no_posts">'):
                # Reached the end
                break
            if not posts.startswith('<div id="post'):
                logger.error(f'Got an unknown response: {posts[:200]!r}...')
                break
            soup = bs4.BeautifulSoup(posts, 'lxml')
            yield from self._soup_to_items(soup, baseUrl)

    @classmethod
    def setup_parser(cls, subparser):
        """Register the positional ``username`` argument on the given CLI subparser."""
        subparser.add_argument('username', help = 'A VK username')

    @classmethod
    def from_args(cls, args):
        """Build the scraper from parsed CLI arguments (reads ``username`` and ``retries``)."""
        return cls(args.username, retries = args.retries)
|
||||
|
||||
7
snscrape/version.py
Normal file
7
snscrape/version.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import pkg_resources
|
||||
|
||||
|
||||
# Expose the version of the 'snscrape' distribution as __version__, or None if pkg_resources cannot find that distribution.
try:
    _distribution = pkg_resources.get_distribution('snscrape')
except pkg_resources.DistributionNotFound:
    __version__ = None
else:
    __version__ = _distribution.version
|
||||
Reference in New Issue
Block a user