added capabilities for handling edge cases for video inputs

This commit is contained in:
Tristan Lee
2022-02-17 22:58:07 -06:00
parent a6d2527bc7
commit 53f45967b3
4 changed files with 146 additions and 44 deletions

View File

@@ -4,5 +4,8 @@ Scraper for alt-tech video sharing platform [Odysee](https://odysee.com/).
### TODO
- Implement CLI
- Profile run-time, look into implementing async requests
- Add error handling/backoff waiting to requests
- Implement basic test suite
- Formaize network graph generation into class/module
- Work on reverse-engineering auth_token instead of having it hard-coded

View File

@@ -0,0 +1,66 @@
# -*- coding: UTF-8 -*-
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
from pathlib import Path
import pickle
import os
import polyphemus
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
CHANNEL_NAME = 'PatriotFront'
ITERATIONS = 3
OUTPUT_DIR = '../../data'
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
if __name__ == '__main__':
odysee_channel = polyphemus.base.OdyseeChannel(channel_name = CHANNEL_NAME)
edge_list = list()
already_done = list()
new_videos = odysee_channel.get_all_videos()
master_video_dict = dict(zip([v.info['claim_id'] for v in new_videos], new_videos))
for iteration in range(ITERATIONS):
print(f'\n\nITERATION: {iteration}, N_VIDEOS: {len(new_videos)}\n\n')
for i, video in enumerate(new_videos):
claim_id = video.info['claim_id']
title = video.info['title']
print(f'\nVIDEO: {i}; CLAIM_ID: {claim_id}\n')
recommended_video_info = polyphemus.api.get_recommended(title, claim_id)
for rec_video_info in recommended_video_info:
rec_claim_id = rec_video_info['claim_id']
print(f'REC_CLAIM_ID: {rec_claim_id}')
edge_list.append((claim_id, rec_claim_id))
if rec_video_info['claim_id'] not in master_video_dict:
master_video_dict[rec_claim_id] = polyphemus.base.OdyseeVideo(rec_video_info)
already_done.append(claim_id)
new_videos = [video for video in master_video_dict.values() if video.info['claim_id'] not in already_done]
#-------------------------------------------------------------------------#
os.makedirs(OUTPUT_DIR, exist_ok = True)
with open(Path(OUTPUT_DIR, 'master_video_dict.pkl'), 'wb') as f:
pickle.dump(master_video_dict, f)
with open(Path(OUTPUT_DIR, 'edge_list.pkl'), 'wb') as f:
pickle.dump(edge_list)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#

View File

@@ -15,6 +15,16 @@ import requests
#TODO Figure out how to reverse-engineer this
AUTH_TOKEN = 'BseGAiye641UqUsv4g31ZcUCRiLasv3U'
# API endpoints for Odysee data
#-----------------------------------------------------------------------------#
BACKEND_API_URL = 'https://api.na-backend.odysee.com/api/v1/proxy'
SUBSCRIBER_API_URL = 'https://api.odysee.com/subscription/sub_count'
VIEW_API_URL = 'https://api.odysee.com/file/view_count'
REACTION_API_URL = 'https://api.odysee.com/reaction/list'
COMMENT_API_URL = 'https://comments.odysee.com/api/v2'
RECOMMENDATION_API_URL = 'https://recsys.odysee.com/search'
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_channel_info(channel_name):
@@ -24,8 +34,6 @@ def get_channel_info(channel_name):
channel_url = f'lbry://@{channel_name}'
api_url = 'https://api.na-backend.odysee.com/api/v1/proxy'
post_json = {
"jsonrpc":"2.0",
"method":"resolve",
@@ -33,7 +41,7 @@ def get_channel_info(channel_name):
"urls":[channel_url]}}
response = requests.post(
url = api_url,
url = BACKEND_API_URL,
json = post_json)
result = json.loads(response.text)
@@ -58,13 +66,11 @@ def get_subscribers(claim_id):
"""Get the number of subscribers for a channel.
"""
api_url = 'https://api.odysee.com/subscription/sub_count'
post_data = {
json_data = {
'auth_token': AUTH_TOKEN,
'claim_id': claim_id }
response = requests.post(url = api_url, data = post_data)
response = requests.post(url = SUBSCRIBER_API_URL, data = json_data)
result = json.loads(response.text)
subscribers = result['data'][0]
@@ -84,15 +90,13 @@ def get_all_videos(channel_id):
"""
api_url = 'https://api.na-backend.odysee.com/api/v1/proxy'
all_videos = []
page = 1
while True:
post_data = {
json_data = {
"jsonrpc":"2.0",
"method":"claim_search",
"params":{
@@ -102,8 +106,8 @@ def get_all_videos(channel_id):
"channel_ids":[channel_id]}}
response = requests.post(
url = api_url,
json = post_data)
url = BACKEND_API_URL,
json = json_data)
result = json.loads(response.text)
@@ -124,13 +128,11 @@ def get_views(claim_id):
"""Get the number of views for a given video.
"""
api_url = 'https://api.odysee.com/file/view_count'
params = {
'auth_token': AUTH_TOKEN,
'claim_id': claim_id }
response = requests.get(api_url, params = params)
response = requests.get(url = VIEW_API_URL, params = params)
views = json.loads(response.text)['data'][0]
return views
@@ -142,17 +144,18 @@ def get_video_reactions(claim_id):
"""Get all reactions for a given video.
"""
api_url = 'https://api.odysee.com/reaction/list'
post_data = {
'auth_token': AUTH_TOKEN,
'claim_ids': claim_id }
response = requests.post(url = api_url, data = post_data)
response = requests.post(url = REACTION_API_URL, data = post_data)
result = json.loads(response.text)
reactions = result['data']['others_reactions'][claim_id ]
return reactions['like'], reactions['dislike']
if result['success']:
reactions = result['data']['others_reactions'][claim_id ]
return reactions['like'], reactions['dislike']
else:
return None, None
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
@@ -173,15 +176,13 @@ def get_all_comments(claim_id):
containing data about a single comment for the specified video.
"""
api_url = 'https://comments.odysee.com/api/v2'
all_comments = []
page = 1
while True:
post_data = {
json_data = {
"jsonrpc":"2.0",
"id":1,
"method":"comment.List",
@@ -193,8 +194,8 @@ def get_all_comments(claim_id):
"sort_by":3}}
response = requests.post(
url = api_url,
json = post_data)
url = COMMENT_API_URL,
json = json_data)
result = json.loads(response.text)
@@ -233,15 +234,14 @@ def append_comment_reactions(comments):
comment_ids = ','.join([c['comment_id'] for c in comments])
post_data = {
json_data = {
"jsonrpc":"2.0",
"id":1,
"method":"reaction.List",
"params":{
"comment_ids":comment_ids}}
api_url = 'https://comments.odysee.com/api/v2'
response = requests.post(url = api_url, json = post_data)
response = requests.post(url = COMMENT_API_URL, json = json_data)
result = json.loads(response.text)
reactions = result['result']['others_reactions']
@@ -256,8 +256,6 @@ def append_comment_reactions(comments):
def get_recommended(title, claim_id):
api_url = 'https://recsys.odysee.com/search'
name = quote(title)
params = {
@@ -266,11 +264,11 @@ def get_recommended(title, claim_id):
'from':'0',
'related_to':claim_id}
response = requests.get(api_url, params = params)
response = requests.get(url = RECOMMENDATION_API_URL, params = params)
result = json.loads(response.text)
recommended_video_info = [ name_to_video_info(r['name']) for r in result]
recommended_video_info = [vi for vi in recommended_video_info if vi['value_type'] == 'stream']
recommended_video_info = [vi for vi in recommended_video_info if ((vi.get('value_type') == 'stream') & any(key in vi.get('value', []) for key in ('video', 'audio')))]
return recommended_video_info
@@ -278,19 +276,17 @@ def get_recommended(title, claim_id):
def name_to_video_info(name):
url = f"lbry://{name}"
video_url = f"lbry://{name}"
post_data = {
json_data = {
"jsonrpc":"2.0",
"method":"resolve",
"params":{
"urls":[url]}}
"urls":[video_url]}}
api_url = 'https://api.na-backend.odysee.com/api/v1/proxy'
response = requests.post(url = api_url, json = post_data)
response = requests.post(url = BACKEND_API_URL, json = json_data)
result = json.loads(response.text)
return result['result'][url]
return result['result'][video_url]
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#

