mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-11 20:08:29 +03:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6cc3180d9 | ||
|
|
613395d1c2 | ||
|
|
82a87b7b5a | ||
|
|
9568028bf9 | ||
|
|
6df351772e | ||
|
|
541173b0c8 | ||
|
|
b6772d3778 | ||
|
|
20ea117a2c | ||
|
|
ff54c350bc |
@@ -80,6 +80,7 @@ class FacebookCommonScraper(snscrape.base.Scraper):
|
|||||||
return False, None
|
return False, None
|
||||||
|
|
||||||
def _soup_to_items(self, soup, baseUrl, mode):
|
def _soup_to_items(self, soup, baseUrl, mode):
|
||||||
|
cleanUrl = None # Value from previous iteration is used for warning on link-less entries
|
||||||
for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
|
for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
|
||||||
entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
|
entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
|
||||||
mediaSetA = entry.find('a', class_ = '_17z-')
|
mediaSetA = entry.find('a', class_ = '_17z-')
|
||||||
@@ -96,6 +97,7 @@ class FacebookCommonScraper(snscrape.base.Scraper):
|
|||||||
logger.warning(f'Ignoring odd link: {href}')
|
logger.warning(f'Ignoring odd link: {href}')
|
||||||
continue
|
continue
|
||||||
dirtyUrl = urllib.parse.urljoin(baseUrl, href)
|
dirtyUrl = urllib.parse.urljoin(baseUrl, href)
|
||||||
|
cleanUrl = self._clean_url(dirtyUrl)
|
||||||
date = datetime.datetime.fromtimestamp(int(entry.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
|
date = datetime.datetime.fromtimestamp(int(entry.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
|
||||||
contentDiv = entry.find('div', class_ = '_5pbx')
|
contentDiv = entry.find('div', class_ = '_5pbx')
|
||||||
if contentDiv:
|
if contentDiv:
|
||||||
@@ -116,7 +118,7 @@ class FacebookCommonScraper(snscrape.base.Scraper):
|
|||||||
outlink = query['u'][0]
|
outlink = query['u'][0]
|
||||||
if outlink.startswith('http://') or outlink.startswith('https://') and outlink not in outlinks:
|
if outlink.startswith('http://') or outlink.startswith('https://') and outlink not in outlinks:
|
||||||
outlinks.append(outlink)
|
outlinks.append(outlink)
|
||||||
yield FacebookPost(cleanUrl = self._clean_url(dirtyUrl), dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
|
yield FacebookPost(cleanUrl = cleanUrl, dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks, outlinksss = ' '.join(outlinks))
|
||||||
|
|
||||||
|
|
||||||
class FacebookUserScraper(FacebookCommonScraper):
|
class FacebookUserScraper(FacebookCommonScraper):
|
||||||
|
|||||||
@@ -16,6 +16,11 @@ class InstagramPost(typing.NamedTuple, snscrape.base.Item):
|
|||||||
content: str
|
content: str
|
||||||
thumbnailUrl: str
|
thumbnailUrl: str
|
||||||
displayUrl: str
|
displayUrl: str
|
||||||
|
username: str
|
||||||
|
likes: int
|
||||||
|
comments: int
|
||||||
|
commentsDisabled: bool
|
||||||
|
isVideo: bool
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.cleanUrl
|
return self.cleanUrl
|
||||||
@@ -57,7 +62,8 @@ class InstagramCommonScraper(snscrape.base.Scraper):
|
|||||||
def _response_to_items(self, response):
|
def _response_to_items(self, response):
|
||||||
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
|
for node in response[self._responseContainer][self._edgeXToMedia]['edges']:
|
||||||
code = node['node']['shortcode']
|
code = node['node']['shortcode']
|
||||||
usernameQuery = '?taken-by=' + node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
|
username = node['node']['owner']['username'] if 'username' in node['node']['owner'] else ''
|
||||||
|
usernameQuery = '?taken-by=' + username
|
||||||
cleanUrl = f'https://www.instagram.com/p/{code}/'
|
cleanUrl = f'https://www.instagram.com/p/{code}/'
|
||||||
yield InstagramPost(
|
yield InstagramPost(
|
||||||
cleanUrl = cleanUrl,
|
cleanUrl = cleanUrl,
|
||||||
@@ -66,6 +72,11 @@ class InstagramCommonScraper(snscrape.base.Scraper):
|
|||||||
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
|
content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'] if len(node['node']['edge_media_to_caption']['edges']) else None,
|
||||||
thumbnailUrl = node['node']['thumbnail_src'],
|
thumbnailUrl = node['node']['thumbnail_src'],
|
||||||
displayUrl = node['node']['display_url'],
|
displayUrl = node['node']['display_url'],
|
||||||
|
username = username,
|
||||||
|
likes = node['node']['edge_media_preview_like']['count'],
|
||||||
|
comments = node['node']['edge_media_to_comment']['count'],
|
||||||
|
commentsDisabled = node['node']['comments_disabled'],
|
||||||
|
isVideo = node['node']['is_video'],
|
||||||
)
|
)
|
||||||
|
|
||||||
def _check_initial_page_callback(self, r):
|
def _check_initial_page_callback(self, r):
|
||||||
|
|||||||
@@ -3,8 +3,10 @@ import datetime
|
|||||||
import json
|
import json
|
||||||
import random
|
import random
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
import snscrape.base
|
import snscrape.base
|
||||||
import typing
|
import typing
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -86,82 +88,122 @@ class TwitterCommonScraper(snscrape.base.Scraper):
|
|||||||
class TwitterSearchScraper(TwitterCommonScraper):
|
class TwitterSearchScraper(TwitterCommonScraper):
|
||||||
name = 'twitter-search'
|
name = 'twitter-search'
|
||||||
|
|
||||||
def __init__(self, query, maxPosition = None, **kwargs):
|
def __init__(self, query, cursor = None, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self._query = query
|
self._query = query
|
||||||
self._maxPosition = maxPosition
|
self._cursor = cursor
|
||||||
|
self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
|
||||||
|
self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'})
|
||||||
|
|
||||||
def _get_feed_from_html(self, html, withMinPosition):
|
def _get_guest_token(self):
|
||||||
soup = bs4.BeautifulSoup(html, 'lxml')
|
logger.info(f'Retrieving guest token from search page')
|
||||||
feed = soup.find_all('li', 'js-stream-item')
|
r = self._get(self._baseUrl, headers = {'User-Agent': self._userAgent})
|
||||||
if withMinPosition:
|
match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', r.text)
|
||||||
streamContainer = soup.find('div', 'stream-container')
|
if not match:
|
||||||
if not streamContainer or not streamContainer.has_attr('data-min-position'):
|
raise RuntimeError('Unable to find guest token')
|
||||||
if soup.find('div', 'SearchEmptyTimeline'):
|
return match.group(1)
|
||||||
# No results found
|
|
||||||
minPosition = None
|
def _check_scroll_response(self, r):
|
||||||
else:
|
if r.status_code == 429:
|
||||||
# Unknown error condition
|
# Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
|
||||||
raise RuntimeError('Unable to find min-position')
|
return True, None
|
||||||
else:
|
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||||
minPosition = streamContainer['data-min-position']
|
return False, f'content type is not JSON'
|
||||||
else:
|
if r.status_code != 200:
|
||||||
minPosition = None
|
return False, f'non-200 status code'
|
||||||
return feed, minPosition
|
return True, None
|
||||||
|
|
||||||
def get_items(self):
|
def get_items(self):
|
||||||
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
|
headers = {
|
||||||
|
'User-Agent': self._userAgent,
|
||||||
# First page
|
'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
|
||||||
if self._maxPosition is None:
|
'Referer': self._baseUrl,
|
||||||
logger.info(f'Retrieving search page for {self._query}')
|
}
|
||||||
r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'spxr', 'qf': 'off'}, headers = headers)
|
guestToken = None
|
||||||
|
cursor = self._cursor
|
||||||
feed, maxPosition = self._get_feed_from_html(r.text, True)
|
|
||||||
if not feed:
|
|
||||||
logger.warning(f'No results for {self._query}')
|
|
||||||
return
|
|
||||||
yield from self._feed_to_items(feed)
|
|
||||||
else:
|
|
||||||
maxPosition = self._maxPosition
|
|
||||||
|
|
||||||
if not maxPosition:
|
|
||||||
return
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
logger.info(f'Retrieving scroll page {maxPosition}')
|
if not guestToken:
|
||||||
r = self._get('https://twitter.com/i/search/timeline',
|
guestToken = self._get_guest_token()
|
||||||
params = {
|
headers['x-guest-token'] = guestToken
|
||||||
'f': 'tweets',
|
|
||||||
'vertical': 'default',
|
|
||||||
'lang': 'en',
|
|
||||||
'q': self._query,
|
|
||||||
'include_available_features': '1',
|
|
||||||
'include_entities': '1',
|
|
||||||
'reset_error_state': 'false',
|
|
||||||
'src': 'spxr',
|
|
||||||
'qf': 'off',
|
|
||||||
'max_position': maxPosition,
|
|
||||||
},
|
|
||||||
headers = headers,
|
|
||||||
responseOkCallback = self._check_json_callback)
|
|
||||||
|
|
||||||
obj = json.loads(r.text)
|
logger.info(f'Retrieving scroll page {cursor}')
|
||||||
feed, _ = self._get_feed_from_html(obj['items_html'], False)
|
params = {
|
||||||
if feed:
|
'include_profile_interstitial_type': '1',
|
||||||
yield from self._feed_to_items(feed)
|
'include_blocking': '1',
|
||||||
if obj['min_position'] == maxPosition:
|
'include_blocked_by': '1',
|
||||||
return
|
'include_followed_by': '1',
|
||||||
maxPosition = obj['min_position']
|
'include_want_retweets': '1',
|
||||||
|
'include_mute_edge': '1',
|
||||||
|
'include_can_dm': '1',
|
||||||
|
'include_can_media_tag': '1',
|
||||||
|
'skip_status': '1',
|
||||||
|
'cards_platform': 'Web-12',
|
||||||
|
'include_cards': '1',
|
||||||
|
'include_composer_source': 'true',
|
||||||
|
'include_ext_alt_text': 'true',
|
||||||
|
'include_reply_count': '1',
|
||||||
|
'tweet_mode': 'extended',
|
||||||
|
'include_entities': 'true',
|
||||||
|
'include_user_entities': 'true',
|
||||||
|
'include_ext_media_color': 'true',
|
||||||
|
'include_ext_media_availability': 'true',
|
||||||
|
'send_error_codes': 'true',
|
||||||
|
'simple_quoted_tweets': 'true',
|
||||||
|
'q': self._query,
|
||||||
|
'tweet_search_mode': 'live',
|
||||||
|
'count': '100',
|
||||||
|
'query_source': 'spelling_expansion_revert_click',
|
||||||
|
}
|
||||||
|
if cursor:
|
||||||
|
params['cursor'] = cursor
|
||||||
|
params['pc'] = '1'
|
||||||
|
params['spelling_corrections'] = '1'
|
||||||
|
params['ext'] = 'mediaStats%2CcameraMoment'
|
||||||
|
r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
|
||||||
|
if r.status_code == 429:
|
||||||
|
guestToken = None
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
obj = r.json()
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logger.error(f'Received invalid JSON from Twitter: {e!s}')
|
||||||
|
raise RuntimeError('Received invalid JSON from Twitter') from e
|
||||||
|
|
||||||
|
# No data format test, just a hard and loud crash if anything's wrong :-)
|
||||||
|
newCursor = None
|
||||||
|
for instruction in obj['timeline']['instructions']:
|
||||||
|
if 'addEntries' in instruction:
|
||||||
|
entries = instruction['addEntries']['entries']
|
||||||
|
elif 'replaceEntry' in instruction:
|
||||||
|
entries = [instruction['replaceEntry']['entry']]
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
for entry in entries:
|
||||||
|
if entry['entryId'].startswith('sq-I-t-'):
|
||||||
|
tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
|
||||||
|
tweetID = tweet['id']
|
||||||
|
content = tweet['full_text']
|
||||||
|
username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
|
||||||
|
date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
|
||||||
|
outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
|
||||||
|
tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
|
||||||
|
url = f'https://twitter.com/{username}/status/{tweetID}'
|
||||||
|
yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
|
||||||
|
elif entry['entryId'] == 'sq-cursor-bottom':
|
||||||
|
newCursor = entry['content']['operation']['cursor']['value']
|
||||||
|
if not newCursor or newCursor == cursor:
|
||||||
|
# End of pagination
|
||||||
|
break
|
||||||
|
cursor = newCursor
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setup_parser(cls, subparser):
|
def setup_parser(cls, subparser):
|
||||||
subparser.add_argument('--max-position', metavar = 'POSITION', dest = 'maxPosition')
|
subparser.add_argument('--cursor', metavar = 'CURSOR')
|
||||||
subparser.add_argument('query', help = 'A Twitter search string')
|
subparser.add_argument('query', help = 'A Twitter search string')
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_args(cls, args):
|
def from_args(cls, args):
|
||||||
return cls(args.query, maxPosition = args.maxPosition, retries = args.retries)
|
return cls(args.query, cursor = args.cursor, retries = args.retries)
|
||||||
|
|
||||||
|
|
||||||
class TwitterUserScraper(TwitterSearchScraper):
|
class TwitterUserScraper(TwitterSearchScraper):
|
||||||
@@ -205,7 +247,7 @@ class TwitterThreadScraper(TwitterCommonScraper):
|
|||||||
self._tweetID = tweetID
|
self._tweetID = tweetID
|
||||||
|
|
||||||
def get_items(self):
|
def get_items(self):
|
||||||
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
|
headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}
|
||||||
|
|
||||||
# Fetch the page of the last tweet in the thread
|
# Fetch the page of the last tweet in the thread
|
||||||
r = self._get(f'https://twitter.com/user/status/{self._tweetID}', headers = headers)
|
r = self._get(f'https://twitter.com/user/status/{self._tweetID}', headers = headers)
|
||||||
@@ -283,7 +325,7 @@ class TwitterListMembersScraper(TwitterCommonScraper):
|
|||||||
self._user, self._list = listName.split('/')
|
self._user, self._list = listName.split('/')
|
||||||
|
|
||||||
def get_items(self):
|
def get_items(self):
|
||||||
headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
|
headers = {'User-Agent': f'Opera/9.80 (Windows NT 6.1; WOW64) Presto/2.12.388 Version/12.18'}
|
||||||
|
|
||||||
baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'
|
baseUrl = f'https://twitter.com/{self._user}/lists/{self._list}/members'
|
||||||
r = self._get(baseUrl, headers = headers)
|
r = self._get(baseUrl, headers = headers)
|
||||||
|
|||||||
@@ -56,6 +56,12 @@ class VKontakteUserScraper(snscrape.base.Scraper):
|
|||||||
logger.error('Private profile')
|
logger.error('Private profile')
|
||||||
return
|
return
|
||||||
|
|
||||||
|
profileDeleted = soup.find('h5', class_ = 'profile_deleted_text')
|
||||||
|
if profileDeleted:
|
||||||
|
# Unclear what this state represents, so just log website text.
|
||||||
|
logger.error(profileDeleted.text)
|
||||||
|
return
|
||||||
|
|
||||||
newestPost = soup.find('div', class_ = 'post')
|
newestPost = soup.find('div', class_ = 'post')
|
||||||
if not newestPost:
|
if not newestPost:
|
||||||
logger.info('Wall has no posts')
|
logger.info('Wall has no posts')
|
||||||
@@ -80,14 +86,15 @@ class VKontakteUserScraper(snscrape.base.Scraper):
|
|||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
logger.error(f'Got status code {r.status_code}')
|
logger.error(f'Got status code {r.status_code}')
|
||||||
return
|
return
|
||||||
fields = r.content.split(b'<!>')
|
# Convert to JSON and read the HTML payload. Note that this implicitly converts the data to a Python string (i.e., Unicode), away from a windows-1251-encoded bytes.
|
||||||
if fields[5].startswith(b'<div class="page_block no_posts">'):
|
posts = r.json()['payload'][1][0]
|
||||||
|
if posts.startswith('<div class="page_block no_posts">'):
|
||||||
# Reached the end
|
# Reached the end
|
||||||
break
|
break
|
||||||
if not fields[5].startswith(b'<div id="post'):
|
if not posts.startswith('<div id="post'):
|
||||||
logger.error(f'Got an unknown response: {fields[5][:200]!r}...')
|
logger.error(f'Got an unknown response: {posts[:200]!r}...')
|
||||||
break
|
break
|
||||||
soup = bs4.BeautifulSoup(fields[5], 'lxml', from_encoding = r.encoding)
|
soup = bs4.BeautifulSoup(posts, 'lxml')
|
||||||
yield from self._soup_to_items(soup, baseUrl)
|
yield from self._soup_to_items(soup, baseUrl)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user