mirror of
https://github.com/bellingcat/snscrape.git
synced 2026-06-10 19:38:29 +03:00
This commit is contained in:
@@ -13,6 +13,7 @@ def parse_args():
|
||||
parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N',
|
||||
help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff')
|
||||
parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results')
|
||||
parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format')
|
||||
|
||||
subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use')
|
||||
classes = snscrape.base.Scraper.__subclasses__()
|
||||
@@ -57,7 +58,10 @@ def main():
|
||||
|
||||
i = 0
|
||||
for i, item in enumerate(scraper.get_items(), start = 1):
|
||||
print(item)
|
||||
if args.format is not None:
|
||||
print(args.format.format(**item._asdict()))
|
||||
else:
|
||||
print(item)
|
||||
if args.maxResults and i >= args.maxResults:
|
||||
logger.info(f'Exiting after {i} results')
|
||||
break
|
||||
|
||||
@@ -1,14 +1,26 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import snscrape.base
|
||||
import typing
|
||||
import urllib.parse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FacebookPost(typing.NamedTuple):
    """One post found on a Facebook page.

    ``str(post)`` is the cleaned permalink, so printing an item gives a
    bare URL; the remaining fields are available to format strings via
    ``_asdict()``.

    NOTE: this used to list ``snscrape.base.Item`` as a second base class.
    ``typing.NamedTuple`` silently discarded extra bases up to Python 3.8 and
    raises ``TypeError`` for them from 3.9 on, so the extra base never took
    effect and is omitted here.
    """

    # Permalink with tracking parameters and other noise stripped.
    cleanUrl: str
    # Permalink exactly as linked from the page (made absolute).
    dirtyUrl: str
    # Timezone-aware (UTC) timestamp of the post.
    date: datetime.datetime
    # Text of the post, or None when the entry has no text container.
    content: typing.Optional[str]

    def __str__(self):
        return self.cleanUrl
