This commit is contained in:
msramalho
2022-06-17 19:34:14 +02:00
parent 48197b3587
commit f7ea1640a9
3 changed files with 97 additions and 49 deletions

View File

@@ -59,24 +59,39 @@ class VkScraper:
"payload": original response code which you can parse for more data
}
`
"""
if not len(wall_ids): return []
if not len(wall_ids):
return []
wall_ids = [wall_id.replace("wall", "") for wall_id in wall_ids]
# docs: https://dev.vk.com/method/wall.getById
headers = {"access_token": self.session.token["access_token"], "posts": ",".join(wall_ids), "extended": "1", "copy_history_depth": str(copy_history_depth), "v": self.session.api_version}
headers = {
"access_token": self.session.token["access_token"],
"posts": ",".join(wall_ids),
"extended": "1",
"copy_history_depth": str(copy_history_depth),
"v": self.session.api_version,
}
req = requests.get("https://api.vk.com/method/wall.getById", headers)
api_res = req.json()
res = []
for item in api_res.get("response", {}).get("items", []):
attachments_json = item.get("attachments", []) + sum([x.get("attachments", []) for x in item.get("copy_history", [])], [])
attachments_json = item.get("attachments", []) + sum(
[x.get("attachments", []) for x in item.get("copy_history", [])], []
)
attachments = defaultdict(list)
for a in attachments_json:
try:
first_type = a["type"]
attachment = a[first_type]
if first_type == "video":
attachments["video"].extend(self.scrape_videos(f'video{attachment["owner_id"]}_{attachment["id"]}')[0].get("attachments", {}).get("video", [""]))
attachments["video"].extend(
self.scrape_videos(f'video{attachment["owner_id"]}_{attachment["id"]}')[
0
]
.get("attachments", {})
.get("video", [""])
)
continue
if first_type == "link":
attachments["link"].append(attachment["url"])
@@ -85,9 +100,16 @@ class VkScraper:
first_type = "photo"
elif "video" in attachment:
attachment = attachment["video"]
attachments["video"].extend(self.scrape_videos(f'video{attachment["owner_id"]}_{attachment["id"]}')[0].get("attachments", {}).get("video", [""]))
attachments["video"].extend(
self.scrape_videos(
f'video{attachment["owner_id"]}_{attachment["id"]}'
)[0]
.get("attachments", {})
.get("video", [""])
)
continue
else:
continue
else: continue
if "thumb" in attachment:
attachment = attachment["thumb"]
@@ -99,13 +121,15 @@ class VkScraper:
except Exception as e:
print(f"Unexpected error in attachment={a}: {e}")
res.append({
"id": f'wall{item["owner_id"]}_{item["id"]}',
"text": item.get("text", ""),
"datetime": datetime.fromtimestamp(item.get("date", 0)),
"attachments": dict(attachments),
"payload": item
})
res.append(
{
"id": f'wall{item["owner_id"]}_{item["id"]}',
"text": item.get("text", ""),
"datetime": datetime.fromtimestamp(item.get("date", 0)),
"attachments": dict(attachments),
"payload": item,
}
)
return res
def scrape_videos(self, url: str) -> List:
@@ -116,24 +140,32 @@ class VkScraper:
return self.scrape_video_ids(video_ids)
def scrape_video_ids(self, video_ids: List[str]) -> List:
if not len(video_ids): return []
if not len(video_ids):
return []
video_ids = [video_id.replace("video", "") for video_id in video_ids]
headers = {"access_token": self.session.token["access_token"], "videos": ",".join(video_ids), "extended": "1", "v": self.session.api_version}
headers = {
"access_token": self.session.token["access_token"],
"videos": ",".join(video_ids),
"extended": "1",
"v": self.session.api_version,
}
req = requests.get("https://api.vk.com/method/video.get", headers)
api_res = req.json()
res = []
for item in api_res.get("response", {}).get("items", []):
res.append({
"id": f'video{item["owner_id"]}_{item["id"]}',
"text": item.get("title", ""),
"datetime": datetime.fromtimestamp(item.get("date", 0)),
"attachments": {
"video": [item.get("player", "")],
},
"payload": item
})
res.append(
{
"id": f'video{item["owner_id"]}_{item["id"]}',
"text": item.get("title", ""),
"datetime": datetime.fromtimestamp(item.get("date", 0)),
"attachments": {
"video": [item.get("player", "")],
},
"payload": item,
}
)
return res
def scrape_photos(self, url: str) -> List:
@@ -141,22 +173,28 @@ class VkScraper:
return self.scrape_photo_ids(photo_ids)
def scrape_photo_ids(self, photo_ids: List[str]) -> List:
if not len(photo_ids): return []
if not len(photo_ids):
return []
photo_ids = [photo_id.replace("photo", "") for photo_id in photo_ids]
headers = {"access_token": self.session.token["access_token"], "photos": ",".join(photo_ids), "extended": "1", "v": self.session.api_version}
headers = {
"access_token": self.session.token["access_token"],
"photos": ",".join(photo_ids),
"extended": "1",
"v": self.session.api_version,
}
req = requests.get("https://api.vk.com/method/photos.getById", headers)
api_res = req.json()
res = []
for item in api_res.get("response", []):
res.append({
"id": f'photo{item["owner_id"]}_{item["id"]}',
"text": item.get("text", ""),
"datetime": datetime.fromtimestamp(item.get("date", 0)),
"attachments": {
"photo": [item["orig_photo"]["url"]]
},
"payload": item
})
res.append(
{
"id": f'photo{item["owner_id"]}_{item["id"]}',
"text": item.get("text", ""),
"datetime": datetime.fromtimestamp(item.get("date", 0)),
"attachments": {"photo": [item["orig_photo"]["url"]]},
"payload": item,
}
)
return res