View File

@@ -68,21 +68,58 @@ class OdyseeVideo:
#-------------------------------------------------------------------------#
def __init__(self, full_video_info):
# Handle edge cases
#.....................................................................#
if 'video' in full_video_info['value']:
video_type = 'video'
duration = full_video_info['value']['video'].get('duration')
elif 'audio' in full_video_info['value']:
video_type = 'audio'
duration = full_video_info['value']['audio'].get('duration')
else:
raise KeyError(f'nether `video` or `audio` keys are in `full_video_info["value"]`, only {full_video_info["value"].keys()}')
if 'signing_channel' in full_video_info:
channel_name = full_video_info['signing_channel'].get('name')
if 'claim_id' in full_video_info['signing_channel']:
channel_id = full_video_info['signing_channel']['claim_id']
else:
channel_id = full_video_info['signing_channel']['channel_id']
else:
channel_name = None
channel_id = None
if 'release_time' in full_video_info['value']:
created = full_video_info['value']['release_time']
else:
created = full_video_info['meta']['creation_timestamp']
if 'thumbnail' in full_video_info['value']:
thumbnail = full_video_info['value']['thumbnail'].get('url', None)
else:
thumbnail = None
# Store relevant information in flat dict
#.....................................................................#
self.info = {
'canonical_url' : full_video_info['canonical_url'],
'channel' : full_video_info['signing_channel']['name'],
'type' : video_type,
'channel_id' : channel_id,
'channel' : channel_name,
'claim_id' : full_video_info['claim_id'],
'created' : full_video_info['value']['release_time'],
'created' : created,
'description' : full_video_info['value'].get('description'),
'languages' : full_video_info['value'].get('languages'),
'tags' : full_video_info['value'].get('tags',[]),
'title' : full_video_info['value']['title'],
'duration' : full_video_info['value']['video']['duration'],
'thumbnail' : full_video_info['value']['thumbnail']['url'],
'duration' : duration,
'thumbnail' : thumbnail,
'raw' : json.dumps(full_video_info)}
self._claim_id = self.info ['claim_id']
self._claim_id = self.info['claim_id']
self.info['views'] = api.get_views(claim_id=self._claim_id)