|
||||
|
||||
|
||||
class FacebookUserScraper(snscrape.base.Scraper):
|
||||
name = 'facebook-user'
|
||||
|
||||
@@ -16,9 +28,33 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
super().__init__(**kwargs)
|
||||
self._username = username
|
||||
|
||||
def _soup_to_items(self, soup, baseUrl):
|
||||
yielded = set()
|
||||
def _clean_url(self, dirtyUrl):
|
||||
u = urllib.parse.urlparse(dirtyUrl)
|
||||
if u.path == '/permalink.php':
|
||||
# Retain only story_fbid and id parameters
|
||||
q = urllib.parse.parse_qs(u.query)
|
||||
clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '')
|
||||
elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'):
|
||||
# No manipulation of the path needed, but strip the query string
|
||||
clean = (u.scheme, u.netloc, u.path, '', '')
|
||||
elif u.path.split('/')[2] in ('photos', 'videos'):
|
||||
# Path: "/" username or ID "/" photos or videos "/" crap "/" ID of photo or video "/"
|
||||
# But to be safe, also handle URLs that don't have that crap correctly.
|
||||
if u.path.count('/') == 4:
|
||||
clean = (u.scheme, u.netloc, u.path, '', '')
|
||||
elif u.path.count('/') == 5:
|
||||
# Strip out the third path component
|
||||
pathcomps = u.path.split('/')
|
||||
pathcomps.pop(3) # Don't forget about the empty string at the beginning!
|
||||
clean = (u.scheme, u.netloc, '/'.join(pathcomps), '', '')
|
||||
else:
|
||||
return dirtyUrl
|
||||
else:
|
||||
# If we don't recognise the URL, just return the original one.
|
||||
return dirtyUrl
|
||||
return urllib.parse.urlunsplit(clean)
|
||||
|
||||
def _soup_to_items(self, soup, baseUrl):
|
||||
for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019
|
||||
entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry.
|
||||
href = entryA.get('href')
|
||||
@@ -27,10 +63,14 @@ class FacebookUserScraper(snscrape.base.Scraper):
|
||||
# Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink.
|
||||
logger.warning(f'Ignoring odd link: {href}')
|
||||
continue
|
||||
link = urllib.parse.urljoin(baseUrl, href)
|
||||
if link not in yielded:
|
||||
yield snscrape.base.URLItem(link)
|
||||
yielded.add(link)
|
||||
dirtyUrl = urllib.parse.urljoin(baseUrl, href)
|
||||
date = datetime.datetime.fromtimestamp(int(entryA.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc)
|
||||
contentDiv = entry.find('div', class_ = '_5pbx')
|
||||
if contentDiv:
|
||||
content = contentDiv.text
|
||||
else:
|
||||
content = None
|
||||
yield FacebookPost(cleanUrl = self._clean_url(dirtyUrl), dirtyUrl = dirtyUrl, date = date, content = content)
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'}
|
||||
|
||||
@@ -1,12 +1,26 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InstagramPost(typing.NamedTuple):
    """One post from an Instagram user's timeline.

    ``str(post)`` is the clean post URL; the remaining fields are available
    to format strings via ``_asdict()``.

    NOTE: this used to list ``snscrape.base.Item`` as a second base class.
    ``typing.NamedTuple`` silently discarded extra bases up to Python 3.8 and
    raises ``TypeError`` for them from 3.9 on, so the extra base never took
    effect and is omitted here.
    """

    # https://www.instagram.com/p/<shortcode>/
    cleanUrl: str
    # cleanUrl with the ?taken-by=<username> parameter appended.
    dirtyUrl: str
    # Timezone-aware (UTC) time the media was taken.
    date: datetime.datetime
    # Caption text of the post.
    content: str
    # URL of the thumbnail image.
    thumbnailUrl: str
    # URL of the full-size image.
    displayUrl: str

    def __str__(self):
        return self.cleanUrl
|
||||
|
||||
|
||||
class InstagramUserScraper(snscrape.base.Scraper):
|
||||
name = 'instagram-user'
|
||||
|
||||
@@ -17,7 +31,15 @@ class InstagramUserScraper(snscrape.base.Scraper):
|
||||
def _response_to_items(self, response, username):
    """Yield an InstagramPost for each media edge in a timeline response.

    Args:
        response: Decoded JSON object; the posts are read from
            ``response['user']['edge_owner_to_timeline_media']['edges']``.
        username: Account name, used only for the ``taken-by`` parameter of
            the dirty URL.

    Yields:
        InstagramPost: one per edge, in response order. ``content`` is None
        for posts that have no caption.
    """
    for edge in response['user']['edge_owner_to_timeline_media']['edges']:
        media = edge['node']
        code = media['shortcode']
        cleanUrl = f'https://www.instagram.com/p/{code}/'

        # Posts without a caption have an empty edge list here; indexing [0]
        # unconditionally would raise IndexError and abort the whole scrape.
        captionEdges = media['edge_media_to_caption']['edges']
        content = captionEdges[0]['node']['text'] if captionEdges else None

        yield InstagramPost(
            cleanUrl = cleanUrl,
            dirtyUrl = f'{cleanUrl}?taken-by={username}',
            date = datetime.datetime.fromtimestamp(media['taken_at_timestamp'], datetime.timezone.utc),
            content = content,
            thumbnailUrl = media['thumbnail_src'],
            displayUrl = media['display_url'],
        )
|
||||
|
||||
def get_items(self):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
|
||||
@@ -1,13 +1,24 @@
|
||||
import bs4
|
||||
import datetime
|
||||
import json
|
||||
import random
|
||||
import logging
|
||||
import snscrape.base
|
||||
import typing
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Tweet(typing.NamedTuple):
    """One tweet from a Twitter search result feed.

    ``str(tweet)`` is the tweet's URL; the remaining fields are available to
    format strings via ``_asdict()``.

    NOTE: this used to list ``snscrape.base.Item`` as a second base class.
    ``typing.NamedTuple`` silently discarded extra bases up to Python 3.8 and
    raises ``TypeError`` for them from 3.9 on, so the extra base never took
    effect and is omitted here.
    """

    # https://twitter.com/<username>/status/<tweet ID>
    url: str
    # Timezone-aware (UTC) time of the tweet.
    date: datetime.datetime
    # Text of the tweet.
    content: str

    def __str__(self):
        return self.url
|
||||
|
||||
|
||||
class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
name = 'twitter-search'
|
||||
|
||||
@@ -24,7 +35,9 @@ class TwitterSearchScraper(snscrape.base.Scraper):
|
||||
for tweet in feed:
|
||||
username = tweet.find('span', 'username').find('b').text
|
||||
tweetID = tweet['data-item-id']
|
||||
yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}')
|
||||
date = datetime.datetime.fromtimestamp(int(tweet.find('a', 'tweet-timestamp').find('span', '_timestamp')['data-time']), datetime.timezone.utc)
|
||||
content = tweet.find('p', 'tweet-text').text
|
||||
yield Tweet(f'https://twitter.com/{username}/status/{tweetID}', date, content)
|
||||
|
||||
def _check_json_callback(self, r):
|
||||
if r.headers.get('content-type') != 'application/json;charset=utf-8':
|
||||
|
||||
Reference in New Issue
Block a user