mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-08 02:28:29 +03:00
Split up into modules
This commit is contained in:
2
setup.py
2
setup.py
@@ -9,7 +9,7 @@ setuptools.setup(
|
||||
install_requires = ['requests', 'lxml', 'beautifulsoup4'],
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
'smscrape = socialmediascraper:main',
|
||||
'smscrape = socialmediascraper.cli:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
@@ -1,206 +0,0 @@
|
||||
import abc
|
||||
import argparse
|
||||
import bs4
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Item:
|
||||
'''An abstract base class for an item returned by the scraper's get_items generator.
|
||||
|
||||
An item can really be anything. The string representation should be useful for the CLI output (e.g. a direct URL for the item).'''
|
||||
|
||||
@abc.abstractmethod
|
||||
def __str__(self):
|
||||
pass
|
||||
|
||||
|
||||
class URLItem(Item):
|
||||
'''A generic item which only holds a URL string.'''
|
||||
|
||||
def __init__(self, url):
|
||||
self._url = url
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return self._url
|
||||
|
||||
def __str__(self):
|
||||
return self._url
|
||||
|
||||
|
||||
class ScraperException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Scraper:
|
||||
'''An abstract base class for a scraper.'''
|
||||
|
||||
name = None
|
||||
|
||||
def __init__(self, retries = 3):
|
||||
self._retries = retries
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_items(self):
|
||||
'''Iterator yielding Items.'''
|
||||
pass
|
||||
|
||||
def _get(self, url, params = None, headers = None, responseOkCallback = None):
|
||||
for attempt in range(self._retries + 1):
|
||||
logger.info(f'Retrieving {url}')
|
||||
logger.debug(f'... with parameters: {params!r}')
|
||||
logger.debug(f'... with headers: {headers!r}')
|
||||
try:
|
||||
r = requests.get(url, params = params, headers = headers)
|
||||
if responseOkCallback is None or responseOkCallback(r):
|
||||
logger.debug(f'{r.request.url} retrieved successfully')
|
||||
return r
|
||||
except requests.exceptions.RequestException as exc:
|
||||
logger.error(f'Error retrieving {url}: {exc!r}')
|
||||
else:
|
||||
msg = f'{self._retries + 1} requests to {url} failed, giving up.'
|
||||
logger.fatal(msg)
|
||||
raise ScraperException(msg)
|
||||
raise RuntimeError('Reached unreachable code')
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def setup_parser(cls, subparser):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def from_args(cls, args):
|
||||
pass
|
||||
|
||||
|
||||
class TwitterUserTweetsScraper(Scraper):
|
||||
name = 'twitter-user-tweets'
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
def _get_feed_from_html(self, html):
|
||||
soup = bs4.BeautifulSoup(html, 'lxml')
|
||||
feed = soup.find_all('li', 'js-stream-item')
|
||||
return feed
|
||||
|
||||
def _feed_to_items(self, feed):
|
||||
for tweet in feed:
|
||||
username = tweet.find('span', 'username').find('b').text
|
||||
tweetID = tweet['data-item-id']
|
||||
yield URLItem(f'https://twitter.com/{username}/status/{tweetID}')
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.headers['content-type'] != 'application/json;charset=utf-8':
|
||||
logger.error(f'Content type of {r.url} is not JSON')
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_items(self):
|
||||
query = f'from:{self._username}'
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
# First page
|
||||
logger.info(f'Retrieving search page for {query}')
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': query, 'src': 'typd'}, headers = headers)
|
||||
|
||||
feed = self._get_feed_from_html(r.text)
|
||||
if not feed:
|
||||
return
|
||||
newestID = feed[0]['data-item-id']
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
|
||||
while True:
|
||||
logger.info(f'Retrieving scroll page {maxPosition}')
|
||||
r = self._get('https://twitter.com/i/search/timeline',
|
||||
params = {
|
||||
'f': 'tweets',
|
||||
'vertical': 'default',
|
||||
'lang': 'en',
|
||||
'q': query,
|
||||
'include_available_features': '1',
|
||||
'include_entities': '1',
|
||||
'reset_error_state': 'false',
|
||||
'src': 'typd',
|
||||
'max_position': maxPosition,
|
||||
},
|
||||
headers = headers,
|
||||
responseOkCallback = self._check_json_callback)
|
||||
|
||||
feed = self._get_feed_from_html(json.loads(r.text)['items_html'])
|
||||
if not feed:
|
||||
return
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('username', help = 'A Twitter username (without @)')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
|
||||
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
|
||||
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
|
||||
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
|
||||
|
||||
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
|
||||
for cls in Scraper.__subclasses__():
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# http://bugs.python.org/issue16308 / https://bugs.python.org/issue26510 (fixed in Python 3.7)
|
||||
if not args.scraper:
|
||||
raise RuntimeError('Error: no scraper specified')
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def setup_logging(verbosity):
|
||||
rootLogger = logging.getLogger()
|
||||
|
||||
# Set level
|
||||
if verbosity > 0:
|
||||
level = logging.INFO if verbosity == 1 else logging.DEBUG
|
||||
rootLogger.setLevel(level)
|
||||
for handler in rootLogger.handlers:
|
||||
handler.setLevel(level)
|
||||
|
||||
# Create formatter
|
||||
formatter = logging.Formatter('{asctime} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
|
||||
|
||||
# Add stream handler
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
rootLogger.addHandler(handler)
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
setup_logging(args.verbosity)
|
||||
scraper = args.cls.from_args(args)
|
||||
|
||||
i = 0
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
else:
|
||||
logger.info(f'Done, found {i} results')
|
||||
|
||||
76
socialmediascraper/base.py
Normal file
76
socialmediascraper/base.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import abc
|
||||
import logging
|
||||
import requests
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Item:
|
||||
'''An abstract base class for an item returned by the scraper's get_items generator.
|
||||
|
||||
An item can really be anything. The string representation should be useful for the CLI output (e.g. a direct URL for the item).'''
|
||||
|
||||
@abc.abstractmethod
|
||||
def __str__(self):
|
||||
pass
|
||||
|
||||
|
||||
class URLItem(Item):
|
||||
'''A generic item which only holds a URL string.'''
|
||||
|
||||
def __init__(self, url):
|
||||
self._url = url
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return self._url
|
||||
|
||||
def __str__(self):
|
||||
return self._url
|
||||
|
||||
|
||||
class ScraperException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Scraper:
|
||||
'''An abstract base class for a scraper.'''
|
||||
|
||||
name = None
|
||||
|
||||
def __init__(self, retries = 3):
|
||||
self._retries = retries
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_items(self):
|
||||
'''Iterator yielding Items.'''
|
||||
pass
|
||||
|
||||
def _get(self, url, params = None, headers = None, responseOkCallback = None):
|
||||
for attempt in range(self._retries + 1):
|
||||
logger.info(f'Retrieving {url}')
|
||||
logger.debug(f'... with parameters: {params!r}')
|
||||
logger.debug(f'... with headers: {headers!r}')
|
||||
try:
|
||||
r = requests.get(url, params = params, headers = headers)
|
||||
if responseOkCallback is None or responseOkCallback(r):
|
||||
logger.debug(f'{r.request.url} retrieved successfully')
|
||||
return r
|
||||
except requests.exceptions.RequestException as exc:
|
||||
logger.error(f'Error retrieving {url}: {exc!r}')
|
||||
else:
|
||||
msg = f'{self._retries + 1} requests to {url} failed, giving up.'
|
||||
logger.fatal(msg)
|
||||
raise ScraperException(msg)
|
||||
raise RuntimeError('Reached unreachable code')
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def setup_parser(cls, subparser):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def from_args(cls, args):
|
||||
pass
|
||||
63
socialmediascraper/cli.py
Normal file
63
socialmediascraper/cli.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import argparse
|
||||
import logging
|
||||
import socialmediascraper.base
|
||||
import socialmediascraper.modules
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('-v', '--verbose', '--verbosity', dest = 'verbosity', action = 'count', default = 0, help = 'Increase output verbosity')
|
||||
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
|
||||
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
|
||||
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
|
||||
|
||||
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
|
||||
for cls in socialmediascraper.base.Scraper.__subclasses__():
|
||||
subparser = subparsers.add_parser(cls.name, formatter_class = argparse.ArgumentDefaultsHelpFormatter)
|
||||
cls.setup_parser(subparser)
|
||||
subparser.set_defaults(cls = cls)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# http://bugs.python.org/issue16308 / https://bugs.python.org/issue26510 (fixed in Python 3.7)
|
||||
if not args.scraper:
|
||||
raise RuntimeError('Error: no scraper specified')
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def setup_logging(verbosity):
|
||||
rootLogger = logging.getLogger()
|
||||
|
||||
# Set level
|
||||
if verbosity > 0:
|
||||
level = logging.INFO if verbosity == 1 else logging.DEBUG
|
||||
rootLogger.setLevel(level)
|
||||
for handler in rootLogger.handlers:
|
||||
handler.setLevel(level)
|
||||
|
||||
# Create formatter
|
||||
formatter = logging.Formatter('{asctime} {levelname} {name} {message}', datefmt = '%Y-%m-%d %H:%M:%S', style = '{')
|
||||
|
||||
# Add stream handler
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
rootLogger.addHandler(handler)
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
setup_logging(args.verbosity)
|
||||
scraper = args.cls.from_args(args)
|
||||
|
||||
i = 0
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
else:
|
||||
logger.info(f'Done, found {i} results')
|
||||
15
socialmediascraper/modules/__init__.py
Normal file
15
socialmediascraper/modules/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import importlib
|
||||
import os
|
||||
import socialmediascraper.base
|
||||
|
||||
|
||||
def _import_modules():
|
||||
files = os.listdir(__path__[0])
|
||||
for fn in files:
|
||||
if fn.endswith('.py') and fn != '__init__.py':
|
||||
# Import module if not already imported
|
||||
moduleName = f'socialmediascraper.modules.{fn[:-3]}'
|
||||
module = importlib.import_module(moduleName)
|
||||
|
||||
|
||||
_import_modules()
|
||||
78
socialmediascraper/modules/twitter_user_tweets.py
Normal file
78
socialmediascraper/modules/twitter_user_tweets.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import bs4
|
||||
import json
|
||||
import logging
|
||||
import socialmediascraper.base
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TwitterUserTweetsScraper(socialmediascraper.base.Scraper):
|
||||
name = 'twitter-user-tweets'
|
||||
|
||||
def __init__(self, username, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
def _get_feed_from_html(self, html):
|
||||
soup = bs4.BeautifulSoup(html, 'lxml')
|
||||
feed = soup.find_all('li', 'js-stream-item')
|
||||
return feed
|
||||
|
||||
def _feed_to_items(self, feed):
|
||||
for tweet in feed:
|
||||
username = tweet.find('span', 'username').find('b').text
|
||||
tweetID = tweet['data-item-id']
|
||||
yield socialmediascraper.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}')
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.headers['content-type'] != 'application/json;charset=utf-8':
|
||||
logger.error(f'Content type of {r.url} is not JSON')
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_items(self):
|
||||
query = f'from:{self._username}'
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
# First page
|
||||
logger.info(f'Retrieving search page for {query}')
|
||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': query, 'src': 'typd'}, headers = headers)
|
||||
|
||||
feed = self._get_feed_from_html(r.text)
|
||||
if not feed:
|
||||
return
|
||||
newestID = feed[0]['data-item-id']
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
|
||||
while True:
|
||||
logger.info(f'Retrieving scroll page {maxPosition}')
|
||||
r = self._get('https://twitter.com/i/search/timeline',
|
||||
params = {
|
||||
'f': 'tweets',
|
||||
'vertical': 'default',
|
||||
'lang': 'en',
|
||||
'q': query,
|
||||
'include_available_features': '1',
|
||||
'include_entities': '1',
|
||||
'reset_error_state': 'false',
|
||||
'src': 'typd',
|
||||
'max_position': maxPosition,
|
||||
},
|
||||
headers = headers,
|
||||
responseOkCallback = self._check_json_callback)
|
||||
|
||||
feed = self._get_feed_from_html(json.loads(r.text)['items_html'])
|
||||
if not feed:
|
||||
return
|
||||
maxPosition = f'TWEET-{feed[-1]["data-item-id"]}-{newestID}'
|
||||
yield from self._feed_to_items(feed)
|
||||
|
||||
@classmethod
|
||||
def setup_parser(cls, subparser):
|
||||
subparser.add_argument('username', help = 'A Twitter username (without @)')
|
||||
|
||||
@classmethod
|
||||
def from_args(cls, args):
|
||||
return cls(args.username, retries = args.retries)
|
||||
Reference in New Issue
Block a user