diff --git a/archivers/base_archiver.py b/archivers/base_archiver.py index 2ba7c4d..0f490fe 100644 --- a/archivers/base_archiver.py +++ b/archivers/base_archiver.py @@ -95,7 +95,7 @@ class Archiver(ABC): # eg images in a tweet save to cloud storage - def generate_media_page(self, urls, url, object, requester=requests): + def generate_media_page(self, urls, url, object): """ For a list of media urls, fetch them, upload them and call self.generate_media_page_html with them @@ -111,7 +111,7 @@ class Archiver(ABC): filename = os.path.join(Storage.TMP_FOLDER, key) - d = requester.get(media_url, headers=headers) + d = requests.get(media_url, headers=headers) with open(filename, 'wb') as f: f.write(d.content) diff --git a/archivers/vk_archiver.py b/archivers/vk_archiver.py index b9d3f6c..e37c7a5 100644 --- a/archivers/vk_archiver.py +++ b/archivers/vk_archiver.py @@ -1,4 +1,4 @@ -import re, json +import re, json, requests import vk_api, dateparser from bs4 import BeautifulSoup @@ -16,6 +16,7 @@ class VkArchiver(Archiver): """ name = "vk" wall_pattern = re.compile(r"(wall.{0,1}\d+_\d+)") + photo_pattern = re.compile(r"(photo.{0,1}\d+_\d+)") onclick_pattern = re.compile(r"({.*})") def __init__(self, storage: Storage, driver, config: VkConfig): @@ -27,55 +28,49 @@ class VkArchiver(Archiver): def download(self, url, check_if_exists=False): # detect URLs that this archiver can handle has_wall = self.wall_pattern.search(url) + has_photo = self.photo_pattern.search(url) + _id, method = None, None if has_wall: - wall_id = has_wall[0] - wall_url = f'https://vk.com/{wall_id}' - logger.info(f"found valid wall id from {url=} : {wall_id=}") - key = self.get_html_key(wall_url) + _id = has_wall[0] + method = self.archive_wall + elif has_photo: + _id = has_photo[0] + method = self.archive_photo + else: return False - # if check if exists will not download again - if check_if_exists and self.storage.exists(key): - screenshot = self.get_screenshot(wall_url) - cdn_url = self.storage.get_cdn_url(key) - return ArchiveResult(status="already archived", cdn_url=cdn_url, screenshot=screenshot) + logger.info(f"found valid {_id=} from {url=}") + proper_url = f'https://vk.com/{_id}' - return self.archive_wall(wall_url) - return False + # if check if exists will not download again + key = self.get_html_key(proper_url) + if check_if_exists and self.storage.exists(key): + screenshot = self.get_screenshot(proper_url) + cdn_url = self.storage.get_cdn_url(key) + return ArchiveResult(status="already archived", cdn_url=cdn_url, screenshot=screenshot) - def archive_wall(self, wall_url): - res = self.vk_session.http.get(wall_url).text - soup = BeautifulSoup(res, "html.parser") - image_urls = [] - time = None - try: - rel_date = soup.find("a", class_="post_link").find("span", class_="rel_date") - t = rel_date.get_text() - if "time" in rel_date.attrs: - t = rel_date["time"] - elif "abs_time" in rel_date.attrs: - t = rel_date["abs_time"] - time = dateparser.parse(t, settings={"RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"}) - except Exception as e: - logger.warning(f"could not fetch time from post: {e}") + return method(proper_url, _id) - post = soup.find("div", class_="wall_text") - post_text = post.find(class_="wall_post_text").get_text() - for anchor in post.find_all("a", attrs={"aria-label": "photo"}): - if img_url := self.get_image_from_anchor(anchor): - image_urls.append(img_url) + def archive_photo(self, photo_url, photo_id): + headers = {"access_token": self.vk_session.token["access_token"], "photos": photo_id.replace("photo", ""), "extended": "1", "v": self.vk_session.api_version} + req = requests.get("https://api.vk.com/method/photos.getById", headers) + res = req.json()["response"][0] + img_url = res["orig_photo"]["url"] + time = dateparser.parse(str(res["date"]), settings={"RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"}) - page_cdn, page_hash, thumbnail = self.generate_media_page(image_urls, wall_url, post_text, requester=self.vk_session.http) - screenshot = self.get_screenshot(wall_url) + page_cdn, page_hash, thumbnail = self.generate_media_page([img_url], photo_url, res) + screenshot = self.get_screenshot(photo_url) return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=time) - def get_image_from_anchor(self, anchor): - try: - # get anchor.onlick text, retrieve the JSON value there - # retrieve "temp"."z" which contains the image with more quality - temp_json = json.loads(self.onclick_pattern.search(anchor["onclick"])[0])["temp"] - for quality in ["z", "y", "x"]: # decreasing quality - if quality in temp_json: - return temp_json[quality] - except Exception as e: - logger.warning(f"failed to get image from vk wall anchor: {e}") - return False + def archive_wall(self, wall_url, wall_id): + headers = {"access_token": self.vk_session.token["access_token"], "posts": wall_id.replace("wall", ""), "extended": "1", "copy_history_depth": "2", "v": self.vk_session.api_version} + req = requests.get("https://api.vk.com/method/wall.getById", headers) + res = req.json()["response"] + wall = res["items"][0] + img_urls = [p[p["type"]]["sizes"][-1]["url"] for p in wall["attachments"]] if "attachments" in wall else [] + title = wall["text"][0:200] # more on the page + time = dateparser.parse(str(wall["date"]), settings={"RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"}) + + page_cdn, page_hash, thumbnail = self.generate_media_page(img_urls, wall_url, res) + screenshot = self.get_screenshot(wall_url) + return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=time, title=title) +