76 Commits

Author SHA1 Message Date
JustAnotherArchivist
7b3c7deb28 Catch login redirects on Instagram 2020-05-30 00:56:34 +00:00
JustAnotherArchivist
040a11656c Update README 2020-05-30 00:53:52 +00:00
JustAnotherArchivist
1459245258 Consistently raise ScraperException on fatal errors 2020-05-30 00:53:49 +00:00
JustAnotherArchivist
dbe4c5ce55 Remove Google+ module
Google+ was mostly shut down in early 2019. What remained (Google+ for G Suite) was renamed to Google Currents and is for internal communication only (and therefore out of scope for snscrape).
2020-05-30 00:35:06 +00:00
JustAnotherArchivist
80491ecc2c Remove Gab module
Since Gab's move to a fork of Mastodon in July 2019, the module had been broken, and a new module would better be written from scratch as the platform changed entirely.
2020-05-30 00:23:33 +00:00
JustAnotherArchivist
1a71b58101 Add support for Telegram
Closes #50
2020-05-29 23:44:01 +00:00
JustAnotherArchivist
0ce37a69d4 Log exception details on crashes 2020-05-29 22:29:23 +00:00
JustAnotherArchivist
722bfd5f7c Handle Twitter tombstones
Fixes #63
2020-05-29 22:12:37 +00:00
JustAnotherArchivist
b6cc3180d9 Force TwitterThreadScraper and TwitterListMembersScraper to fetch the old design 2020-03-04 00:40:49 +00:00
JustAnotherArchivist
613395d1c2 Port TwitterSearchScraper to redesign
Fixes #57
2020-03-04 00:40:49 +00:00
JustAnotherArchivist
82a87b7b5a Merge pull request #53 from JackDallas/add-more-insta-fields
Add more fields to the instagram scraper
2020-02-09 23:48:59 +00:00
Jack Dallas
9568028bf9 Update changed fields 2020-02-07 11:30:16 +00:00
JustAnotherArchivist
6df351772e Fix crash in Facebook scraper on link-less entries 2020-02-05 16:15:10 +00:00
JustAnotherArchivist
541173b0c8 Merge pull request #54 from jodizzle/fix/vkontakte-user
Fix vkontakte-user: pagination returns JSON now, and handle some unscrapable profiles.
2020-02-05 14:56:12 +00:00
Jody Leonard
b6772d3778 vkontakte-user: Handle additional un-scrapeable profile case 2019-10-31 16:01:29 -04:00
Jody Leonard
20ea117a2c Fix vkontakte-user pagination 2019-10-30 22:29:49 -04:00
JackDallas
ff54c350bc Add more fields to the instagram scraper 2019-08-30 12:43:02 +01:00
JustAnotherArchivist
e6aae35304 Use setuptools_scm for versioning through git tags 2019-07-01 17:41:18 +00:00
JustAnotherArchivist
b698a201f5 Update scraper list 2019-07-01 16:05:21 +00:00
JustAnotherArchivist
7fe72cf708 Add a note about reporting issues with proper debugging information 2019-07-01 16:01:11 +00:00
JustAnotherArchivist
4651cde447 Refactor CLI logging and add --dump-locals for better debugging 2019-07-01 15:46:10 +00:00
JustAnotherArchivist
c99cc4b5d3 Remove existing logging handlers 2019-07-01 15:42:06 +00:00
JustAnotherArchivist
628074d6fc Print contents when ignoring a link-less entry 2019-07-01 01:35:00 +00:00
JustAnotherArchivist
64b293bd9e Add support for media sets
Closes #48
2019-07-01 01:34:17 +00:00
JustAnotherArchivist
180f4dfeb7 Add support for photo.php URLs
Fixes #42
2019-06-30 18:36:39 +00:00
JustAnotherArchivist
6d6e3fa16c Fix crash on (some?) inexistent groups 2019-06-30 18:36:30 +00:00
JustAnotherArchivist
5f7e6936c1 Add support for Facebook groups
Closes #47
2019-06-30 17:16:09 +00:00
JustAnotherArchivist
e2c05c9e0c Split common code off into FacebookCommonScraper and refactor odd link detection in preparation of group scraping 2019-06-30 16:28:33 +00:00
JustAnotherArchivist
14e11b28d2 Add support for Twitter lists
Closes #46
2019-06-30 14:39:29 +00:00
JustAnotherArchivist
1a07b3b7e8 Add support for Twitter threads 2019-06-30 02:11:46 +00:00
JustAnotherArchivist
4d8cc7bdb9 Extract outlinks from Facebook 2019-06-27 15:29:05 +00:00
JustAnotherArchivist
eec83f181e Check HTTP status code before attempting parsing 2019-06-27 15:25:26 +00:00
JustAnotherArchivist
fae7432c64 Log details about failed JSON parsing 2019-06-27 15:25:08 +00:00
JustAnotherArchivist
757818474d Add tweet ID and username fields to Tweet items 2019-06-23 11:48:54 +00:00
JustAnotherArchivist
e6c934c0b8 Retrieve as many posts at once as possible for Instagram hashtags 2019-06-21 09:56:12 +00:00
JustAnotherArchivist
d2315feec1 Add support for Instagram locations 2019-06-21 09:55:30 +00:00
JustAnotherArchivist
765ceeeb10 More complete and more readable exception dump 2019-06-18 14:25:38 +00:00
JustAnotherArchivist
731a2e8c8b Check that Instagram returned valid JSON, take 2
Fixes #22
2019-06-10 15:03:15 +00:00
JustAnotherArchivist
7d1916292c Twitter: stop recursion based on whether the server returns the same position instead of detecting an empty feed
Fixes #37
2019-06-10 14:38:25 +00:00
JustAnotherArchivist
0d509c4ba0 Check that Instagram returned valid JSON (fixes #22) 2019-05-30 15:04:05 +00:00
JustAnotherArchivist
907a003a59 Fix crash when Twitter search produces no results (fixes #41) 2019-05-24 11:51:50 +00:00
JustAnotherArchivist
8ada279b57 Add warning if Twitter module gets no results 2019-05-24 11:50:39 +00:00
JustAnotherArchivist
900eae54a6 Ignore branded content link on Facebook silently 2019-05-24 11:49:44 +00:00
JustAnotherArchivist
7989af27b5 Handle tweets by temporarily blocked accounts (which show up in the search results but don't have a date or content) 2019-05-21 22:37:43 +00:00
JustAnotherArchivist
e528ca3f26 Dump locals only for snscrape modules (closes #39) 2019-05-18 01:08:49 +00:00
JustAnotherArchivist
32a427dac3 Fix pagination on Twitter (fixes #40) 2019-05-18 01:08:00 +00:00
JustAnotherArchivist
7001983556 Skip timeline entries that don't have a link (fixes #36) 2019-05-16 23:17:46 +00:00
JustAnotherArchivist
64438afc92 Work around tweet URLs that don't have a data-expanded-url attribute (fixes #38) 2019-05-16 22:51:22 +00:00
JustAnotherArchivist
9e6538556a Dump also the deeper frames, not just the get_items one 2019-05-16 22:48:35 +00:00
JustAnotherArchivist
9c8bbf051c Fix order of processing in Twitter module for more useful locals dump output 2019-05-16 22:22:53 +00:00
JustAnotherArchivist
c6a11298ac Fix missing linebreak in locals dump output 2019-05-16 22:22:21 +00:00
JustAnotherArchivist
02cbf6ddf6 Dump locals to a temporary file in case of an exception 2019-05-16 18:29:30 +00:00
JustAnotherArchivist
3817aa59d4 Add support for extracting links from tweets (including cards)
Both the t.co and the original URLs can be extracted. Note that card links are always t.co since Twitter's HTML does not include the original URL for those.
2019-05-16 16:42:52 +00:00
JustAnotherArchivist
46a51008f8 Fix Instagram signature calculation 2019-05-16 16:19:51 +00:00
JustAnotherArchivist
f91979eb32 Add --max-position option to twitter-search scraper as a workaround for pagination stopping early (#37)
The value needs to be of the format 'TWEET-<seenID>-<newestID>' where <seenID> is the last result that was returned by a previous scrape and <newestID> is the first result returned by the initial scrape.
2019-05-10 17:30:15 +00:00
JustAnotherArchivist
85fff319bc Disable Twitter's spelling correction
src=typd means "this is what was typed in and could be incorrect". src=spxr is "no, I really mean that". src=sprv appears to be an alias of spxr that is no longer used.
2019-05-10 16:43:59 +00:00
JustAnotherArchivist
6b145526b7 Update README with new modules 2019-04-21 23:10:32 +02:00
JustAnotherArchivist
abf31764b1 Version 0.2.0 2019-04-21 23:03:21 +02:00
JustAnotherArchivist
64693f74bb Update Instagram query hash 2019-04-19 01:47:38 +02:00
JustAnotherArchivist
a7d08ed51c Remove leftover debugging print 2019-04-19 01:40:29 +02:00
JustAnotherArchivist
f48ca7726e Add support for Gab 2019-04-19 00:40:43 +02:00
JustAnotherArchivist
78c295f7e0 Add support for VKontakte (fixes #13) 2019-04-18 18:39:21 +02:00
JustAnotherArchivist
a5aca1a14f Add support for Instagram hashtags (fixes #29) 2019-04-18 16:14:54 +02:00
JustAnotherArchivist
96f7d871c1 Ignore Scraper subclasses which don't set a name 2019-04-18 16:14:26 +02:00
JustAnotherArchivist
b5dfd37949 Support unix timestamps in --since 2019-04-18 16:01:35 +02:00
JustAnotherArchivist
b511397791 Add --since option to return only results newer than a certain date (fixes #19) 2019-04-18 15:12:29 +02:00
JustAnotherArchivist
536fcb3303 Return proper items from scrapers including clean URLs (fixes #9 and #10) 2019-04-18 14:44:21 +02:00
JustAnotherArchivist
f8d812f799 Include permalink.php, events, and notes (fixes #32) 2019-04-18 04:22:47 +02:00
JustAnotherArchivist
c2cebd9166 Accept-Language header to get an English response unconditionally 2019-04-18 03:58:37 +02:00
JustAnotherArchivist
73bc99596f Treat Twitter responses without a Content-Type header as invalid (fixes #21) 2019-04-18 02:24:35 +02:00
JustAnotherArchivist
8458c12218 Rewrite link extraction on Facebook (fixes #17)
Facebook's returned HTML has a large number of inconsistencies; some (most) pages include a <link rel="canonical" /> but some don't, for example. This was at the root of the failing post extraction for some Facebook pages (#17). The previous link extraction technique was also quite poor for other reasons though. The new method uses the relevant CSS classes instead. Despite probably being the result of a CSS minimiser or similar, these seem to be quite stable: they haven't changed in the past two years (but the more readable ones have!).
2019-04-18 02:14:21 +02:00
JustAnotherArchivist
b59c7e8d8f Merge pull request #28 from peterk/master
Adds socks proxy support (via requests)
2019-03-11 13:32:07 +01:00
Peter Krantz
3ceb849d98 Adds socks proxy support (via requests) 2019-01-10 22:54:42 +01:00
JustAnotherArchivist
f5ee1f7ac5 Merge pull request #26 from ludios/avoid-twitter-bans
twitter: randomize user agent to avoid Twitter's (IP, UA)-keyed bans
2018-12-25 02:19:17 +01:00
Ivan Kozik
1984110f78 twitter: randomize user agent to avoid Twitter's (IP, UA)-keyed bans 2018-12-24 08:03:33 +00:00
JustAnotherArchivist
c5a5dcb92c snscrape is now on PyPI 2018-10-09 17:26:03 +02:00
10 changed files with 1022 additions and 223 deletions

View File

@@ -2,10 +2,11 @@
snscrape is a scraper for social networking services (SNS). It scrapes things like user profiles, hashtags, or searches and returns the discovered items, e.g. the relevant posts.
The following services are currently supported:
* Facebook: user profiles
* Google Plus: user profiles
* Instagram: user profiles
* Twitter: user profiles, hashtags, and searches
* Facebook: user profiles and groups
* Instagram: user profiles, hashtags, and locations
* Telegram: channels
* Twitter: user profiles, hashtags, searches, threads, and lists (members as well as posts)
* VKontakte: user profiles
## Requirements
snscrape requires Python 3.6 or higher. The Python package dependencies are installed automatically when you install snscrape.
@@ -13,6 +14,10 @@ snscrape requires Python 3.6 or higher. The Python package dependencies are inst
Note that one of the dependencies, lxml, also requires libxml2 and libxslt to be installed.
## Installation
pip3 install snscrape
If you want to use the development version:
pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git
## Usage
@@ -22,7 +27,7 @@ To get all tweets by Jason Scott (@textfiles):
It's usually useful to redirect the output to a file for further processing, e.g. in bash using the filename `@textfiles-tweets`:
```bash
snscrape twitter-user textfiles >@textfiles-tweets
snscrape twitter-user textfiles >twitter-@textfiles
```
To get the latest 100 tweets with the hashtag #archiveteam:
@@ -33,6 +38,9 @@ To get the latest 100 tweets with the hashtag #archiveteam:
It is also possible to use snscrape as a library in Python, but this is currently undocumented.
## Issue reporting
If you discover an issue with snscrape, please report it at <https://github.com/JustAnotherArchivist/snscrape/issues>. If possible please run snscrape with `-vv` and `--dump-locals` and include the log output as well as the dump files referenced in the log in the issue. Note that the files may contain sensitive information in some cases and could potentially be used to identify you (e.g. if the service includes your IP address in its response). If you prefer to arrange a file transfer privately, just mention that in the issue.
## License
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

View File

@@ -3,7 +3,6 @@ import setuptools
setuptools.setup(
name = 'snscrape',
version = '0.1.3',
description = 'A social networking service scraper',
author = 'JustAnotherArchivist',
url = 'https://github.com/JustAnotherArchivist/snscrape',
@@ -13,7 +12,9 @@ setuptools.setup(
'Programming Language :: Python :: 3.6',
],
packages = ['snscrape', 'snscrape.modules'],
install_requires = ['requests', 'lxml', 'beautifulsoup4'],
setup_requires = ['setuptools_scm'],
use_scm_version = True,
install_requires = ['requests[socks]', 'lxml', 'beautifulsoup4'],
entry_points = {
'console_scripts': [
'snscrape = snscrape.cli:main',

View File

@@ -1,25 +1,181 @@
import argparse
import contextlib
import datetime
import inspect
import logging
import snscrape.base
import snscrape.modules
import requests.models
# Imported in parse_args() after setting up the logger:
#import snscrape.base
#import snscrape.modules
#import snscrape.version
import tempfile
logger = logging.getLogger(__name__)
## Logging
dumpLocals = False
logger = logging # Replaced below after setting the logger class
class Logger(logging.Logger):
def _log_with_stack(self, level, *args, **kwargs):
super().log(level, *args, **kwargs)
if dumpLocals:
stack = inspect.stack()
if len(stack) >= 3:
name = _dump_stack_and_locals(stack[2:][::-1])
super().log(level, f'Dumped stack and locals to {name}')
def warning(self, *args, **kwargs):
self._log_with_stack(logging.WARNING, *args, **kwargs)
def error(self, *args, **kwargs):
self._log_with_stack(logging.ERROR, *args, **kwargs)
def critical(self, *args, **kwargs):
self._log_with_stack(logging.CRITICAL, *args, **kwargs)
def log(self, level, *args, **kwargs):
if level >= logging.WARNING:
self._log_with_stack(level, *args, **kwargs)
else:
super().log(level, *args, **kwargs)
def _requests_preparedrequest_repr(name, request):
ret = []
ret.append(repr(request))
ret.append(f'\n {name}.method = {request.method}')
ret.append(f'\n {name}.url = {request.url}')
ret.append(f'\n {name}.headers = \\')
for field in request.headers:
ret.append(f'\n {field} = {_repr("_", request.headers[field])}')
if request.body:
ret.append(f'\n {name}.body = ')
ret.append(_repr('_', request.body).replace('\n', '\n '))
return ''.join(ret)
def _requests_response_repr(name, response, withHistory = True):
ret = []
ret.append(repr(response))
ret.append(f'\n {name}.url = {response.url}')
ret.append(f'\n {name}.request = ')
ret.append(_repr('_', response.request).replace('\n', '\n '))
if withHistory and response.history:
ret.append(f'\n {name}.history = [')
for previousResponse in response.history:
ret.append(f'\n ')
ret.append(_requests_response_repr('_', previousResponse, withHistory = False).replace('\n', '\n '))
ret.append('\n ]')
ret.append(f'\n {name}.status_code = {response.status_code}')
ret.append(f'\n {name}.headers = \\')
for field in response.headers:
ret.append(f'\n {field} = {_repr("_", response.headers[field])}')
ret.append(f'\n {name}.content = {_repr("_", response.content)}')
return ''.join(ret)
def _repr(name, value):
if type(value) is requests.models.Response:
return _requests_response_repr(name, value)
if type(value) is requests.models.PreparedRequest:
return _requests_preparedrequest_repr(name, value)
valueRepr = repr(value)
if '\n' in valueRepr:
return ''.join(['\\\n ', valueRepr.replace('\n', '\n ')])
return valueRepr
@contextlib.contextmanager
def _dump_locals_on_exception():
try:
yield
except Exception as e:
trace = inspect.trace()
if len(trace) >= 2:
name = _dump_stack_and_locals(trace[1:], exc = e)
logger.fatal(f'Dumped stack and locals to {name}')
raise
def _dump_stack_and_locals(trace, exc = None):
with tempfile.NamedTemporaryFile('w', prefix = 'snscrape_locals_', delete = False) as fp:
if exc is not None:
fp.write('Exception:\n')
fp.write(f' {type(exc).__module__}.{type(exc).__name__}: {exc!s}\n')
fp.write(f' args: {exc.args!r}\n')
fp.write('\n')
fp.write('Stack:\n')
for frameRecord in trace:
fp.write(f' File "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}\n')
for line in frameRecord.code_context:
fp.write(f' {line.strip()}\n')
fp.write('\n')
for frameRecord in trace:
module = inspect.getmodule(frameRecord[0])
if not module.__name__.startswith('snscrape.') and module.__name__ != 'snscrape':
continue
locals_ = frameRecord[0].f_locals
fp.write(f'Locals from file "{frameRecord.filename}", line {frameRecord.lineno}, in {frameRecord.function}:\n')
for variableName in locals_:
variable = locals_[variableName]
varRepr = _repr(variableName, variable)
fp.write(f' {variableName} {type(variable)} = ')
fp.write(varRepr.replace('\n', '\n '))
fp.write('\n')
fp.write('\n')
if 'self' in locals_ and hasattr(locals_['self'], '__dict__'):
fp.write(f'Object dict:\n')
fp.write(repr(locals_['self'].__dict__))
fp.write('\n\n')
name = fp.name
return name
def parse_datetime_arg(arg):
for format in ('%Y-%m-%d %H:%M:%S %z', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %z', '%Y-%m-%d'):
try:
d = datetime.datetime.strptime(arg, format)
except ValueError:
continue
else:
if d.tzinfo is None:
return d.replace(tzinfo = datetime.timezone.utc)
return d
# Try treating it as a unix timestamp
try:
d = datetime.datetime.fromtimestamp(int(arg), datetime.timezone.utc)
except ValueError:
pass
else:
return d
raise argparse.ArgumentTypeError(f'Cannot parse {arg!r} into a datetime object')
def parse_args():
import snscrape.base
import snscrape.modules
import snscrape.version
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--version', action = 'version', version = f'snscrape {snscrape.version.__version__}')
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
parser.add_argument('--dump-locals', dest = 'dumpLocals', action = 'store_true', default = False, help = 'Dump local variables on serious log messages (warnings or higher)')
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format')
parser.add_argument('--since', type = parse_datetime_arg, metavar = 'DATETIME', help = 'Only return results newer than DATETIME')
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
classes = snscrape.base.Scraper.__subclasses__()
for cls in classes:
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
cls.setup_parser(subparser)
subparser.set_defaults(cls = cls)
if cls.name is not None:
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
cls.setup_parser(subparser)
subparser.set_defaults(cls = cls)
classes.extend(cls.__subclasses__())
args = parser.parse_args()
@@ -31,7 +187,16 @@ def parse_args():
return args
def setup_logging(verbosity):
def setup_logging():
logging.setLoggerClass(Logger)
global logger
logger = logging.getLogger(__name__)
def configure_logging(verbosity, dumpLocals_):
global dumpLocals
dumpLocals = dumpLocals_
rootLogger = logging.getLogger()
# Set level
@@ -44,6 +209,10 @@ def setup_logging(verbosity):
# Create formatter
formatter = logging.Formatter('{asctime}.{msecs:03.0f} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
# Remove existing handlers
for handler in rootLogger.handlers:
rootLogger.removeHandler(handler)
# Add stream handler
handler = logging.StreamHandler()
handler.setFormatter(formatter)
@@ -51,15 +220,23 @@ def setup_logging(verbosity):
def main():
setup_logging()
args = parse_args()
setup_logging(args.verbosity)
configure_logging(args.verbosity, args.dumpLocals)
scraper = args.cls.from_args(args)
i = 0
for i, item in enumerate(scraper.get_items(), start = 1):
print(item)
if args.maxResults and i >= args.maxResults:
logger.info(f'Exiting after {i} results')
break
else:
logger.info(f'Done, found {i} results')
with _dump_locals_on_exception():
for i, item in enumerate(scraper.get_items(), start = 1):
if args.since is not None and item.date < args.since:
logger.info(f'Exiting due to reaching older results than {args.since}')
break
if args.format is not None:
print(args.format.format(**item._asdict()))
else:
print(item)
if args.maxResults and i >= args.maxResults:
logger.info(f'Exiting after {i} results')
break
else:
logger.info(f'Done, found {i} results')

View File

@@ -1,33 +1,135 @@
import bs4
import datetime
import json
import logging
import re
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class FacebookUserScraper(snscrape.base.Scraper):
class FacebookPost(typing.NamedTuple, snscrape.base.Item):
cleanUrl: str
dirtyUrl: str
date: datetime.datetime
content: typing.Optional[str]
outlinks: list
outlinksss: str
def __str__(self):
return self.cleanUrl
class FacebookCommonScraper(snscrape.base.Scraper):
def _clean_url(self, dirtyUrl):
u = urllib.parse.urlparse(dirtyUrl)
if u.path == '/permalink.php':
# Retain only story_fbid and id parameters
q = urllib.parse.parse_qs(u.query)
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
elif u.path == '/photo.php':
# Retain only the fbid parameter
q = urllib.parse.parse_qs(u.query)
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('fbid', q['fbid'][0]),)), '')
elif u.path == '/media/set/':
# Retain only the set parameter and try to shorten it to the minimum
q = urllib.parse.parse_qs(u.query)
setVal = q['set'][0]
if setVal.rstrip('0123456789').endswith('.a.'):
setVal = f'a.{setVal.rsplit(".", 1)[1]}'
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('set', setVal),)), '')
elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'):
# No manipulation of the path needed, but strip the query string
clean = (u.scheme, u.netloc, u.path, '', '')
elif u.path.split('/')[2] in ('photos', 'videos'):
# Path: "/" username or ID "/" photos or videos "/" crap "/" ID of photo or video "/"
# But to be safe, also handle URLs that don't have that crap correctly.
if u.path.count('/') == 4:
clean = (u.scheme, u.netloc, u.path, '', '')
elif u.path.count('/') == 5:
# Strip out the third path component
pathcomps = u.path.split('/')
pathcomps.pop(3) # Don't forget about the empty string at the beginning!
clean = (u.scheme, u.netloc, '/'.join(pathcomps), '', '')
else:
return dirtyUrl
else:
# If we don't recognise the URL, just return the original one.
return dirtyUrl
return urllib.parse.urlunsplit(clean)
def _is_odd_link(self, href, entryText, mode):
# Returns (isOddLink: bool, warn: bool|None)
if mode == 'user':
if not any(x in href for x in ('/posts/', '/photos/', '/videos/', '/permalink.php?', '/events/', '/notes/', '/photo.php?', '/media/set/')):
if href == '#' and 'new photo' in entryText and 'to the album' in entryText:
# Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
return True, False
elif href.startswith('/business/help/788160621327601/?'):
# Skip the help article about branded content
return True, False
else:
return True, True
return False, None
elif mode == 'group':
if not re.match(r'^/groups/[^/]+/permalink/\d+/(\?|$)', href):
return True, True
return False, None
def _soup_to_items(self, soup, baseUrl, mode):
cleanUrl = None # Value from previous iteration is used for warning on link-less entries
for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
mediaSetA = entry.find('a', class_ = '_17z-')
if not mediaSetA and not entryA:
logger.warning(f'Ignoring link-less entry after {cleanUrl}: {entry.text!r}')
continue
if mediaSetA and (not entryA or entryA['href'] == '#'):
href = mediaSetA['href']
elif entryA:
href = entryA['href']
oddLink, warn = self._is_odd_link(href, entry.text, mode)
if oddLink:
if warn:
logger.warning(f'Ignoring odd link: {href}')
continue
dirtyUrl = urllib.parse.urljoin(baseUrl, href)
cleanUrl = self._clean_url(dirtyUrl)
date = datetime.datetime.fromtimestamp(int(entry.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
contentDiv = entry.find('div', class_ = '_5pbx')
if contentDiv:
content = contentDiv.text
else:
content = None
outlinks = []
for a in entry.find_all('a'):
if not a.has_attr('href'):
continue
href = a.get('href')
if not href.startswith('https://l.facebook.com/l.php?'):
continue
query = urllib.parse.parse_qs(urllib.parse.urlparse(href).query)
if 'u' not in query or len(query['u']) != 1:
logger.warning(f'Ignoring odd outlink: {href}')
continue
outlink = query['u'][0]
if outlink.startswith('http://') or outlink.startswith('https://') and outlink not in outlinks:
outlinks.append(outlink)
yield FacebookPost(cleanUrl = cleanUrl, dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
class FacebookUserScraper(FacebookCommonScraper):
name = 'facebook-user'
def __init__(self, username, **kwargs):
super().__init__(**kwargs)
self._username = username
def _soup_to_items(self, soup, username, baseUrl):
yielded = set()
for a in soup.find_all('a', href = re.compile(r'^/[^/]+/(posts|photos|videos)/[^/]*\d')):
href = a.get('href')
if href.startswith(f'/{username}/'):
link = urllib.parse.urljoin(baseUrl, href)
if link not in yielded:
yield snscrape.base.URLItem(link)
yielded.add(link)
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=')
spuriousForLoopPattern = re.compile(r'^for \(;;\);')
@@ -39,12 +141,9 @@ class FacebookUserScraper(snscrape.base.Scraper):
logger.warning('User does not exist')
return
elif r.status_code != 200:
logger.error('Got status code {r.status_code}')
return
raise snscrape.base.ScraperException('Got status code {r.status_code}')
soup = bs4.BeautifulSoup(r.text, 'lxml')
username = re.sub(r'^https://www\.facebook\.com/([^/]+)/$', r'\1', soup.find('link').get('href')) # Canonical capitalisation
baseUrl = f'https://www.facebook.com/{username}/'
yield from self._soup_to_items(soup, username, baseUrl)
yield from self._soup_to_items(soup, baseUrl, 'user')
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
while nextPageLink:
@@ -54,8 +153,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
# Reproducing that would be difficult to get right, especially as Facebook's codebase evolves, so it's just not sent at all here.
r = self._get(urllib.parse.urljoin(baseUrl, nextPageLink.get('ajaxify')) + '&__a=1', headers = headers)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
response = json.loads(spuriousForLoopPattern.sub('', r.text))
assert 'domops' in response
assert len(response['domops']) == 1
@@ -65,7 +163,7 @@ class FacebookUserScraper(snscrape.base.Scraper):
assert response['domops'][0][2] == False
assert '__html' in response['domops'][0][3]
soup = bs4.BeautifulSoup(response['domops'][0][3]['__html'], 'lxml')
yield from self._soup_to_items(soup, username, baseUrl)
yield from self._soup_to_items(soup, baseUrl, 'user')
nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern)
@classmethod
@@ -75,3 +173,70 @@ class FacebookUserScraper(snscrape.base.Scraper):
@classmethod
def from_args(cls, args):
return cls(args.username, retries = args.retries)
class FacebookGroupScraper(FacebookCommonScraper):
name = 'facebook-group'
def __init__(self, group, **kwargs):
super().__init__(**kwargs)
self._group = group
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
pageletDataPattern = re.compile(r'"GroupEntstreamPagelet",\{.*?\}(?=,\{)')
pageletDataPrefixLength = len('"GroupEntstreamPagelet",')
spuriousForLoopPattern = re.compile(r'^for \(;;\);')
baseUrl = f'https://www.facebook.com/groups/{self._group}/'
r = self._get(baseUrl, headers = headers)
if r.status_code == 404:
logger.warning('Group does not exist')
return
elif r.status_code != 200:
raise snscrape.base.ScraperException('Got status code {r.status_code}')
if 'content:{pagelet_group_mall:{container_id:"' not in r.text:
raise snscrape.base.ScraperException('Code container ID marker not found (does the group exist?)')
soup = bs4.BeautifulSoup(r.text, 'lxml')
# Posts are inside an HTML comment in two code tags with IDs listed in JS...
for codeContainerIdStart in ('content:{pagelet_group_mall:{container_id:"', 'content:{group_mall_after_tti:{container_id:"'):
codeContainerIdPos = r.text.index(codeContainerIdStart) + len(codeContainerIdStart)
codeContainerId = r.text[codeContainerIdPos : r.text.index('"', codeContainerIdPos)]
codeContainer = soup.find('code', id = codeContainerId)
if not codeContainer:
raise snscrape.base.ScraperException('Code container not found')
if type(codeContainer.string) is not bs4.element.Comment:
raise snscrape.base.ScraperException('Code container does not contain a comment')
codeSoup = bs4.BeautifulSoup(codeContainer.string, 'lxml')
yield from self._soup_to_items(codeSoup, baseUrl, 'group')
# Pagination
data = pageletDataPattern.search(r.text).group(0)[pageletDataPrefixLength:]
while True:
# As on the user profile pages, the web app sends a lot of additional parameters, but those all seem to be unnecessary (although some change the response format, e.g. from JSON to HTML)
r = self._get(
f'https://www.facebook.com/ajax/pagelet/generic.php/GroupEntstreamPagelet',
params = {'data': data, '__a': 1},
headers = headers,
)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
obj = json.loads(spuriousForLoopPattern.sub('', r.text))
if obj['payload'] == '':
# End of pagination
break
soup = bs4.BeautifulSoup(obj['payload'], 'lxml')
yield from self._soup_to_items(soup, baseUrl, 'group')
data = pageletDataPattern.search(r.text).group(0)[pageletDataPrefixLength:]
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('group', help = 'A group name or ID')
@classmethod
def from_args(cls, args):
return cls(args.group, retries = args.retries)

View File

@@ -1,102 +0,0 @@
import datetime
import itertools
import json
import logging
import re
import snscrape.base
logger = logging.getLogger(__name__)
class GooglePlusUserScraper(snscrape.base.Scraper):
name = 'googleplus-user'
def __init__(self, user, **kwargs):
super().__init__(**kwargs)
self._user = user
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
logger.info('Retrieving initial data')
r = self._get(f'https://plus.google.com/{self._user}', headers = headers)
if r.status_code == 404:
logger.warning('User does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
# Global data; only needed for the session ID
#TODO: Make this more robust somehow
match = re.search(r'''(['"])FdrFJe\1\s*:\s*(['"])(?P<sid>.*?)\2''', r.text)
if not match:
logger.error('Unable to find session ID')
return
sid = match.group('sid')
# Page data
# As of 2018-05-18, the much simpler regex r'''<script[^>]*>AF_initDataCallback\(\{key: 'ds:6',.*?return (.*?)\}\}\);</script>''' would work also, but this is more generic and less likely to break:
match = re.search(r'''<script[^>]*>\s*(?:.*?)\s*\(\s*\{(?:|.*?,)\s*key\s*:\s*(['"])ds:6\1\s*,.*?,\s*data\s*:\s*function\s*\(\s*\)\s*\{\s*return\s*(?P<data>.*?)\}\s*\}\s*\)\s*;\s*</script>''', r.text, re.DOTALL)
if not match:
logger.error('Unable to extract data')
return
jsonData = match.group('data')
response = json.loads(jsonData)
if response[0][7] is None:
logger.info('User has no posts')
return
for postObj in response[0][7]:
yield snscrape.base.URLItem(f'https://plus.google.com/{postObj[6]["33558957"][21]}')
cursor = response[0][1] # 'ADSJ_x'
if cursor is None:
# No further pages
return
baseDate = datetime.datetime.utcnow()
baseSeconds = baseDate.hour * 3600 + baseDate.minute * 60 + baseDate.second
userid = response[1] # Alternatively and more ugly: response[0][7][0][6]['33558957'][16]
for counter in itertools.count(start = 2):
logger.info('Retrieving next page')
reqid = 1 + baseSeconds + int(1e5) * counter
r = self._post(
f'https://plus.google.com/_/PlusAppUi/data?ds.extension=74333095&f.sid={sid}&hl=en-US&soc-app=199&soc-platform=1&soc-device=1&_reqid={reqid}&rt=c',
data = [('f.req', '[[[74333095,[{"74333095":["' + cursor + '","' + userid + '"]}],null,null,0]]]'), ('', '')],
headers = headers
)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
# As if everything up to here wasn't terrible already, this is where it gets *really* bad.
# The API contains a few junk characters at the beginning, apparently as an anti-CSRF measure.
# The remainder is effectively a self-made chunked transfer encoding but with decimal digits and including everything except the digits themselves in the chunk size.
# It sucks.
# Each chunk is actually one JSON object; you'd think that we can just read the first one and parse that, but there are some quirks that make this difficult.
# I was unable to figure out what the "chunk size" actually covers exactly; the response is UTF-8 encoded, but the chunk size matches neither the binary nor the decoded length.
# Enter the awful workaround: strip away the initial chunk size, then parse the beginning of the remaining data using a parser that doesn't care if there's junk after the JSON.
garbage = r.text
assert garbage[:6] == ")]}'\n\n" # anti-CSRF and two newlines
data = []
pos = 6
while garbage[pos].isdigit() or garbage[pos].isspace(): # Also strip leading whitespace
pos += 1
response = json.JSONDecoder().raw_decode(''.join(garbage[pos:]))[0] # Parses only the first structure in the data stream without throwing an error about the extra data at the end
for postObj in response[0][2]['74333095'][0][7]:
yield snscrape.base.URLItem(f'https://plus.google.com/{postObj[6]["33558957"][21]}')
cursor = response[0][2]['74333095'][0][1]
if cursor is None:
break
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('user', help = 'A Google Plus username (with leading "+") or numeric ID')
@classmethod
def from_args(cls, args):
return cls(args.user, retries = args.retries)

View File

@@ -1,74 +1,181 @@
import datetime
import hashlib
import json
import logging
import snscrape.base
import typing
logger = logging.getLogger(__name__)
class InstagramUserScraper(snscrape.base.Scraper):
name = 'instagram-user'
class InstagramPost(typing.NamedTuple, snscrape.base.Item):
cleanUrl: str
dirtyUrl: str
date: datetime.datetime
content: str
thumbnailUrl: str
displayUrl: str
username: str
likes: int
comments: int
commentsDisabled: bool
isVideo: bool
def __init__(self, username, **kwargs):
def __str__(self):
return self.cleanUrl
class InstagramCommonScraper(snscrape.base.Scraper):
def __init__(self, mode, name, **kwargs):
super().__init__(**kwargs)
self._username = username
if mode not in ('User', 'Hashtag', 'Location'):
raise ValueError('Invalid mode')
self._mode = mode
self._name = name
def _response_to_items(self, response, username):
for node in response['user']['edge_owner_to_timeline_media']['edges']:
if self._mode == 'User':
self._initialUrl = f'https://www.instagram.com/{self._name}/'
self._pageName = 'ProfilePage'
self._responseContainer = 'user'
self._edgeXToMedia = 'edge_owner_to_timeline_media'
self._pageIDKey = 'id'
self._queryHash = 'f2405b236d85e8296cf30347c9f08c2a'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
elif self._mode == 'Hashtag':
self._initialUrl = f'https://www.instagram.com/explore/tags/{self._name}/'
self._pageName = 'TagPage'
self._responseContainer = 'hashtag'
self._edgeXToMedia = 'edge_hashtag_to_media'
self._pageIDKey = 'name'
self._queryHash = 'f92f56d47dc7a55b606908374b43a314'
self._variablesFormat = '{{"tag_name":"{pageID}","first":50,"after":"{endCursor}"}}'
elif self._mode == 'Location':
self._initialUrl = f'https://www.instagram.com/explore/locations/{self._name}/'
self._pageName = 'LocationsPage'
self._responseContainer = 'location'
self._edgeXToMedia = 'edge_location_to_media'
self._pageIDKey = 'id'
self._queryHash = '1b84447a4d8b6d6d0426fefb34514485'
self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}'
def _response_to_items(self, response):
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
code = node['node']['shortcode']
yield snscrape.base.URLItem(f'https://www.instagram.com/p/{code}/?taken-by={username}') #TODO: Do we want the taken-by parameter in here?
username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
usernameQuery = '?taken-by=' + username
cleanUrl = f'https://www.instagram.com/p/{code}/'
yield InstagramPost(
cleanUrl = cleanUrl,
dirtyUrl = f'{cleanUrl}{usernameQuery}',
date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc),
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
thumbnailUrl = node['node']['thumbnail_src'],
displayUrl = node['node']['display_url'],
username = username,
likes = node['node']['edge_media_preview_like']['count'],
comments = node['node']['edge_media_to_comment']['count'],
commentsDisabled = node['node']['comments_disabled'],
isVideo = node['node']['is_video'],
)
def _check_initial_page_callback(self, r):
if r.status_code != 200:
return True, None
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
try:
obj = json.loads(jsonData)
except json.JSONDecodeError:
return False, 'invalid JSON'
r._snscrape_json_obj = obj
return True, None
def _check_json_callback(self, r):
if r.status_code != 200:
return False, f'status code {r.status_code}'
try:
obj = json.loads(r.text)
except json.JSONDecodeError as e:
return False, f'invalid JSON ({e!r})'
r._snscrape_json_obj = obj
return True, None
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
logger.info('Retrieving initial data')
r = self._get(f'https://www.instagram.com/{self._username}/', headers = headers)
r = self._get(self._initialUrl, headers = headers, responseOkCallback = self._check_initial_page_callback)
if r.status_code == 404:
logger.warning('User does not exist')
logger.warning(f'{self._mode} does not exist')
return
elif r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
elif r.url.startswith('https://www.instagram.com/accounts/login/'):
raise snscrape.base.ScraperException('Redirected to login page')
response = r._snscrape_json_obj
rhxGis = response['rhx_gis'] if 'rhx_gis' in response else ''
if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0:
logger.info(f'{self._mode} has no posts')
return
jsonData = r.text.split('<script type="text/javascript">window._sharedData = ')[1].split(';</script>')[0] # May throw an IndexError if Instagram changes something again; we just let that bubble.
response = json.loads(jsonData)
rhxGis = response['rhx_gis']
if response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['count'] == 0:
logger.info('User has no posts')
return
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['edges']:
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']:
logger.warning('Private account')
return
userID = response['entry_data']['ProfilePage'][0]['graphql']['user']['id']
username = response['entry_data']['ProfilePage'][0]['graphql']['user']['username'] # Might have different capitalisation than self._username
yield from self._response_to_items(response['entry_data']['ProfilePage'][0]['graphql'], username)
if not response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey]
yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql'])
if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
while True:
logger.info(f'Retrieving endCursor = {endCursor!r}')
variables = f'{{"id":"{userID}","first":50,"after":"{endCursor}"}}'
variables = self._variablesFormat.format(**locals())
headers['X-Requested-With'] = 'XMLHttpRequest'
headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest()
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash=42323d64886122307be10013ad2dcc44&variables={variables}', headers = headers)
r = self._get(f'https://www.instagram.com/graphql/query/?query_hash={self._queryHash}&variables={variables}', headers = headers, responseOkCallback = self._check_json_callback)
if r.status_code != 200:
logger.error(f'Got status code {r.status_code}')
return
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
response = json.loads(r.text)
if not response['data']['user']['edge_owner_to_timeline_media']['edges']:
response = r._snscrape_json_obj
if not response['data'][self._responseContainer][self._edgeXToMedia]['edges']:
return
yield from self._response_to_items(response['data'], username)
if not response['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']:
yield from self._response_to_items(response['data'])
if not response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['has_next_page']:
return
endCursor = response['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor']
class InstagramUserScraper(InstagramCommonScraper):
name = 'instagram-user'
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('username', help = 'An Instagram username')
subparser.add_argument('username', help = 'An Instagram username (no leading @)')
@classmethod
def from_args(cls, args):
return cls(args.username, retries = args.retries)
return cls('User', args.username, retries = args.retries)
class InstagramHashtagScraper(InstagramCommonScraper):
name = 'instagram-hashtag'
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('hashtag', help = 'An Instagram hashtag (no leading #)')
@classmethod
def from_args(cls, args):
return cls('Hashtag', args.hashtag, retries = args.retries)
class InstagramLocationScraper(InstagramCommonScraper):
name = 'instagram-location'
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('locationid', help = 'An Instagram location ID', type = int)
@classmethod
def from_args(cls, args):
return cls('Location', args.locationid, retries = args.retries)

View File

@@ -0,0 +1,66 @@
import bs4
import datetime
import logging
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class TelegramPost(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
outlinks: list
outlinksss: str
def __str__(self):
return self.url
class TelegramChannelScraper(snscrape.base.Scraper):
name = 'telegram-channel'
def __init__(self, name, **kwargs):
super().__init__(**kwargs)
self._name = name
def _soup_to_items(self, soup, pageUrl):
posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True})
for post in reversed(posts):
date = datetime.datetime.strptime(post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date').find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
message = post.find('div', class_ = 'tgme_widget_message_text')
if message:
content = message.text
outlinks = [urllib.parse.urljoin(pageUrl, link['href']) for link in post.find_all('a') if not link.text.startswith('@') and link['href'].startswith('https://t.me/')]
outlinksss = ' '.join(outlinks)
else:
content = None
outlinks = []
outlinksss = ''
yield TelegramPost(url = f'https://t.me/s/{post["data-post"]}', date = date, content = content, outlinks = outlinks, outlinksss = outlinksss)
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'}
nextPageUrl = f'https://t.me/s/{self._name}'
while True:
r = self._get(nextPageUrl, headers = headers)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
soup = bs4.BeautifulSoup(r.text, 'lxml')
yield from self._soup_to_items(soup, nextPageUrl)
pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
if not pageLink:
break
nextPageUrl = urllib.parse.urljoin(nextPageUrl, pageLink['href'])
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('channel', help = 'A channel name')
@classmethod
def from_args(cls, args):
return cls(args.channel, retries = args.retries)

View File

@@ -1,80 +1,213 @@
import bs4
import datetime
import json
import random
import logging
import re
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class TwitterSearchScraper(snscrape.base.Scraper):
name = 'twitter-search'
class Tweet(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
id: int
username: str
outlinks: list
outlinksss: str
tcooutlinks: list
tcooutlinksss: str
def __init__(self, query, **kwargs):
super().__init__(**kwargs)
self._query = query
def __str__(self):
return self.url
def _get_feed_from_html(self, html):
soup = bs4.BeautifulSoup(html, 'lxml')
feed = soup.find_all('li', 'js-stream-item')
return feed
class Account(typing.NamedTuple, snscrape.base.Item):
username: str
@property
def url(self):
return f'https://twitter.com/{self.username}'
def __str__(self):
return self.url
class TwitterCommonScraper(snscrape.base.Scraper):
def _feed_to_items(self, feed):
for tweet in feed:
username = tweet.find('span', 'username').find('b').text
tweetID = tweet['data-item-id']
yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}')
url = f'https://twitter.com/{username}/status/{tweetID}'
date = None
timestampA = tweet.find('a', 'tweet-timestamp')
if timestampA:
timestampSpan = timestampA.find('span', '_timestamp')
if timestampSpan and timestampSpan.has_attr('data-time'):
date = datetime.datetime.fromtimestamp(int(timestampSpan['data-time']), datetime.timezone.utc)
if not date:
logger.warning(f'Failed to extract date for {url}')
contentP = tweet.find('p', 'tweet-text')
content = None
outlinks = []
tcooutlinks = []
if contentP:
content = contentP.text
for a in contentP.find_all('a'):
if a.has_attr('href') and not a['href'].startswith('/') and (not a.has_attr('class') or 'u-hidden' not in a['class']):
if a.has_attr('data-expanded-url'):
outlinks.append(a['data-expanded-url'])
else:
logger.warning(f'Ignoring link without expanded URL on {url}: {a["href"]}')
tcooutlinks.append(a['href'])
else:
logger.warning(f'Failed to extract content for {url}')
card = tweet.find('div', 'card2')
if card and 'has-autoplayable-media' not in card['class']:
for div in card.find_all('div'):
if div.has_attr('data-card-url'):
outlinks.append(div['data-card-url'])
tcooutlinks.append(div['data-card-url'])
outlinks = list(dict.fromkeys(outlinks)) # Deduplicate in case the same link was shared more than once within this tweet; may change order on Python 3.6 or older
tcooutlinks = list(dict.fromkeys(tcooutlinks))
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
def _check_json_callback(self, r):
if r.headers['content-type'] != 'application/json;charset=utf-8':
if r.headers.get('content-type') != 'application/json;charset=utf-8':
return False, f'content type is not JSON'
return True, None
class TwitterSearchScraper(TwitterCommonScraper):
name = 'twitter-search'
def __init__(self, query, cursor = None, **kwargs):
super().__init__(**kwargs)
self._query = query
self._cursor = cursor
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'})
def _get_guest_token(self):
logger.info(f'Retrieving guest token from search page')
r = self._get(self._baseUrl, headers = {'User-Agent': self._userAgent})
match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', r.text)
if not match:
raise snscrape.base.ScraperException('Unable to find guest token')
return match.group(1)
def _check_scroll_response(self, r):
if r.status_code == 429:
# Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
return True, None
if r.headers.get('content-type') != 'application/json;charset=utf-8':
return False, f'content type is not JSON'
if r.status_code != 200:
return False, f'non-200 status code'
return True, None
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
# First page
logger.info(f'Retrieving search page for {self._query}')
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'typd', 'qf': 'off'}, headers = headers)
feed = self._get_feed_from_html(r.text)
if not feed:
return
newestID = feed[0]['data-item-id']
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
yield from self._feed_to_items(feed)
headers = {
'User-Agent': self._userAgent,
'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'Referer': self._baseUrl,
}
guestToken = None
cursor = self._cursor
while True:
logger.info(f'Retrieving scroll page {maxPosition}')
r = self._get('https://twitter.com/i/search/timeline',
params = {
'f': 'tweets',
'vertical': 'default',
'lang': 'en',
'q': self._query,
'include_available_features': '1',
'include_entities': '1',
'reset_error_state': 'false',
'src': 'typd',
'qf': 'off',
'max_position': maxPosition,
},
headers = headers,
responseOkCallback = self._check_json_callback)
if not guestToken:
guestToken = self._get_guest_token()
headers['x-guest-token'] = guestToken
feed = self._get_feed_from_html(json.loads(r.text)['items_html'])
if not feed:
return
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
yield from self._feed_to_items(feed)
logger.info(f'Retrieving scroll page {cursor}')
params = {
'include_profile_interstitial_type': '1',
'include_blocking': '1',
'include_blocked_by': '1',
'include_followed_by': '1',
'include_want_retweets': '1',
'include_mute_edge': '1',
'include_can_dm': '1',
'include_can_media_tag': '1',
'skip_status': '1',
'cards_platform': 'Web-12',
'include_cards': '1',
'include_composer_source': 'true',
'include_ext_alt_text': 'true',
'include_reply_count': '1',
'tweet_mode': 'extended',
'include_entities': 'true',
'include_user_entities': 'true',
'include_ext_media_color': 'true',
'include_ext_media_availability': 'true',
'send_error_codes': 'true',
'simple_quoted_tweets': 'true',
'q': self._query,
'tweet_search_mode': 'live',
'count': '100',
'query_source': 'spelling_expansion_revert_click',
}
if cursor:
params['cursor'] = cursor
params['pc'] = '1'
params['spelling_corrections'] = '1'
params['ext'] = 'mediaStats%2CcameraMoment'
r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
if r.status_code == 429:
guestToken = None
continue
try:
obj = r.json()
except json.JSONDecodeError as e:
raise snscrape.base.ScraperException('Received invalid JSON from Twitter') from e
# No data format test, just a hard and loud crash if anything's wrong :-)
newCursor = None
for instruction in obj['timeline']['instructions']:
if 'addEntries' in instruction:
entries = instruction['addEntries']['entries']
elif 'replaceEntry' in instruction:
entries = [instruction['replaceEntry']['entry']]
else:
continue
for entry in entries:
if entry['entryId'].startswith('sq-I-t-'):
if 'tweet' in entry['content']['item']['content']:
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
elif 'tombstone' in entry['content']['item']['content'] and 'tweet' in entry['content']['item']['content']['tombstone']:
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tombstone']['tweet']['id']]
else:
raise snscrape.base.ScraperException(f'Unable to handle entry {entry["entryId"]!r}')
tweetID = tweet['id']
content = tweet['full_text']
username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
url = f'https://twitter.com/{username}/status/{tweetID}'
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
elif entry['entryId'] == 'sq-cursor-bottom':
newCursor = entry['content']['operation']['cursor']['value']
if not newCursor or newCursor == cursor:
# End of pagination
break
cursor = newCursor
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('--cursor', metavar = 'CURSOR')
subparser.add_argument('query', help = 'A Twitter search string')
@classmethod
def from_args(cls, args):
return cls(args.query, retries = args.retries)
return cls(args.query, cursor = args.cursor, retries = args.retries)
class TwitterUserScraper(TwitterSearchScraper):
@@ -106,3 +239,136 @@ class TwitterHashtagScraper(TwitterSearchScraper):
@classmethod
def from_args(cls, args):
return cls(args.hashtag, retries = args.retries)
class TwitterThreadScraper(TwitterCommonScraper):
name = 'twitter-thread'
def __init__(self, tweetID = None, **kwargs):
if tweetID is not None and tweetID.strip('0123456789') != '':
raise ValueError('Invalid tweet ID, must be numeric')
super().__init__(**kwargs)
self._tweetID = tweetID
def get_items(self):
headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}
# Fetch the page of the last tweet in the thread
r = self._get(f'https://twitter.com/user/status/{self._tweetID}', headers = headers)
soup = bs4.BeautifulSoup(r.text, 'lxml')
# Extract tweets on that page in the correct order; first, the tweet that was supplied, then the ancestors with pagination if necessary
tweet = soup.find('div', 'ThreadedConversation--permalinkTweetWithAncestors')
if tweet:
tweet = tweet.find('div', 'tweet')
if not tweet:
logger.warning('Tweet does not exist, is not a thread, or does not have ancestors')
return
items = list(self._feed_to_items([tweet]))
assert len(items) == 1
yield items[0]
username = items[0].username
ancestors = soup.find('div', 'ThreadedConversation--ancestors')
if not ancestors:
logger.warning('Tweet does not have ancestors despite claiming to')
return
feed = reversed(ancestors.find_all('li', 'js-stream-item'))
yield from self._feed_to_items(feed)
# If necessary, iterate through pagination until reaching the initial tweet
streamContainer = ancestors.find('div', 'stream-container')
if not streamContainer.has_attr('data-max-position') or streamContainer['data-max-position'] == '':
return
minPosition = streamContainer['data-max-position']
while True:
r = self._get(
f'https://twitter.com/i/{username}/conversation/{self._tweetID}?include_available_features=1&include_entities=1&min_position={minPosition}',
headers = headers,
responseOkCallback = self._check_json_callback
)
obj = json.loads(r.text)
soup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
feed = reversed(soup.find_all('li', 'js-stream-item'))
yield from self._feed_to_items(feed)
if not obj['has_more_items']:
break
minPosition = obj['max_position']
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('tweetID', help = 'A tweet ID of the last tweet in a thread')
@classmethod
def from_args(cls, args):
return cls(tweetID = args.tweetID, retries = args.retries)
class TwitterListPostsScraper(TwitterSearchScraper):
name = 'twitter-list-posts'
def __init__(self, listName, **kwargs):
super().__init__(f'list:{listName}', **kwargs)
self._listName = listName
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('list', help = 'A Twitter list, formatted as "username/listname"')
@classmethod
def from_args(cls, args):
return cls(args.list, retries = args.retries)
class TwitterListMembersScraper(TwitterCommonScraper):
name = 'twitter-list-members'
def __init__(self, listName, **kwargs):
super().__init__(**kwargs)
self._user, self._list = listName.split('/')
def get_items(self):
headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}
baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'
r = self._get(baseUrl, headers = headers)
if r.status_code != 200:
logger.warning('List not found')
return
soup = bs4.BeautifulSoup(r.text, 'lxml')
container = soup.find('div', 'stream-container')
if not container:
raise snscrape.base.ScraperException('Unable to find container')
items = container.find_all('li', 'js-stream-item')
if not items:
logger.warning('Empty list')
return
for item in items:
yield Account(username = item.find('div', 'account')['data-screen-name'])
if not container.has_attr('data-min-position') or container['data-min-position'] == '':
return
maxPosition = container['data-min-position']
while True:
r = self._get(
f'{baseUrl}/timeline?include_available_features=1&include_entities=1&max_position={maxPosition}&reset_error_state=false',
headers = headers,
responseOkCallback = self._check_json_callback
)
obj = json.loads(r.text)
soup = bs4.BeautifulSoup(obj['items_html'], 'lxml')
items = soup.find_all('li', 'js-stream-item')
for item in items:
yield Account(username = item.find('div', 'account')['data-screen-name'])
if not obj['has_more_items']:
break
maxPosition = obj['min_position']
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('list', help = 'A Twitter list, formatted as "username/listname"')
@classmethod
def from_args(cls, args):
return cls(args.list, retries = args.retries)

View File

@@ -0,0 +1,104 @@
import bs4
import datetime
import itertools
import logging
import snscrape.base
import typing
import urllib.parse
logger = logging.getLogger(__name__)
class VKontaktePost(typing.NamedTuple, snscrape.base.Item):
url: str
date: datetime.datetime
content: str
def __str__(self):
return self.url
class VKontakteUserScraper(snscrape.base.Scraper):
name = 'vkontakte-user'
def __init__(self, username, **kwargs):
super().__init__(**kwargs)
self._username = username
def _soup_to_items(self, soup, baseUrl):
for post in soup.find_all('div', class_ = 'post'):
dateSpan = post.find('div', class_ = 'post_date').find('span', class_ = 'rel_date')
textDiv = post.find('div', class_ = 'wall_post_text')
yield VKontaktePost(
url = urllib.parse.urljoin(baseUrl, post.find('a', class_ = 'post_link')['href']),
date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc) if 'time' in dateSpan else None,
content = textDiv.text if textDiv else None,
)
def get_items(self):
headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0', 'Accept-Language': 'en-US,en;q=0.5'}
baseUrl = f'https://vk.com/{self._username}'
logger.info('Retrieving initial data')
r = self._get(baseUrl, headers = headers)
if r.status_code == 404:
logger.warning('Wall does not exist')
return
elif r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
# VK sends windows-1251-encoded data, but Requests's decoding doesn't seem to work correctly and causes lxml to choke, so we need to pass the binary content and the encoding explicitly.
soup = bs4.BeautifulSoup(r.content, 'lxml', from_encoding = r.encoding)
if soup.find('div', class_ = 'profile_closed_wall_dummy'):
logger.warning('Private profile')
return
profileDeleted = soup.find('h5', class_ = 'profile_deleted_text')
if profileDeleted:
# Unclear what this state represents, so just log website text.
logger.warning(profileDeleted.text)
return
newestPost = soup.find('div', class_ = 'post')
if not newestPost:
logger.info('Wall has no posts')
return
ownerID = newestPost.attrs['data-post-id'].split('_')[0]
# If there is a pinned post, we need its ID for the pagination requests
if 'post_fixed' in newestPost.attrs['class']:
fixedPostID = newestPost.attrs['id'].split('_')[1]
else:
fixedPostID = ''
yield from self._soup_to_items(soup, baseUrl)
headers['X-Requested-With'] = 'XMLHttpRequest'
for offset in itertools.count(start = 10, step = 10):
logger.info('Retrieving next page')
r = self._post(
'https://vk.com/al_wall.php',
data = [('act', 'get_wall'), ('al', 1), ('fixed', fixedPostID), ('offset', offset), ('onlyCache', 'false'), ('owner_id', ownerID), ('type', 'own'), ('wall_start_from', offset)],
headers = headers
)
if r.status_code != 200:
raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
# Convert to JSON and read the HTML payload. Note that this implicitly converts the data to a Python string (i.e., Unicode), away from a windows-1251-encoded bytes.
posts = r.json()['payload'][1][0]
if posts.startswith('<div class="page_block no_posts">'):
# Reached the end
break
if not posts.startswith('<div id="post'):
raise snscrape.base.ScraperException(f'Got an unknown response: {posts[:200]!r}...')
soup = bs4.BeautifulSoup(posts, 'lxml')
yield from self._soup_to_items(soup, baseUrl)
@classmethod
def setup_parser(cls, subparser):
subparser.add_argument('username', help = 'A VK username')
@classmethod
def from_args(cls, args):
return cls(args.username, retries = args.retries)

7
snscrape/version.py Normal file
View File

@@ -0,0 +1,7 @@
import pkg_resources
try:
__version__ = pkg_resources.get_distribution('snscrape').version
except pkg_resources.DistributionNotFound:
__version__ = None