diff --git a/snscrape/modules/facebook.py b/snscrape/modules/facebook.py index cefe14c..e8a7ffb 100644 --- a/snscrape/modules/facebook.py +++ b/snscrape/modules/facebook.py @@ -1,3 +1,6 @@ +__all__ = ['FacebookPost', 'User', 'FacebookUserScraper', 'FacebookCommunityScraper', 'FacebookGroupScraper'] + + import bs4 import dataclasses import datetime @@ -9,7 +12,7 @@ import typing import urllib.parse -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) @dataclasses.dataclass @@ -46,7 +49,7 @@ class User(snscrape.base.Entity): return f'https://www.facebook.com/{self.username}/' -class FacebookCommonScraper(snscrape.base.Scraper): +class _FacebookCommonScraper(snscrape.base.Scraper): def _clean_url(self, dirtyUrl): u = urllib.parse.urlparse(dirtyUrl) if u.path == '/permalink.php': @@ -119,7 +122,7 @@ class FacebookCommonScraper(snscrape.base.Scraper): entryA = entry.find('a', class_ = '_5pcq') # There can be more than one, e.g. when a post is shared by another user, but the first one is always the one of this entry. mediaSetA = entry.find('a', class_ = '_17z-') if not mediaSetA and not entryA: - logger.warning(f'Ignoring link-less entry after {cleanUrl}: {entry.text!r}') + _logger.warning(f'Ignoring link-less entry after {cleanUrl}: {entry.text!r}') continue if mediaSetA and (not entryA or entryA['href'] == '#'): href = mediaSetA['href'] @@ -128,7 +131,7 @@ class FacebookCommonScraper(snscrape.base.Scraper): oddLink, warn = self._is_odd_link(href, entry.text, mode) if oddLink: if warn: - logger.warning(f'Ignoring odd link: {href}') + _logger.warning(f'Ignoring odd link: {href}') continue dirtyUrl = urllib.parse.urljoin(baseUrl, href) cleanUrl = self._clean_url(dirtyUrl) @@ -146,7 +149,7 @@ class FacebookCommonScraper(snscrape.base.Scraper): continue query = urllib.parse.parse_qs(urllib.parse.urlparse(href).query) if 'u' not in query or len(query['u']) != 1: - logger.warning(f'Ignoring odd outlink: {href}') + _logger.warning(f'Ignoring odd outlink: {href}') continue outlink = query['u'][0] if outlink.startswith('http://') or outlink.startswith('https://') and outlink not in outlinks: @@ -154,7 +157,7 @@ class FacebookCommonScraper(snscrape.base.Scraper): yield FacebookPost(cleanUrl = cleanUrl, dirtyUrl = dirtyUrl, date = date, content = content, outlinks = outlinks) -class FacebookUserAndCommunityScraper(FacebookCommonScraper): +class _FacebookUserAndCommunityScraper(_FacebookCommonScraper): def __init__(self, username, **kwargs): super().__init__(**kwargs) self._username = username @@ -164,7 +167,7 @@ class FacebookUserAndCommunityScraper(FacebookCommonScraper): def _initial_page(self): if self._initialPage is None: - logger.info('Retrieving initial data') + _logger.info('Retrieving initial data') r = self._get(self._baseUrl, headers = self._headers) if r.status_code not in (200, 404): raise snscrape.base.ScraperException(f'Got status code {r.status_code}') @@ -178,12 +181,12 @@ class FacebookUserAndCommunityScraper(FacebookCommonScraper): r, soup = self._initial_page() if r.status_code == 404: - logger.warning('User does not exist') + _logger.warning('User does not exist') return yield from self._soup_to_items(soup, self._baseUrl, 'user') while (nextPageLink := soup.find('a', ajaxify = nextPageLinkPattern)): - logger.info('Retrieving next page') + _logger.info('Retrieving next page') # The web app sends a bunch of additional parameters. Most of them would be easy to add, but there's also __dyn, which is a compressed list of the "modules" loaded in the browser. # Reproducing that would be difficult to get right, especially as Facebook's codebase evolves, so it's just not sent at all here. @@ -210,7 +213,7 @@ class FacebookUserAndCommunityScraper(FacebookCommonScraper): return cls._construct(args, args.username) -class FacebookUserScraper(FacebookUserAndCommunityScraper): +class FacebookUserScraper(_FacebookUserAndCommunityScraper): name = 'facebook-user' def __init__(self, *args, **kwargs): @@ -288,7 +291,7 @@ class FacebookUserScraper(FacebookUserAndCommunityScraper): return User(**kwargs) -class FacebookCommunityScraper(FacebookUserAndCommunityScraper): +class FacebookCommunityScraper(_FacebookUserAndCommunityScraper): name = 'facebook-community' def __init__(self, *args, **kwargs): @@ -296,7 +299,7 @@ class FacebookCommunityScraper(FacebookUserAndCommunityScraper): self._baseUrl = f'https://www.facebook.com/{self._username}/community/' -class FacebookGroupScraper(FacebookCommonScraper): +class FacebookGroupScraper(_FacebookCommonScraper): name = 'facebook-group' def __init__(self, group, **kwargs): @@ -313,7 +316,7 @@ class FacebookGroupScraper(FacebookCommonScraper): baseUrl = f'https://upload.facebook.com/groups/{self._group}/?sorting_setting=CHRONOLOGICAL' r = self._get(baseUrl, headers = headers) if r.status_code == 404: - logger.warning('Group does not exist') + _logger.warning('Group does not exist') return elif r.status_code != 200: raise snscrape.base.ScraperException(f'Got status code {r.status_code}') diff --git a/snscrape/modules/instagram.py b/snscrape/modules/instagram.py index 641c823..a3fa977 100644 --- a/snscrape/modules/instagram.py +++ b/snscrape/modules/instagram.py @@ -1,3 +1,6 @@ +__all__ = ['InstagramPost', 'User', 'InstagramUserScraper', 'InstagramHashtagScraper', 'InstagramLocationScraper'] + + import dataclasses import datetime import hashlib @@ -8,7 +11,7 @@ import snscrape.base import typing -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) @dataclasses.dataclass @@ -44,7 +47,7 @@ class User(snscrape.base.Entity): return f'https://www.instagram.com/{self.username}/' -class InstagramCommonScraper(snscrape.base.Scraper): +class _InstagramCommonScraper(snscrape.base.Scraper): def __init__(self, **kwargs): super().__init__(**kwargs) self._headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} @@ -70,7 +73,7 @@ class InstagramCommonScraper(snscrape.base.Scraper): def _initial_page(self): if self._initialPage is None: - logger.info('Retrieving initial data') + _logger.info('Retrieving initial data') r = self._get(self._initialUrl, headers = self._headers, responseOkCallback = self._check_initial_page_callback) if r.status_code not in (200, 404): raise snscrape.base.ScraperException(f'Got status code {r.status_code}') @@ -103,15 +106,15 @@ class InstagramCommonScraper(snscrape.base.Scraper): def get_items(self): r = self._initial_page() if r.status_code == 404: - logger.warning(f'Page does not exist') + _logger.warning(f'Page does not exist') return response = r._snscrape_json_obj rhxGis = response['rhx_gis'] if 'rhx_gis' in response else '' if response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['count'] == 0: - logger.info(f'Page has no posts') + _logger.info(f'Page has no posts') return if not response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._edgeXToMedia]['edges']: - logger.warning('Private account') + _logger.warning('Private account') return pageID = response['entry_data'][self._pageName][0]['graphql'][self._responseContainer][self._pageIDKey] yield from self._response_to_items(response['entry_data'][self._pageName][0]['graphql']) @@ -121,7 +124,7 @@ class InstagramCommonScraper(snscrape.base.Scraper): headers = self._headers.copy() while True: - logger.info(f'Retrieving endCursor = {endCursor!r}') + _logger.info(f'Retrieving endCursor = {endCursor!r}') variables = self._variablesFormat.format(**locals()) headers['X-Requested-With'] = 'XMLHttpRequest' headers['X-Instagram-GIS'] = hashlib.md5(f'{rhxGis}:{variables}'.encode('utf-8')).hexdigest() @@ -139,7 +142,7 @@ class InstagramCommonScraper(snscrape.base.Scraper): endCursor = response['data'][self._responseContainer][self._edgeXToMedia]['page_info']['end_cursor'] -class InstagramUserScraper(InstagramCommonScraper): +class InstagramUserScraper(_InstagramCommonScraper): name = 'instagram-user' def __init__(self, username, **kwargs): @@ -194,7 +197,7 @@ class InstagramUserScraper(InstagramCommonScraper): ) -class InstagramHashtagScraper(InstagramCommonScraper): +class InstagramHashtagScraper(_InstagramCommonScraper): name = 'instagram-hashtag' def __init__(self, hashtag, **kwargs): @@ -216,7 +219,7 @@ class InstagramHashtagScraper(InstagramCommonScraper): return cls._construct(args, args.hashtag) -class InstagramLocationScraper(InstagramCommonScraper): +class InstagramLocationScraper(_InstagramCommonScraper): name = 'instagram-location' def __init__(self, locationId, **kwargs): diff --git a/snscrape/modules/reddit.py b/snscrape/modules/reddit.py index 1003f50..9668afd 100644 --- a/snscrape/modules/reddit.py +++ b/snscrape/modules/reddit.py @@ -1,3 +1,6 @@ +__all__ = ['Submission', 'Comment', 'RedditUserScraper', 'RedditSubredditScraper', 'RedditSearchScraper'] + + import dataclasses import datetime import logging @@ -9,7 +12,7 @@ import time import typing -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) # Most of these fields should never be None, but due to broken data, they sometimes are anyway... @@ -43,7 +46,7 @@ class Comment(snscrape.base.Item): return self.url -class RedditPushshiftScraper(snscrape.base.Scraper): +class _RedditPushshiftScraper(snscrape.base.Scraper): def __init__(self, name, submissions = True, comments = True, before = None, after = None, **kwargs): super().__init__(**kwargs) self._name = name @@ -59,7 +62,7 @@ class RedditPushshiftScraper(snscrape.base.Scraper): def _handle_rate_limiting(self, r): if r.status_code == 429: - logger.info('Got 429 response, sleeping') + _logger.info('Got 429 response, sleeping') time.sleep(10) return False, 'rate-limited' if r.status_code != 200: @@ -128,7 +131,7 @@ class RedditPushshiftScraper(snscrape.base.Scraper): else: # E.g. submission 617p51 but can likely happen for comments as well permalink = f'/comments/{d["link_id"][3:]}/_/{d["id"]}/' else: - logger.warning(f'Unable to find or construct permalink') + _logger.warning(f'Unable to find or construct permalink') permalink = '/' kwargs = { @@ -215,19 +218,19 @@ class RedditPushshiftScraper(snscrape.base.Scraper): return cls._construct(args, getattr(args, name), submissions = not args.noSubmissions, comments = not args.noComments, before = args.before, after = args.after) -class RedditUserScraper(RedditPushshiftScraper): +class RedditUserScraper(_RedditPushshiftScraper): name = 'reddit-user' _validationFunc = lambda x: re.match('^[A-Za-z0-9_-]{3,20}$', x) _apiField = 'author' -class RedditSubredditScraper(RedditPushshiftScraper): +class RedditSubredditScraper(_RedditPushshiftScraper): name = 'reddit-subreddit' _validationFunc = lambda x: re.match('^[A-Za-z0-9][A-Za-z0-9_]{2,20}$', x) _apiField = 'subreddit' -class RedditSearchScraper(RedditPushshiftScraper): +class RedditSearchScraper(_RedditPushshiftScraper): name = 'reddit-search' _validationFunc = lambda x: True _apiField = 'q' diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 9076975..fb6bf24 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -1,3 +1,6 @@ +__all__ = ['LinkPreview', 'TelegramPost', 'Channel', 'TelegramChannelScraper'] + + import bs4 import dataclasses import datetime @@ -8,7 +11,7 @@ import typing import urllib.parse -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') @@ -84,7 +87,7 @@ class TelegramChannelScraper(snscrape.base.Scraper): dateDiv = post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date') rawUrl = dateDiv['href'] if not rawUrl.startswith('https://t.me/') or sum(x == '/' for x in rawUrl) != 4 or rawUrl.rsplit('/', 1)[1].strip('0123456789') != '': - logger.warning(f'Possibly incorrect URL: {rawUrl!r}') + _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') url = rawUrl.replace('//t.me/', '//t.me/s/') date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') if (message := post.find('div', class_ = 'tgme_widget_message_text')): @@ -120,14 +123,14 @@ class TelegramChannelScraper(snscrape.base.Scraper): if imageI['style'].startswith("background-image:url('"): kwargs['image'] = imageI['style'][22 : imageI['style'].index("'", 22)] else: - logger.warning(f'Could not process link preview image on {url}') + _logger.warning(f'Could not process link preview image on {url}') linkPreview = LinkPreview(**kwargs) yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview) def get_items(self): r, soup = self._initial_page() if '/s/' not in r.url: - logger.warning('No public post list for this user') + _logger.warning('No public post list for this user') return while True: yield from self._soup_to_items(soup, r.url) @@ -166,7 +169,7 @@ class TelegramChannelScraper(snscrape.base.Scraper): kwargs['username'] = next(self._soup_to_items(soup, r.url, onlyUsername = True)) except StopIteration: # If there are no posts, fall back to the channel info div, although that should never happen due to the 'Channel created' entry. - logger.warning('Could not find a post; extracting username from channel info div, which may not be capitalised correctly') + _logger.warning('Could not find a post; extracting username from channel info div, which may not be capitalised correctly') kwargs['username'] = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_username').text[1:] # Remove @ if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): kwargs['description'] = descriptionDiv.text diff --git a/snscrape/modules/twitter.py b/snscrape/modules/twitter.py index c1f870e..fd67eb9 100644 --- a/snscrape/modules/twitter.py +++ b/snscrape/modules/twitter.py @@ -1,3 +1,20 @@ +__all__ = [ + 'Tweet', 'Medium', 'Photo', 'VideoVariant', 'Video', 'Gif', 'DescriptionUrl', 'Coordinates', 'Place', + 'User', 'UserLabel', + 'Trend', + 'ScrollDirection', + 'GuestTokenManager', + 'TwitterSearchScraper', + 'TwitterUserScraper', + 'TwitterProfileScraper', + 'TwitterHashtagScraper', + 'TwitterTweetScraperMode', + 'TwitterTweetScraper', + 'TwitterListPostsScraper', + 'TwitterTrendsScraper', +] + + import collections import dataclasses import datetime @@ -15,7 +32,7 @@ import typing import urllib.parse -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) _API_AUTHORIZATION_HEADER = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs=1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA' _globalGuestTokenManager = None @@ -193,7 +210,7 @@ class GuestTokenManager: self._setTime = 0.0 -class TwitterAPIScraper(snscrape.base.Scraper): +class _TwitterAPIScraper(snscrape.base.Scraper): def __init__(self, baseUrl, guestTokenManager = None, **kwargs): super().__init__(**kwargs) self._baseUrl = baseUrl @@ -223,24 +240,24 @@ class TwitterAPIScraper(snscrape.base.Scraper): def _ensure_guest_token(self, url = None): if self._guestTokenManager.token is None: - logger.info('Retrieving guest token') + _logger.info('Retrieving guest token') r = self._get(self._baseUrl if url is None else url, headers = {'User-Agent': self._userAgent}, responseOkCallback = self._check_guest_token_response) if (match := re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+); Max-Age=10800; Domain=\.twitter\.com; Path=/; Secure"\);', r.text)): - logger.debug('Found guest token in HTML') + _logger.debug('Found guest token in HTML') self._guestTokenManager.token = match.group(1) if 'gt' in r.cookies: - logger.debug('Found guest token in cookies') + _logger.debug('Found guest token in cookies') self._guestTokenManager.token = r.cookies['gt'] if not self._guestTokenManager.token: - logger.debug('No guest token in response') - logger.info('Retrieving guest token via API') + _logger.debug('No guest token in response') + _logger.info('Retrieving guest token via API') r = self._post('https://api.twitter.com/1.1/guest/activate.json', data = b'', headers = self._apiHeaders, responseOkCallback = self._check_guest_token_response) o = r.json() if not o.get('guest_token'): raise snscrape.base.ScraperError('Unable to retrieve guest token') self._guestTokenManager.token = o['guest_token'] assert self._guestTokenManager.token - logger.debug(f'Using guest token {self._guestTokenManager.token}') + _logger.debug(f'Using guest token {self._guestTokenManager.token}') self._session.cookies.set('gt', self._guestTokenManager.token, domain = '.twitter.com', path = '/', secure = True, expires = self._guestTokenManager.setTime + 10800) self._apiHeaders['x-guest-token'] = self._guestTokenManager.token @@ -291,7 +308,7 @@ class TwitterAPIScraper(snscrape.base.Scraper): stopOnEmptyResponse = False emptyResponsesOnCursor = 0 while True: - logger.info(f'Retrieving scroll page {cursor}') + _logger.info(f'Retrieving scroll page {cursor}') obj = self._get_api_data(endpoint, reqParams) yield obj @@ -375,14 +392,14 @@ class TwitterAPIScraper(snscrape.base.Scraper): if 'promotedMetadata' in entry['item']['content']['tweet']: # Promoted tweet aka ads return if entry['item']['content']['tweet']['id'] not in obj['globalObjects']['tweets']: - logger.warning(f'Skipping tweet {entry["item"]["content"]["tweet"]["id"]} which is not in globalObjects') + _logger.warning(f'Skipping tweet {entry["item"]["content"]["tweet"]["id"]} which is not in globalObjects') return tweet = obj['globalObjects']['tweets'][entry['item']['content']['tweet']['id']] elif 'tombstone' in entry['item']['content']: if 'tweet' not in entry['item']['content']['tombstone']: # E.g. deleted reply return if entry['item']['content']['tombstone']['tweet']['id'] not in obj['globalObjects']['tweets']: - logger.warning(f'Skipping tweet {entry["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects') + _logger.warning(f'Skipping tweet {entry["item"]["content"]["tombstone"]["tweet"]["id"]} which is not in globalObjects') return tweet = obj['globalObjects']['tweets'][entry['item']['content']['tombstone']['tweet']['id']] else: @@ -417,11 +434,11 @@ class TwitterAPIScraper(snscrape.base.Scraper): for medium in tweet['extended_entities']['media']: if medium['type'] == 'photo': if '.' not in medium['media_url_https']: - logger.warning(f'Skipping malformed medium URL on tweet {kwargs["id"]}: {medium["media_url_https"]!r} contains no dot') + _logger.warning(f'Skipping malformed medium URL on tweet {kwargs["id"]}: {medium["media_url_https"]!r} contains no dot') continue baseUrl, format = medium['media_url_https'].rsplit('.', 1) if format not in ('jpg', 'png'): - logger.warning(f'Skipping photo with unknown format on tweet {kwargs["id"]}: {format!r}') + _logger.warning(f'Skipping photo with unknown format on tweet {kwargs["id"]}: {format!r}') continue media.append(Photo( previewUrl = f'{baseUrl}?format={format}&name=small', @@ -536,7 +553,7 @@ class TwitterAPIScraper(snscrape.base.Scraper): return UserLabel(**labelKwargs) -class TwitterSearchScraper(TwitterAPIScraper): +class TwitterSearchScraper(_TwitterAPIScraper): name = 'twitter-search' def __init__(self, query, cursor = None, top = False, **kwargs): @@ -765,7 +782,7 @@ class TwitterTweetScraperMode(enum.Enum): return cls.SINGLE -class TwitterTweetScraper(TwitterAPIScraper): +class TwitterTweetScraper(_TwitterAPIScraper): name = 'twitter-tweet' def __init__(self, tweetId, mode = TwitterTweetScraperMode.SINGLE, **kwargs): @@ -851,7 +868,7 @@ class TwitterListPostsScraper(TwitterSearchScraper): return cls._construct(args, args.list) -class TwitterTrendsScraper(TwitterAPIScraper): +class TwitterTrendsScraper(_TwitterAPIScraper): name = 'twitter-trends' def __init__(self, **kwargs): diff --git a/snscrape/modules/vkontakte.py b/snscrape/modules/vkontakte.py index 9ee4419..99cfb17 100644 --- a/snscrape/modules/vkontakte.py +++ b/snscrape/modules/vkontakte.py @@ -1,3 +1,6 @@ +__all__ = ['VKontaktePost', 'Photo', 'PhotoVariant', 'Video', 'User', 'VKontakteUserScraper'] + + import bs4 import collections import dataclasses @@ -14,23 +17,23 @@ try: except ImportError: # Python 3.8 support; nowadays, Europe/Moscow is always UTC+3, but it's more complicated before 2014, so need proper zone info import pytz - def timezone(s): + def _timezone(s): return pytz.timezone(s) - def localised_datetime(tz, *args, **kwargs): + def _localised_datetime(tz, *args, **kwargs): return tz.localize(datetime.datetime(*args, **kwargs)) else: - def timezone(s): + def _timezone(s): return zoneinfo.ZoneInfo(s) - def localised_datetime(tz, *args, **kwargs): + def _localised_datetime(tz, *args, **kwargs): return datetime.datetime(*args, tzinfo = tz, **kwargs) -logger = logging.getLogger(__name__) -months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] -datePattern = re.compile(r'^(?Ptoday' +_logger = logging.getLogger(__name__) +_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] +_datePattern = re.compile(r'^(?Ptoday' r'|yesterday' - r'|(?P\d+)\s+(?P' + '|'.join(months) + ')(\s+(?P\d{4}))?' - r'|(?P' + '|'.join(months) + r')\s+(?P\d+),\s+(?P\d{4})' + r'|(?P\d+)\s+(?P' + '|'.join(_months) + ')(\s+(?P\d{4}))?' + r'|(?P' + '|'.join(_months) + r')\s+(?P\d+),\s+(?P\d{4})' ')' r'\s+at\s+(?P\d+):(?P\d+)\s+(?P[ap]m)$') @@ -119,9 +122,9 @@ class VKontakteUserScraper(snscrape.base.Scraper): return None if 'time' in dateSpan.attrs: return datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc) - if (match := datePattern.match(dateSpan.text)): + if (match := _datePattern.match(dateSpan.text)): # Datetime information down to minutes - tz = timezone('Europe/Moscow') + tz = _timezone('Europe/Moscow') if match.group('date') in ('today', 'yesterday'): date = datetime.datetime.now(tz = tz) if match.group('date') == 'yesterday': @@ -129,7 +132,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): year, month, day = date.year, date.month, date.day else: year = int(match.group('year1') or match.group('year2') or datetime.datetime.now(tz = tz).year) - month = months.index(match.group('month1') or match.group('month2')) + 1 + month = _months.index(match.group('month1') or match.group('month2')) + 1 day = int(match.group('day1') or match.group('day2')) hour = int(match.group('hour')) # Damn AM/PM... @@ -138,17 +141,17 @@ class VKontakteUserScraper(snscrape.base.Scraper): if match.group('ampm') == 'pm': hour += 12 minute = int(match.group('minute')) - return localised_datetime(tz, year, month, day, hour, minute) - if (match := re.match(r'^(?P\d+)\s+(?P' + '|'.join(months) + r')\s+(?P\d{4})$', dateSpan.text)): + return _localised_datetime(tz, year, month, day, hour, minute) + if (match := re.match(r'^(?P\d+)\s+(?P' + '|'.join(_months) + r')\s+(?P\d{4})$', dateSpan.text)): # Date only - return datetime.date(int(match.group('year')), months.index(match.group('month')) + 1, int(match.group('day'))) + return datetime.date(int(match.group('year')), _months.index(match.group('month')) + 1, int(match.group('day'))) if dateSpan.text not in ('video', 'photo'): # Silently ignore video and photo reposts which have no original date attached - logger.warning(f'Could not parse date string: {dateSpan.text!r}') + _logger.warning(f'Could not parse date string: {dateSpan.text!r}') def _post_div_to_item(self, post, isCopy = False): postLink = post.find('a', class_ = 'post_link' if not isCopy else 'published_by_date') if not postLink: - logger.warning(f'Skipping post without link: {str(post)[:200]!r}') + _logger.warning(f'Skipping post without link: {str(post)[:200]!r}') return url = urllib.parse.urljoin(self._baseUrl, postLink['href']) assert (url.startswith('https://vk.com/wall') or (isCopy and (url.startswith('https://vk.com/video') or url.startswith('https://vk.com/photo')))) and '_' in url and url[-1] != '_' and url.rsplit('_', 1)[1].strip('0123456789') == '' @@ -170,7 +173,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): photos = [] for a in thumbsDiv.find_all('a', class_ = 'page_post_thumb_wrap'): if 'data-photo-id' not in a.attrs and 'data-video' not in a.attrs: - logger.warning(f'Skipping non-photo and non-video thumb wrap on {url}') + _logger.warning(f'Skipping non-photo and non-video thumb wrap on {url}') continue if 'data-video' in a.attrs: # Video @@ -184,7 +187,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): continue # From here on: photo if 'onclick' not in a.attrs or not a['onclick'].startswith("return showPhoto('") or '{"temp":' not in a['onclick'] or not a['onclick'].endswith('}, event)'): - logger.warning(f'Photo thumb wrap on {url} has no or unexpected onclick, skipping') + _logger.warning(f'Photo thumb wrap on {url} has no or unexpected onclick, skipping') continue photoData = a['onclick'][a['onclick'].find('{"temp":') : -8] # -8 = len(', event)') photoObj = json.loads(photoData) @@ -200,7 +203,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): not all(photoObj['temp'][x] in (photoObj['temp'][f'{x}_'][0], photoObj['temp'][f'{x}_'][0] + '.jpg') for x in singleLetterKeys) or \ not all(photoObj['temp'][x].startswith('https://sun') and '.userapi.com/' in photoObj['temp'][x] for x in singleLetterKeys) or \ not all(len(photoObj['temp'][(x_ := f'{x}_')]) == 3 and isinstance(photoObj['temp'][x_][1], int) and isinstance(photoObj['temp'][x_][2], int) for x in singleLetterKeys): - logger.warning(f'Photo thumb wrap on {url} has unexpected data structure, skipping') + _logger.warning(f'Photo thumb wrap on {url} has unexpected data structure, skipping') continue photoVariants = [] for x in singleLetterKeys: @@ -225,7 +228,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): def _initial_page(self): if self._initialPage is None: - logger.info('Retrieving initial data') + _logger.info('Retrieving initial data') r = self._get(self._baseUrl, headers = self._headers) if r.status_code not in (200, 404): raise snscrape.base.ScraperException(f'Got status code {r.status_code}') @@ -236,21 +239,21 @@ class VKontakteUserScraper(snscrape.base.Scraper): def get_items(self): r, soup = self._initial_page() if r.status_code == 404: - logger.warning('Wall does not exist') + _logger.warning('Wall does not exist') return if soup.find('div', class_ = 'profile_closed_wall_dummy'): - logger.warning('Private profile') + _logger.warning('Private profile') return if (profileDeleted := soup.find('h5', class_ = 'profile_deleted_text')): # Unclear what this state represents, so just log website text. - logger.warning(profileDeleted.text) + _logger.warning(profileDeleted.text) return newestPost = soup.find('div', class_ = 'post') if not newestPost: - logger.info('Wall has no posts') + _logger.info('Wall has no posts') return ownerID = newestPost.attrs['data-post-id'].split('_')[0] # If there is a pinned post, we need its ID for the pagination requests @@ -279,7 +282,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): break if not posts.startswith('
'): @@ -299,7 +302,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): def _get_wall_offset(self, fixedPostID, ownerID, offset): headers = self._headers.copy() headers['X-Requested-With'] = 'XMLHttpRequest' - logger.info(f'Retrieving page offset {offset}') + _logger.info(f'Retrieving page offset {offset}') r = self._post( 'https://vk.com/al_wall.php', data = [('act', 'get_wall'), ('al', 1), ('fixed', fixedPostID), ('offset', offset), ('onlyCache', 'false'), ('owner_id', ownerID), ('type', 'own'), ('wall_start_from', offset)], @@ -338,7 +341,7 @@ class VKontakteUserScraper(snscrape.base.Scraper): continue for a in rowDiv.find_all('a'): if not a['href'].startswith('/away.php?to='): - logger.warning(f'Skipping odd website link: {a["href"]!r}') + _logger.warning(f'Skipping odd website link: {a["href"]!r}') continue websites.append(urllib.parse.unquote(a['href'].split('=', 1)[1].split('&', 1)[0])) if websites: @@ -377,4 +380,3 @@ class VKontakteUserScraper(snscrape.base.Scraper): @classmethod def from_args(cls, args): return cls._construct(args, args.username) - diff --git a/snscrape/modules/weibo.py b/snscrape/modules/weibo.py index 395fc1f..f6d49ac 100644 --- a/snscrape/modules/weibo.py +++ b/snscrape/modules/weibo.py @@ -1,10 +1,13 @@ +__all__ = ['Post', 'User', 'WeiboUserScraper'] + + import dataclasses import logging import snscrape.base import typing -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) _userDoesNotExist = object() @@ -63,7 +66,7 @@ class WeiboUserScraper(snscrape.base.Scraper): # Redirect to uid URL self._uid = int(r.headers['Location'][3:]) elif r.status_code == 200 and '

用户不存在

' in r.text: - logger.warning('User does not exist') + _logger.warning('User does not exist') self._uid = _userDoesNotExist else: raise snscrape.base.ScraperError(f'Got unexpected response on resolving username ({r.status_code})') @@ -106,7 +109,7 @@ class WeiboUserScraper(snscrape.base.Scraper): o = r.json() for card in o['data']['cards']: if card['card_type'] != 9: - logger.warning(f'Skipping card of type {card["card_type"]}') + _logger.warning(f'Skipping card of type {card["card_type"]}') continue yield self._mblog_to_item(card['mblog']) if 'since_id' not in o['data']['cardlistInfo']: