From dd25fd0526e1c97e9ebac372b08a5f60d4275c23 Mon Sep 17 00:00:00 2001 From: JustAnotherArchivist Date: Mon, 24 Aug 2020 01:38:27 +0000 Subject: [PATCH] Add support for extracting the entity behind a scrape Closes #11 Backwards incompatibility: snscrape.modules.twitter.Account is now called User. However, this was previously only used on the list member scraper, which has been broken for a while since the list member list is no longer publicly accessible. For compatibility reasons, the CLI does not output the entity by default; the new option --with-entity enables it. --- snscrape/base.py | 20 ++++++ snscrape/cli.py | 8 +++ snscrape/modules/facebook.py | 108 +++++++++++++++++++++++++++++-- snscrape/modules/instagram.py | 75 ++++++++++++++++++--- snscrape/modules/telegram.py | 86 ++++++++++++++++++++++--- snscrape/modules/twitter.py | 84 ++++++++++++++++++++++-- snscrape/modules/vkontakte.py | 118 ++++++++++++++++++++++++++++++---- 7 files changed, 459 insertions(+), 40 deletions(-) diff --git a/snscrape/base.py b/snscrape/base.py index 443efc2..d9d4a8c 100644 --- a/snscrape/base.py +++ b/snscrape/base.py @@ -17,6 +17,22 @@ class Item: pass +class Entity: + '''An abstract base class for an entity returned by the scraper's get_entity method. + + An entity is typically the account of a person or organisation. The string representation should be the preferred direct URL to the entity's page on the network.''' + + @abc.abstractmethod + def __str__(self): + pass + + +Granularity = int +'''Type of fields storing the unit/granularity of numbers. 
+ +For example, a granularity of 1000 means that the SNS returned something like '42k' and the last three significant digits are unknown.''' + + class URLItem(Item): '''A generic item which only holds a URL string.''' @@ -49,6 +65,10 @@ class Scraper: '''Iterator yielding Items.''' pass + def get_entity(self): + '''Get the entity behind the scraper, if any.''' + return None + def _request(self, method, url, params = None, data = None, headers = None, timeout = 10, responseOkCallback = None): for attempt in range(self._retries + 1): # The request is newly prepared on each retry because of potential cookie updates. diff --git a/snscrape/cli.py b/snscrape/cli.py index c81e0c3..b05a380 100644 --- a/snscrape/cli.py +++ b/snscrape/cli.py @@ -170,6 +170,7 @@ def parse_args(): group = parser.add_mutually_exclusive_group(required = False) group.add_argument('-f', '--format', dest = 'format', type = str, default = None, help = 'Output format') group.add_argument('--jsonl', dest = 'jsonl', action = 'store_true', default = False, help = 'Output JSONL') + parser.add_argument('--with-entity', dest = 'withEntity', action = 'store_true', default = False, help = 'Include the entity (e.g. 
user, channel) as the first output item') parser.add_argument('--since', type = parse_datetime_arg, metavar = 'DATETIME', help = 'Only return results newer than DATETIME') subparsers = parser.add_subparsers(dest = 'scraper', help = 'The scraper you want to use') @@ -236,6 +237,13 @@ def main(): i = 0 with _dump_locals_on_exception(): + if args.withEntity: + entity = scraper.get_entity() + if entity: + if args.jsonl: + print(json.dumps(entity._asdict(), default = json_serialise_datetime)) + else: + print(entity) for i, item in enumerate(scraper.get_items(), start = 1): if args.since is not None and item.date < args.since: logger.info(f'Exiting due to reaching older results than {args.since}') diff --git a/snscrape/modules/facebook.py b/snscrape/modules/facebook.py index 56fd6d8..670aa97 100644 --- a/snscrape/modules/facebook.py +++ b/snscrape/modules/facebook.py @@ -23,6 +23,25 @@ class FacebookPost(typing.NamedTuple, snscrape.base.Item): return self.cleanUrl +class User(typing.NamedTuple, snscrape.base.Entity): + username: str + pageId: int + name: str + verified: bool + created: typing.Optional[datetime.date] = None + pageOwner: typing.Optional[str] = None + likes: typing.Optional[int] = None + followers: typing.Optional[int] = None + checkins: typing.Optional[int] = None + address: typing.Optional[str] = None + phone: typing.Optional[str] = None + web: typing.Optional[str] = None + keywords: typing.Optional[typing.List[str]] = None + + def __str__(self): + return f'https://www.facebook.com/{self.username}/' + + class FacebookCommonScraper(snscrape.base.Scraper): def _clean_url(self, dirtyUrl): u = urllib.parse.urlparse(dirtyUrl) @@ -136,6 +155,19 @@ class FacebookUserAndCommunityScraper(FacebookCommonScraper): def __init__(self, username, **kwargs): super().__init__(**kwargs) self._username = username + self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 
'Accept-Language': 'en-US,en;q=0.5'} + self._initialPage = None + self._initialPageSoup = None + + def _initial_page(self): + if self._initialPage is None: + logger.info('Retrieving initial data') + r = self._get(self._baseUrl, headers = self._headers) + if r.status_code not in (200, 404): + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + self._initialPage = r + self._initialPageSoup = bs4.BeautifulSoup(r.text, 'lxml') + return self._initialPage, self._initialPageSoup def get_items(self): headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'Accept-Language': 'en-US,en;q=0.5'} @@ -143,14 +175,10 @@ class FacebookUserAndCommunityScraper(FacebookCommonScraper): nextPageLinkPattern = re.compile(r'^/pages_reaction_units/more/\?page_id=') spuriousForLoopPattern = re.compile(r'^for \(;;\);') - logger.info('Retrieving initial data') - r = self._get(self._baseUrl, headers = headers) + r, soup = self._initial_page() if r.status_code == 404: logger.warning('User does not exist') return - elif r.status_code != 200: - raise snscrape.base.ScraperException('Got status code {r.status_code}') - soup = bs4.BeautifulSoup(r.text, 'lxml') yield from self._soup_to_items(soup, self._baseUrl, 'user') nextPageLink = soup.find('a', ajaxify = nextPageLinkPattern) @@ -190,6 +218,76 @@ class FacebookUserScraper(FacebookUserAndCommunityScraper): super().__init__(*args, **kwargs) self._baseUrl = f'https://www.facebook.com/{self._username}/' + def get_entity(self): + kwargs = {} + + nameVerifiedMarkupPattern = re.compile(r'"markup":\[\["__markup_a588f507_0_0",\{"__html":(".*?")\}') + handleDivPattern = re.compile(r']*(?<=\s)data-key\s*=\s*"tab_home".*?') + handlePattern = re.compile(r']*(?<=\s)href="/([^/]+)') + months = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December'] + createdDatePattern = 
re.compile('^(' + '|'.join(months) + r') (\d+), (\d+)$') + + r, soup = self._initial_page() + if r.status_code != 200: + return + + handleDiv = handleDivPattern.search(r.text) + handle = handlePattern.search(handleDiv.group(0)) + kwargs['username'] = handle.group(1) + + nameVerifiedMarkup = nameVerifiedMarkupPattern.search(r.text) + nameVerifiedMarkup = json.loads(nameVerifiedMarkup.group(1)) + nameVerifiedSoup = bs4.BeautifulSoup(nameVerifiedMarkup, 'lxml') + kwargs['name'] = nameVerifiedSoup.find('a', class_ = '_64-f').text + kwargs['verified'] = bool(nameVerifiedSoup.find('a', class_ = '_56_f')) + + pageTransparencyContentDiv = soup.find('div', class_ = '_61-0') + if pageTransparencyContentDiv.text.startswith('Page created - '): + createdDateMess = pageTransparencyContentDiv.text.split(' - ', 1)[1] + m = createdDatePattern.match(createdDateMess) + assert m, 'unexpected created div content' + kwargs['created'] = datetime.date(int(m.group(3)), months.index(m.group(1)) + 1, int(m.group(2))) + if pageTransparencyContentDiv.text.startswith('Confirmed Page Owner: '): + kwargs['pageOwner'] = pageTransparencyContentDiv.text.split(': ', 1)[1] + + communityDiv = soup.find('div', class_ = '_6590') + for div in communityDiv.find_all('div', class_ = '_4bl9'): + text = div.text + if text.endswith(' people like this'): + kwargs['likes'] = int(text.split(' ', 1)[0].replace(',', '')) + elif text.endswith(' people follow this'): + kwargs['followers'] = int(text.split(' ', 1)[0].replace(',', '')) + elif text.endswith(' check-ins'): + kwargs['checkins'] = int(text.split(' ', 1)[0].replace(',', '')) + + aboutDiv = soup.find('div', class_ = '_u9q') + if aboutDiv: + # As if the above wasn't already ugly enough, this is where it gets really bad... 
+ for div in aboutDiv.find_all('div', class_ = '_2pi9'): + img = div.find('img', class_ = '_3-91') + if not img: + continue + if img['src'] == 'https://static.xx.fbcdn.net/rsrc.php/v3/y5/r/vfXKA62x4Da.png': # Address + rawAddress = div.find('div', class_ = '_2wzd').text + kwargs['address'] = re.sub(r' \((\d+,)?\d+(\.\d+)? mi\)', '\n', rawAddress) # Remove distance from inferred IP location, restore linebreak + elif img['src'] == 'https://static.xx.fbcdn.net/rsrc.php/v3/yW/r/mYv88EsODOI.png': # Phone number + kwargs['phone'] = div.find('div', class_ = '_4bl9').text + elif img['src'] == 'https://static.xx.fbcdn.net/rsrc.php/v3/yx/r/xVA3lB-GVep.png': # Web link + for a in div.find_all('a'): + if a.text == '' or 'href' not in a.attrs or a.find('span'): + continue + dirtyWeb = a['href'] + assert dirtyWeb.startswith('https://l.facebook.com/l.php?u='), 'unexpected web link' + kwargs['web'] = urllib.parse.unquote(dirtyWeb.split('=', 1)[1].split('&', 1)[0]) + elif img['src'] == 'https://static.xx.fbcdn.net/rsrc.php/v3/yl/r/LwDWwC1d0Rx.png': # Keywords + kwargs['keywords'] = div.find('div', class_ = '_4bl9').text.split(' · ') + + androidUrlMeta = soup.find('meta', property = 'al:android:url') + assert androidUrlMeta['content'].startswith('fb://page/') and androidUrlMeta['content'].endswith('?referrer=app_link') + kwargs['pageId'] = int(androidUrlMeta['content'][10:-18]) + + return User(**kwargs) + class FacebookCommunityScraper(FacebookUserAndCommunityScraper): name = 'facebook-community' diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index ff34e71..eeaec77 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -2,6 +2,7 @@ import datetime import hashlib import json import logging +import re import snscrape.base import typing @@ -26,6 +27,20 @@ class InstagramPost(typing.NamedTuple, snscrape.base.Item): return self.cleanUrl +class User(typing.NamedTuple, snscrape.base.Entity): + username: str + name: 
typing.Optional[str] + followers: int + followersGranularity: snscrape.base.Granularity + following: int + followingGranularity: snscrape.base.Granularity + posts: int + postsGranularity: snscrape.base.Granularity + + def __str__(self): + return f'https://www.instagram.com/{self.username}/' + + class InstagramCommonScraper(snscrape.base.Scraper): def __init__(self, mode, name, **kwargs): super().__init__(**kwargs) @@ -34,6 +49,8 @@ class InstagramCommonScraper(snscrape.base.Scraper): self._mode = mode self._name = name + self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} + if self._mode == 'User': self._initialUrl = f'https://www.instagram.com/{self._name}/' self._pageName = 'ProfilePage' @@ -58,6 +75,7 @@ class InstagramCommonScraper(snscrape.base.Scraper): self._pageIDKey = 'id' self._queryHash = '1b84447a4d8b6d6d0426fefb34514485' self._variablesFormat = '{{"id":"{pageID}","first":50,"after":"{endCursor}"}}' + self._initialPage = None def _response_to_items(self, response): for node in response[self._responseContainer][self._edgeXToMedia]['edges']: @@ -79,6 +97,17 @@ class InstagramCommonScraper(snscrape.base.Scraper): isVideo = node['node']['is_video'], ) + def _initial_page(self): + if self._initialPage is None: + logger.info('Retrieving initial data') + r = self._get(self._initialUrl, headers = self._headers, responseOkCallback = self._check_initial_page_callback) + if r.status_code not in (200, 404): + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + elif r.url.startswith('https://www.instagram.com/accounts/login/'): + raise snscrape.base.ScraperException('Redirected to login page') + self._initialPage = r + return self._initialPage + def _check_initial_page_callback(self, r): if r.status_code != 200: return True, None @@ -101,17 +130,10 @@ class InstagramCommonScraper(snscrape.base.Scraper): return True, None def get_items(self): 
- headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} - - logger.info('Retrieving initial data') - r = self._get(self._initialUrl, headers = headers, responseOkCallback = self._check_initial_page_callback) + r = self._initial_page() if r.status_code == 404: logger.warning(f'{self._mode} does not exist') return - elif r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - elif r.url.startswith('https://www.instagram.com/accounts/login/'): - raise snscrape.base.ScraperException('Redirected to login page') response = r._snscrape_json_obj rhxGis = response['rhx_gis'] if 'rhx_gis' in response else '' if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0: @@ -126,6 +148,7 @@ class InstagramCommonScraper(snscrape.base.Scraper): return endCursor = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] + headers = self._headers.copy() while True: logger.info(f'Retrieving endCursor = {endCursor!r}') variables = self._variablesFormat.format(**locals()) @@ -156,6 +179,42 @@ class InstagramUserScraper(InstagramCommonScraper): def from_args(cls, args): return cls('User', args.username, retries = args.retries) + def get_entity(self): + r = self._initial_page() + if r.status_code != 200: + return + if '