From 536fcb33030d47ea6d98e4fa76c1eb915a07f1e4 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Thu, 18 Apr 2019 14:44:21 +0200 Subject: [PATCH] Return proper items from scrapers including clean URLs (fixes #9 and #10) --- snscrape/cli.py | 6 +++- snscrape/modules/facebook.py | 52 +++++++++++++++++++++++++++++++---- snscrape/modules/instagram.py | 24 +++++++++++++++- snscrape/modules/twitter.py | 15 +++++++++- 4 files changed, 88 insertions(+), 9 deletions(-) diff --git a/snscrape/cli.py b/snscrape/cli.py index daa3130..18fed53 100644 --- a/snscrape/cli.py +++ b/snscrape/cli.py @@ -13,6 +13,7 @@ def parse_args(): parser.add_argument('--retry', '--retries', dest = 'retries', type = int, default = 3, metavar = 'N', help = 'When the connection fails or the server returns an unexpected response, retry up to N times with an exponential backoff') parser.add_argument('-n', '--max-results', dest = 'maxResults', type = int, metavar = 'N', help = 'Only return the first N results') + parser.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format') subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use') classes = snscrape.base.Scraper.__subclasses__() @@ -57,7 +58,10 @@ def main(): i = 0 for i, item in enumerate(scraper.get_items(), start = 1): - print(item) + if args.format is not None: + print(args.format.format(**item._asdict())) + else: + print(item) if args.maxResults and i >= args.maxResults: logger.info(f'Exiting after {i} results') break diff --git a/snscrape/modules/facebook.py b/snscrape/modules/facebook.py index e4e28d4..13339ba 100644 --- a/snscrape/modules/facebook.py +++ b/snscrape/modules/facebook.py @@ -1,14 +1,26 @@ import bs4 +import datetime import json import logging import re import snscrape.base +import typing import urllib.parse logger = logging.getLogger(__name__) +class FacebookPost(typing.NamedTuple, snscrape.base.Item): + cleanUrl: str + dirtyUrl: str + date: datetime.datetime + content: typing.Optional[str] + + def __str__(self): + return self.cleanUrl + + class FacebookUserScraper(snscrape.base.Scraper): name = 'facebook-user' @@ -16,9 +28,33 @@ class FacebookUserScraper(snscrape.base.Scraper): super().__init__(**kwargs) self._username = username - def _soup_to_items(self, soup, baseUrl): - yielded = set() + def _clean_url(self, dirtyUrl): + u = urllib.parse.urlparse(dirtyUrl) + if u.path == '/permalink.php': + # Retain only story_fbid and id parameters + q = urllib.parse.parse_qs(u.query) + clean = (u.scheme, u.netloc, u.path, urllib.parse.urlencode((('story_fbid', q['story_fbid'][0]), ('id', q['id'][0]))), '') + elif u.path.split('/')[2] == 'posts' or u.path.startswith('/events/') or u.path.startswith('/notes/'): + # No manipulation of the path needed, but strip the query string + clean = (u.scheme, u.netloc, u.path, '', '') + elif u.path.split('/')[2] in ('photos', 'videos'): + # Path: "/" username or ID "/" photos or videos "/" crap "/" ID of photo or video "/" + # But to be safe, also handle URLs that don't have that crap correctly. + if u.path.count('/') == 4: + clean = (u.scheme, u.netloc, u.path, '', '') + elif u.path.count('/') == 5: + # Strip out the third path component + pathcomps = u.path.split('/') + pathcomps.pop(3) # Don't forget about the empty string at the beginning! + clean = (u.scheme, u.netloc, '/'.join(pathcomps), '', '') + else: + return dirtyUrl + else: + # If we don't recognise the URL, just return the original one. + return dirtyUrl + return urllib.parse.urlunsplit(clean) + def _soup_to_items(self, soup, baseUrl): for entry in soup.find_all('div', class_ = '_5pcr'): # also class 'fbUserContent' in 2017 and 'userContentWrapper' in 2019 entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry. href = entryA.get('href') @@ -27,10 +63,14 @@ class FacebookUserScraper(snscrape.base.Scraper): # Don't print a warning if it's a "User added 5 new photos to the album"-type entry, which doesn't have a permalink. logger.warning(f'Ignoring odd link: {href}') continue - link = urllib.parse.urljoin(baseUrl, href) - if link not in yielded: - yield snscrape.base.URLItem(link) - yielded.add(link) + dirtyUrl = urllib.parse.urljoin(baseUrl, href) + date = datetime.datetime.fromtimestamp(int(entryA.find('abbr', class_ = '_5ptz')['data-utime']), datetime.timezone.utc) + contentDiv = entry.find('div', class_ = '_5pbx') + if contentDiv: + content = contentDiv.text + else: + content = None + yield FacebookPost(cleanUrl = self._clean_url(dirtyUrl), dirtyUrl = dirtyUrl, date = date, content = content) def get_items(self): headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'} diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index 3287b6c..0e82f44 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -1,12 +1,26 @@ +import datetime import hashlib import json import logging import snscrape.base +import typing logger = logging.getLogger(__name__) +class InstagramPost(typing.NamedTuple, snscrape.base.Item): + cleanUrl: str + dirtyUrl: str + date: datetime.datetime + content: str + thumbnailUrl: str + displayUrl: str + + def __str__(self): + return self.cleanUrl + + class InstagramUserScraper(snscrape.base.Scraper): name = 'instagram-user' @@ -17,7 +31,15 @@ class InstagramUserScraper(snscrape.base.Scraper): def _response_to_items(self, response, username): for node in response['user']['edge_owner_to_timeline_media']['edges']: code = node['node']['shortcode'] - yield snscrape.base.URLItem(f'https://www.instagram.com/p/{code}/?taken-by={username}') #TODO: Do we want the taken-by parameter in here? + cleanUrl = f'https://www.instagram.com/p/{code}/' + yield InstagramPost( + cleanUrl = cleanUrl, + dirtyUrl = f'{cleanUrl}?taken-by={username}', + date = datetime.datetime.fromtimestamp(node['node']['taken_at_timestamp'], datetime.timezone.utc), + content = node['node']['edge_media_to_caption']['edges'][0]['node']['text'], + thumbnailUrl = node['node']['thumbnail_src'], + displayUrl = node['node']['display_url'], + ) def get_items(self): headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index 9199973..d151331 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -1,13 +1,24 @@ import bs4 +import datetime import json import random import logging import snscrape.base +import typing logger = logging.getLogger(__name__) +class Tweet(typing.NamedTuple, snscrape.base.Item): + url: str + date: datetime.datetime + content: str + + def __str__(self): + return self.url + + class TwitterSearchScraper(snscrape.base.Scraper): name = 'twitter-search' @@ -24,7 +35,9 @@ class TwitterSearchScraper(snscrape.base.Scraper): for tweet in feed: username = tweet.find('span', 'username').find('b').text tweetID = tweet['data-item-id'] - yield snscrape.base.URLItem(f'https://twitter.com/{username}/status/{tweetID}') + date = datetime.datetime.fromtimestamp(int(tweet.find('a', 'tweet-timestamp').find('span', '_timestamp')['data-time']), datetime.timezone.utc) + content = tweet.find('p', 'tweet-text').text + yield Tweet(f'https://twitter.com/{username}/status/{tweetID}', date, content) def _check_json_callback(self, r): if r.headers.get('content-type') != 'application/json;charset=utf-8':