diff --git a/snscrape/modules/vkontakte.py b/snscrape/modules/vkontakte.py index 86eff0b..987c0d5 100644 --- a/snscrape/modules/vkontakte.py +++ b/snscrape/modules/vkontakte.py @@ -3,6 +3,7 @@ import collections import dataclasses import datetime import itertools +import json import logging import snscrape.base import typing @@ -17,11 +18,37 @@ class VKontaktePost(snscrape.base.Item): url: str date: datetime.datetime content: str + outlinks: typing.Optional[typing.List[str]] = None + photos: typing.Optional[typing.List['Photo']] = None + video: typing.Optional['Video'] = None + quotedPost: typing.Optional['VKontaktePost'] = None def __str__(self): return self.url +@dataclasses.dataclass +class Photo: + variants: typing.List['PhotoVariant'] + url: typing.Optional[str] = None + + +@dataclasses.dataclass +class PhotoVariant: + url: str + width: int + height: int + + +@dataclasses.dataclass +class Video: + id: str + list: str + duration: int + url: str + thumbUrl: str + + @dataclasses.dataclass class User(snscrape.base.Entity): username: str @@ -56,15 +83,80 @@ class VKontakteUserScraper(snscrape.base.Scraper): self._initialPage = None self._initialPageSoup = None - def _post_div_to_item(self, post): - url = urllib.parse.urljoin(self._baseUrl, post.find('a', class_ = 'post_link')['href']) + def _away_a_to_url(self, a): + # Transform an tag with an href of /away.php?to=... to a plain URL; returns None if a doesn't have that form. + if a and a.get('href', '').startswith('/away.php?to='): + end = a['href'].find('&', 13) + if end == -1: + end = None + return urllib.parse.unquote(a['href'][13 : end]) + return None + + def _post_div_to_item(self, post, isCopy = False): + url = urllib.parse.urljoin(self._baseUrl, post.find('a', class_ = 'post_link' if not isCopy else 'published_by_date')['href']) assert url.startswith('https://vk.com/wall') and '_' in url and url[-1] != '_' and url.rsplit('_', 1)[1].strip('0123456789') == '' - dateSpan = post.find('div', class_ = 'post_date').find('span', class_ = 'rel_date') + dateSpan = post.find('div', class_ = 'post_date' if not isCopy else 'copy_post_date').find('span', class_ = 'rel_date') textDiv = post.find('div', class_ = 'wall_post_text') + outlinks = [h for a in textDiv.find_all('a') if (h := self._away_a_to_url(a))] if textDiv else [] + if (mediaLinkDiv := post.find('div', class_ = 'media_link')) and \ + (mediaLinkA := mediaLinkDiv.find('a', class_ = 'media_link__title')) and \ + (href := self._away_a_to_url(mediaLinkA)) and \ + href not in outlinks: + outlinks.append(href) + photos = None + video = None + if (thumbsDiv := (post.find('div', class_ = 'wall_text') if not isCopy else post).find('div', class_ = 'page_post_sized_thumbs')) and \ + not (not isCopy and thumbsDiv.parent.name == 'div' and 'class' in thumbsDiv.parent.attrs and 'copy_quote' in thumbsDiv.parent.attrs['class']): # Skip post quotes + photos = [] + for a in thumbsDiv.find_all('a', class_ = 'page_post_thumb_wrap'): + if 'data-photo-id' not in a.attrs and 'data-video' not in a.attrs: + logger.warning(f'Skipping non-photo and non-video thumb wrap on {url}') + continue + if 'data-video' in a.attrs: + # Video + video = Video( + id = a['data-video'], + list = a['data-list'], + duration = int(a['data-duration']), + url = f'https://vk.com{a["href"]}', + thumbUrl = a['style'][(begin := a['style'].find('background-image: url(') + 22) : a['style'].find(')', begin)], + ) + continue + # From here on: photo + if 'onclick' not in a.attrs or not a['onclick'].startswith("return showPhoto('") or '{"temp":' not in a['onclick'] or not a['onclick'].endswith('}, event)'): + logger.warning(f'Photo thumb wrap on {url} has no or unexpected onclick, skipping') + continue + photoData = a['onclick'][a['onclick'].find('{"temp":') : -8] # -8 = len(', event)') + photoObj = json.loads(photoData) + singleLetterKeys = [k for k in photoObj['temp'].keys() if len(k) == 1 and 97 <= ord(k) <= 122] # 97 = ord('a'), 122 = ord('z') + for x in singleLetterKeys: + # Merge base into URLs + if not photoObj['temp'][x].startswith('https://'): + photoObj['temp'][x] = f'{photoObj["temp"]["base"]}{photoObj["temp"][x]}' + x_ = f'{x}_' + if not photoObj['temp'][x_][0].startswith('https://'): + photoObj['temp'][x_][0] = f'{photoObj["temp"]["base"]}{photoObj["temp"][x_][0]}' + if any(k not in {'base', 'w', 'w_', 'x', 'x_', 'y', 'y_', 'z', 'z_'} for k in photoObj['temp'].keys()) or \ + not all(photoObj['temp'][x] in (photoObj['temp'][f'{x}_'][0], photoObj['temp'][f'{x}_'][0] + '.jpg') for x in singleLetterKeys) or \ + not all(photoObj['temp'][x].startswith('https://sun') and '.userapi.com/' in photoObj['temp'][x] for x in singleLetterKeys) or \ + not all(len(photoObj['temp'][(x_ := f'{x}_')]) == 3 and isinstance(photoObj['temp'][x_][1], int) and isinstance(photoObj['temp'][x_][2], int) for x in singleLetterKeys): + logger.warning(f'Photo thumb wrap on {url} has unexpected data structure, skipping') + continue + photoVariants = [] + for x in singleLetterKeys: + x_ = f'{x}_' + photoVariants.append(PhotoVariant(url = f'{photoObj["temp"][x_][0]}.jpg' if '.jpg' not in photoObj['temp'][x_][0] else photoObj['temp'][x_][0], width = photoObj['temp'][x_][1], height = photoObj['temp'][x_][2])) + photoUrl = f'https://vk.com{a["href"]}' if 'href' in a.attrs and a['href'].startswith('/photo') and a['href'][6:].strip('0123456789-_') == '' else None + photos.append(Photo(variants = photoVariants, url = photoUrl)) + quotedPost = self._post_div_to_item(quoteDiv, isCopy = True) if (quoteDiv := post.find('div', class_ = 'copy_quote')) else None return VKontaktePost( url = url, - date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc) if 'time' in dateSpan else None, + date = datetime.datetime.fromtimestamp(int(dateSpan['time']), datetime.timezone.utc) if dateSpan and 'time' in dateSpan else None, content = textDiv.text if textDiv else None, + outlinks = outlinks or None, + photos = photos or None, + video = video or None, + quotedPost = quotedPost, ) def _soup_to_items(self, soup):