Merge pull request #2 from bellingcat/refactor

Refactored the package structure and added the ability to download all of a channel's videos rather than only the first 1000.
This commit is contained in:
Tristan Lee
2022-04-12 23:10:40 -05:00
committed by GitHub
7 changed files with 437 additions and 230 deletions

View File

@@ -6,61 +6,39 @@ from pathlib import Path
import pickle
import os
import networkx as nx
import polyphemus
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
# Name of the Odysee channel used as the starting point of the script.
CHANNEL_NAME = 'PatriotFront'

# Intended number of recommendation iterations; also embedded in the output
# directory name below.
ITERATIONS = 2

# Directory that receives every file written by this script.
OUTPUT_DIR = Path('../../data', f'{CHANNEL_NAME}_recommendation_iterations={ITERATIONS}')
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
if __name__ == '__main__':

    # Crawl the recommendation network starting from the configured channel.
    # ITERATIONS is passed through so the crawl depth matches the value that
    # is embedded in the output directory name.
    engine = polyphemus.base.RecommendationEngine(channel_list = [CHANNEL_NAME])
    weighted_edge_list, claim_id_to_video = engine.generate(iterations = ITERATIONS)

    # Each weighted edge is a (source, target, weight) tuple.
    G = nx.DiGraph()
    G.add_weighted_edges_from(weighted_edge_list)

    #-------------------------------------------------------------------------#

    os.makedirs(OUTPUT_DIR, exist_ok = True)

    nx.write_gexf(G = G, path = Path(OUTPUT_DIR, 'network.gexf'))

    with open(Path(OUTPUT_DIR, 'weighted_edge_list.pkl'), 'wb') as f:
        pickle.dump(weighted_edge_list, f)

    with open(Path(OUTPUT_DIR, 'claim_id_to_video.pkl'), 'wb') as f:
        pickle.dump(claim_id_to_video, f)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#

View File

@@ -11,7 +11,7 @@ import os
import pandas as pd
from polyphemus.base import OdyseeChannel
from polyphemus.base import OdyseeChannelScraper
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
@@ -22,13 +22,13 @@ OUTPUT_DIR = Path('.').resolve().parents[1]/'data'
if __name__ == '__main__':
odysee_channel = OdyseeChannel(channel_name = CHANNEL_NAME)
odysee_channel = OdyseeChannelScraper(channel_name = CHANNEL_NAME)
video_list, comment_list = odysee_channel.get_all_videos_and_comments()
channel_df = pd.DataFrame([odysee_channel.info])
video_df = pd.DataFrame([v.info for v in video_list])
comment_df = pd.DataFrame([c.info for c in comment_list])
channel_df = pd.DataFrame([odysee_channel.get_entity().__dict__])
video_df = pd.DataFrame([v.__dict__ for v in video_list])
comment_df = pd.DataFrame([c.__dict__ for c in comment_list])
output_subdir = Path(OUTPUT_DIR, CHANNEL_NAME)
os.makedirs(output_subdir, exist_ok = True)

View File

@@ -7,6 +7,9 @@
import json
from urllib.parse import quote
from typing import Tuple, Optional, List, Callable
import time
import requests
@@ -21,35 +24,84 @@ COMMENT_API_URL = 'https://comments.odysee.com/api/v2'
RECOMMENDATION_API_URL = 'https://recsys.odysee.com/search'
NEW_USER_API_URL = 'https://api.odysee.com/user/new'
# Allow responses to `get_streaming_url` that contain no `streaming_url` field
ALLOWED_ERROR_CODES = [-32603]
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def _get_retry_reason(response: requests.Response) -> Optional[str]:
    """Return why ``response`` must be retried, or ``None`` if it is acceptable.

    A response is acceptable when its status code is 200 and its JSON body is
    either a list, or an object whose ``error`` entry is absent/``None`` or
    carries a code listed in ``ALLOWED_ERROR_CODES``.
    """

    if response.status_code != 200:
        return f'HTTP status code: {response.status_code}'

    parsed_response = json.loads(response.text)

    if isinstance(parsed_response, list):
        return None

    error = parsed_response.get('error')

    if error is None or error.get('code', None) in ALLOWED_ERROR_CODES:
        return None

    return f'JSON response error: {error}'

def make_request(request: Callable, kwargs: dict) -> requests.Response:
    """Wrapper for retrying request multiple times and handling errors.

    This function handles Python exceptions (e.g. HTTPConnectionPool),
    unsuccessful HTTP error codes (e.g. 429, 403), and errors in the
    JSON response. The request is attempted up to 5 times (using exponential
    backoff between attempts); if none of the attempts is successful, an
    exception is raised.

    Parameters
    ----------
    request: function
        The requests function to be called.
        One of ``requests.get`` and ``requests.post``.

    kwargs: dict
        Keyword arguments for the ``request`` function. Must include ``url`` key.
        e.g. ``{'url': 'https://api.odysee.com/user/new'}``
        Uses a default timeout of 15 seconds. The dict is not modified.

    Returns
    -------
    response: requests.Response

    Raises
    ------
    ValueError
        If ``request`` is not one of the two supported functions, or if every
        attempt failed.
    """

    if request not in [requests.get, requests.post]:
        msg = f'`request` argument must be either `requests.get` or `requests.post`, not {type(request)}'
        raise ValueError(msg)

    # Copy instead of inserting the default into the caller's dict.
    kwargs = {'timeout': 15, **kwargs}

    max_attempts = 5
    retry_reasons = []

    for n_retries in range(max_attempts):

        # Waits 0, 1, 3, 7 and 15 seconds before the successive attempts.
        time.sleep(2 ** n_retries - 1)

        try:
            response = request(**kwargs)
            retry_reason = _get_retry_reason(response)
        except Exception as exception:
            # Any failure (connection error, malformed JSON, unexpected JSON
            # shape) counts as one failed attempt.
            retry_reason = f'Python exception: {exception}'

        if retry_reason is None:
            return response

        retry_reasons.append(retry_reason)

    msg = f'Maximum number of retries reached for request {request} with kwargs {kwargs}. Retry reasons: {retry_reasons}'
    raise ValueError(msg)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_auth_token():
def get_auth_token() -> str:
"""Get a fresh authorization token, to use for API calls that require it.
"""Get a fresh authorization token, to use for API calls that require it.
Note: calling this function many times in quick succession may result in a
503 error.
"""
response = make_request(
@@ -63,7 +115,7 @@ def get_auth_token():
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_channel_info(channel_name):
def get_channel_info(channel_name: str) -> dict:
"""Get the channel information and ID from the channel name.
"""
@@ -99,7 +151,7 @@ def get_channel_info(channel_name):
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_subscribers(channel_id, auth_token = None):
def get_subscribers(channel_id: str, auth_token: str = None) -> int:
"""Get the number of subscribers for a channel.
"""
@@ -124,21 +176,35 @@ def get_subscribers(channel_id, auth_token = None):
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_all_videos(channel_id):
def get_raw_video_info_list(channel_id: str) -> dict:
"""Get a list of all videos posted by a specified channel name.
Odysee's ``claim_search`` API (which is used on the browser and LBRY
desktop app) only allows up to 1000 videos to be fetched for a single value
of the ``release_time`` parameter. You can check this by going to an Odysee
channel with a lot of videos (e.g. @etresouverain) and holding the
"Page Down" button until you reach the bottom, there will only be 1000
videos.
This function loops over all pages for a single ``release_time`` and
fetches the raw video info for all videos until it reaches that 1000 video
limit, then uses the minimum of the ``creation_timestamp`` for all videos
as the new ``release_time``, and starts over looping over all pages for
that new ``release_time``.
Returns
-------
all_videos: list<dict>
raw_video_info_list: list<dict>
List of dictionaries, with each dict corresponding to a JSON response
containing data about a single video.
"""
all_videos = []
claim_id_to_raw_video_info = {}
page = 1
release_time = int(time.time()) + 86400
hit_video_limit = False
while True:
@@ -149,7 +215,8 @@ def get_all_videos(channel_id):
"page_size":30,
"page":page,
"order_by":["release_time"],
"channel_ids":[channel_id]}}
"channel_ids":[channel_id],
"release_time": f"<{release_time}"}}
response = make_request(
request = requests.post,
@@ -160,18 +227,34 @@ def get_all_videos(channel_id):
result = json.loads(response.text)
videos = result['result']['items']
new_videos = {video['claim_id'] : video for video in videos if video['claim_id'] not in claim_id_to_raw_video_info}
if not videos:
break
if len(new_videos) == 0:
# if there are no new videos that haven't already been scraped
if hit_video_limit:
# if Odysee's limit of 1000 videos for a given timestamp was
# reached (which updates the `release_time`) on the last
# request, this means we have scraped all videos on the channel,
# so we break the loop.
break
else:
# we have hit Odysee's limit of 1000 videos for a given
# timestamp, so we update `release_time` and reset `page`
hit_video_limit = True
release_time = min([raw_video_info['meta']['creation_timestamp'] for raw_video_info in claim_id_to_raw_video_info.values()], default = 0)
page = 1
else:
all_videos.extend(videos)
# there were unscraped videos from the last request, so we keep
# going in the loop and increment the `page` variable
claim_id_to_raw_video_info.update(new_videos)
page += 1
hit_video_limit = False
return all_videos
return list(claim_id_to_raw_video_info.values())
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_views(video_id, auth_token = None):
def get_views(video_id: str, auth_token: str = None) -> int:
"""Get the number of views for a given video.
"""
@@ -195,7 +278,7 @@ def get_views(video_id, auth_token = None):
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_video_reactions(video_id, auth_token = None):
def get_video_reactions(video_id: str, auth_token: str = None) -> Tuple[Optional[int], Optional[int]]:
"""Get all reactions for a given video.
"""
@@ -223,7 +306,7 @@ def get_video_reactions(video_id, auth_token = None):
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_all_comments(video_id):
def get_all_comments(video_id: str) -> List[dict]:
"""Get a list of all comments for a single video.
@@ -277,7 +360,7 @@ def get_all_comments(video_id):
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def append_comment_reactions(comment_info_list):
def append_comment_reactions(comment_info_list: List[dict]) -> List[dict]:
"""Get reaction data for each comment and insert ``'reactions'`` key into
dict for each comment.
@@ -325,7 +408,11 @@ def append_comment_reactions(comment_info_list):
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_recommended(video_title, video_id):
def get_recommended(video_title: str, video_id: str) -> List[dict]:
"""Get list of raw video info dicts for a specified video title and video
claim_id.
"""
name = quote(video_title)
@@ -342,23 +429,31 @@ def get_recommended(video_title, video_id):
'params': params})
result = json.loads(response.text)
recommended_video_info = [ normalized_name_to_video_info(r['name']) for r in result]
recommended_video_info = normalized_names_to_video_info([r['name'] for r in result])
recommended_video_info = [vi for vi in recommended_video_info if ((vi.get('value_type') == 'stream') & any(key in vi.get('value', []) for key in ('video', 'audio')))]
return recommended_video_info
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def normalized_name_to_video_info(normalized_name):
def normalized_names_to_video_info(normalized_names: List[str]) -> dict:
video_url = f"lbry://{normalized_name}"
"""Convert a list of normalized names of videos to a list of raw video dicts for those videos. Example of a "normalized name" is:
``'si-une-tude-montre-que-le-masque-permet'``,
corresponding to the video:
``https://odysee.com/@filsdepangolin#e/si-une-tude-montre-que-le-masque-permet#e``.
"""
video_urls = [f"lbry://{normalized_name}" for normalized_name in normalized_names]
json_data = {
"jsonrpc":"2.0",
"method":"resolve",
"params":{
"urls":[video_url]}}
"urls":video_urls}}
response = make_request(
request = requests.post,
@@ -368,11 +463,14 @@ def normalized_name_to_video_info(normalized_name):
result = json.loads(response.text)
return result['result'][video_url]
return [result['result'][video_url] for video_url in video_urls]
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def get_streaming_url(canonical_url):
def get_streaming_url(canonical_url: str) -> str:
"""Retrieve the `streaming_url` for a specified video.
"""
json_data = {
"jsonrpc":"2.0",
@@ -386,7 +484,7 @@ def get_streaming_url(canonical_url):
'url' : BACKEND_API_URL,
'json': json_data})
video_url = json.loads(response.text)['result'].get('streaming_url')
video_url = json.loads(response.text).get('result', {}).get('streaming_url')
return video_url

View File

@@ -7,48 +7,115 @@
import json
from urllib.parse import unquote
from dataclasses import dataclass
import typing
from datetime import datetime
from collections import Counter
from polyphemus import api
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
class OdyseeChannel:
@dataclass
class Channel:
    """Flat record holding the scraped information about one channel."""

    # Required fields.
    channel_id: str
    created: datetime
    subscribers: int
    # Raw channel payload kept alongside the parsed fields.
    raw: str

    # Optional descriptive fields.
    title: typing.Optional[str] = None
    description: typing.Optional[str] = None
    cover_image: typing.Optional[str] = None
    thumbnail_image: typing.Optional[str] = None
@dataclass
class Video:
    """Flat record holding the scraped information about one video claim."""

    # Required fields.
    canonical_url: str
    type: str
    claim_id: str
    created: datetime
    title: str
    # JSON dump of the raw video info the record was built from.
    raw: str

    # Optional fields; ``None`` when not available or not retrieved.
    views: typing.Optional[int] = None
    streaming_url: typing.Optional[str] = None
    text: typing.Optional[str] = None
    thumbnail: typing.Optional[str] = None
    channel_id: typing.Optional[str] = None
    channel_name: typing.Optional[str] = None
    duration: typing.Optional[int] = None
    languages: typing.Optional[typing.List[str]] = None
    tags: typing.Optional[typing.List[str]] = None
    likes: typing.Optional[int] = None
    dislikes: typing.Optional[int] = None

    # Distinguishes videos from comments when both are stored together.
    is_comment: bool = False
@dataclass
class Comment:
    """Flat record holding the scraped information about one comment."""

    text: str
    created: datetime
    # Identifier of the comment itself.
    claim_id: str
    # Identifier of the video the comment was posted to.
    video_claim_id: str
    channel_id: str
    channel_name: str
    replies: int
    likes: int
    dislikes: int
    # JSON dump of the raw comment info the record was built from.
    raw: str

    # Distinguishes comments from videos when both are stored together.
    is_comment: bool = True
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
class OdyseeChannelScraper:
    """Scraper for one Odysee channel, its videos and their comments."""

    #-------------------------------------------------------------------------#

    def __init__(self, channel_name: str, auth_token: typing.Optional[str] = None):
        """Look up the channel and prepare an authorization token.

        Parameters
        ----------
        channel_name: str
            Name of the channel; URL percent-encoding is decoded first.

        auth_token: str, optional
            Token to reuse for API calls. A fresh one is requested when
            omitted.
        """

        self._channel_name = unquote(channel_name)

        if auth_token is None:
            self.auth_token = api.get_auth_token()
        else:
            self.auth_token = auth_token

        self._raw_channel_info = api.get_channel_info(channel_name = self._channel_name)
        self._channel_id = self._raw_channel_info['channel_id']

    #-------------------------------------------------------------------------#

    def get_entity(self) -> Channel:
        """Return Channel object containing information about the specified channel.

        The subscriber count is requested from the API on every call.
        """

        subscribers = api.get_subscribers(
            channel_id = self._channel_id,
            auth_token = self.auth_token)

        return Channel(
            channel_id = self._raw_channel_info['channel_id'],
            title = self._raw_channel_info['title'],
            created = datetime.fromtimestamp(self._raw_channel_info['created']),
            description = self._raw_channel_info['description'],
            cover_image = self._raw_channel_info['cover_image'],
            thumbnail_image = self._raw_channel_info['thumbnail_image'],
            raw = self._raw_channel_info['raw'],
            subscribers = subscribers)

    #-------------------------------------------------------------------------#

    def get_all_videos(self, additional_fields: bool = True) -> typing.Generator[Video, None, None]:
        """Return a generator of Video objects for all videos posted by the specified channel.

        The raw video list is fetched immediately; each ``Video`` is only
        built when the generator is iterated.

        Parameters
        ----------
        additional_fields: bool
            Forwarded to ``process_raw_video_info``.
        """

        raw_video_info_list = api.get_raw_video_info_list(channel_id = self._channel_id)

        videos = (
            process_raw_video_info(
                raw_video_info = raw_video_info,
                auth_token = self.auth_token,
                additional_fields = additional_fields)
            for raw_video_info in raw_video_info_list)

        return videos

    #-------------------------------------------------------------------------#

    def get_all_videos_and_comments(self) -> typing.Tuple[typing.List['Video'], typing.List['Comment']]:
        """Return list of Video and Comment objects for all videos
        posted by the channel and all comments posted to those videos
        """

        all_videos = list(self.get_all_videos())

        # Collect the raw comments of every video first, then convert them.
        raw_comment_info_list = []
        for video in all_videos:
            raw_comment_info_list.extend(api.get_all_comments(video_id = video.claim_id))

        all_comments = [process_raw_comment_info(raw_comment_info) for raw_comment_info in raw_comment_info_list]

        return all_videos, all_comments
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def process_raw_video_info(raw_video_info: dict, auth_token: str = None, additional_fields: bool = True) -> Video:
    """Convert a raw video info dict into a ``Video`` object.

    Parameters
    ----------
    raw_video_info: dict
        Raw information about a single claim. It is not modified.

    auth_token: str, optional
        Token used for the view and reaction requests. A fresh one is
        requested only when it is omitted and ``additional_fields`` is True.

    additional_fields: bool
        When True, the streaming URL, views, likes and dislikes are fetched
        through additional API requests; when False they are set to ``None``.

    Returns
    -------
    video: Video
    """

    # Dumped up front so that ``raw`` always reflects the info as received.
    raw = json.dumps(raw_video_info)

    claim_id = raw_video_info['claim_id']
    value = raw_video_info['value']

    # Dict that supplies the canonical URL; swapped for the reposted claim
    # below when the claim is a repost.
    url_source = raw_video_info

    # Handle edge cases
    #.....................................................................#

    if 'video' in value:
        video_type = 'video'
        duration = value['video'].get('duration')
    elif 'audio' in value:
        video_type = 'audio'
        duration = value['audio'].get('duration')
    elif 'claim_hash' in value:
        video_type = 'repost'
        duration = None
        if 'reposted_claim' in raw_video_info:
            # Describe the repost with the content of the claim it points to.
            url_source = raw_video_info['reposted_claim']
            value = url_source['value']
            claim_id = url_source['claim_id']
        else:
            value = {}
    elif 'image' in value:
        video_type = 'image'
        duration = None
    else:
        video_type = 'other'
        duration = None

    canonical_url = url_source['canonical_url']

    if 'signing_channel' in raw_video_info:
        signing_channel = raw_video_info['signing_channel']
        channel_name = signing_channel.get('name')
        if 'claim_id' in signing_channel:
            channel_id = signing_channel['claim_id']
        else:
            channel_id = signing_channel['channel_id']
    else:
        channel_name = None
        channel_id = None

    if 'release_time' in value:
        created = value['release_time']
    else:
        created = raw_video_info['meta']['creation_timestamp']

    if 'thumbnail' in value:
        thumbnail = value['thumbnail'].get('url', None)
    else:
        thumbnail = None

    # Retrieve additional fields
    #.....................................................................#

    if additional_fields:

        # Only request a token when it is actually going to be used.
        if auth_token is None:
            auth_token = api.get_auth_token()

        # No streaming URL is requested for claims named 'live'.
        if raw_video_info['name'] == 'live':
            streaming_url = None
        else:
            streaming_url = api.get_streaming_url(canonical_url)

        # For a repost, ``claim_id`` is the reposted claim's id at this point.
        views = api.get_views(video_id = claim_id, auth_token = auth_token)
        likes, dislikes = api.get_video_reactions(
            video_id = claim_id,
            auth_token = auth_token)

    else:
        streaming_url = None
        views = None
        likes = None
        dislikes = None

    # Return Video object
    #.....................................................................#

    return Video(
        canonical_url = canonical_url,
        type = video_type,
        channel_id = channel_id,
        channel_name = channel_name,
        # The claim's own id is stored, even for a repost.
        # NOTE(review): confirm this is intended rather than the reposted id.
        claim_id = raw_video_info['claim_id'],
        created = datetime.fromtimestamp(int(created)),
        text = value.get('description'),
        languages = value.get('languages'),
        tags = value.get('tags', []),
        title = value.get('title'),
        duration = duration,
        thumbnail = thumbnail,
        is_comment = False,
        raw = raw,
        views = views,
        likes = likes,
        dislikes = dislikes,
        streaming_url = streaming_url)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def process_raw_comment_info(raw_comment_info: dict) -> Comment:
    """Convert a raw comment info dict into a ``Comment`` object.

    Parameters
    ----------
    raw_comment_info: dict
        Raw information about a single comment. Must contain the keys
        ``comment``, ``timestamp``, ``claim_id``, ``channel_id``,
        ``channel_name``, ``likes`` and ``dislikes``.

    Returns
    -------
    comment: Comment
    """

    return Comment(
        text = raw_comment_info['comment'],
        # Converted so that ``created`` matches the ``datetime`` type declared
        # on ``Comment``, as is done for ``Video``.
        # NOTE(review): assumes ``timestamp`` is in Unix seconds - confirm.
        created = datetime.fromtimestamp(int(raw_comment_info['timestamp'])),
        claim_id = raw_comment_info.get('comment_id'),
        # The raw ``claim_id`` identifies the video that was commented on.
        video_claim_id = raw_comment_info['claim_id'],
        channel_id = raw_comment_info['channel_id'],
        channel_name = raw_comment_info['channel_name'],
        replies = raw_comment_info.get('replies', 0),
        likes = raw_comment_info['likes'],
        dislikes = raw_comment_info['dislikes'],
        is_comment = True,
        raw = json.dumps(raw_comment_info))
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
class RecommendationEngine:
    """Builds a weighted channel-to-channel network from video recommendations."""

    #-------------------------------------------------------------------------#

    def __init__(self, channel_list):
        """Store the seed channels and request an authorization token.

        Parameters
        ----------
        channel_list: list<str>
            Names of the channels whose videos seed the crawl.
        """

        self.channel_list = channel_list
        self.auth_token = api.get_auth_token()

        # (video claim_id, recommended video claim_id) pairs.
        self.edge_list = []
        # Videos whose recommendations have not been fetched yet.
        self.new_videos = []
        # Claim ids whose recommendations have already been fetched.
        self.already_done_claim_ids = []
        self.claim_id_to_video = {}

    #-------------------------------------------------------------------------#

    def generate(self, iterations = 1):
        """Crawl recommendations and aggregate them into weighted channel edges.

        Parameters
        ----------
        iterations: int
            Number of passes. Each pass fetches the recommendations of every
            known video that has not been processed yet.

        Returns
        -------
        weighted_edge_list: list<tuple>
            ``(source_channel_name, target_channel_name, weight)`` tuples,
            most frequent first. Edges with an unknown channel are dropped.

        claim_id_to_video: dict
            Maps claim id to ``Video`` for every video encountered.
        """

        for channel_name in self.channel_list:
            print(channel_name)
            scraper = OdyseeChannelScraper(channel_name = channel_name, auth_token = self.auth_token)
            self.new_videos.extend(scraper.get_all_videos(additional_fields = False))

        self.claim_id_to_video = {video.claim_id: video for video in self.new_videos}

        # Set mirror of `already_done_claim_ids` for constant-time membership tests.
        done_claim_ids = set(self.already_done_claim_ids)

        for iteration in range(int(iterations)):

            for i, video in enumerate(self.new_videos):

                claim_id = video.claim_id
                title = video.title

                print(f'ITERATION: {iteration} | VIDEO: {i} / {len(self.new_videos)} | CLAIM_ID: {claim_id}')

                recommended_video_info = api.get_recommended(video_title = title, video_id = claim_id)

                for rec_video_info in recommended_video_info:
                    rec_claim_id = rec_video_info['claim_id']
                    self.edge_list.append((claim_id, rec_claim_id))

                    if rec_claim_id not in self.claim_id_to_video:
                        self.claim_id_to_video[rec_claim_id] = process_raw_video_info(
                            raw_video_info = rec_video_info,
                            auth_token = self.auth_token,
                            additional_fields = False)

                self.already_done_claim_ids.append(claim_id)
                done_claim_ids.add(claim_id)

            # The next pass handles only the videos discovered so far that
            # have not had their recommendations fetched.
            self.new_videos = [video for video in self.claim_id_to_video.values() if video.claim_id not in done_claim_ids]

        claim_id_to_channel = {claim_id: video.channel_name for claim_id, video in self.claim_id_to_video.items()}

        # Translate video-level edges into channel-level edges.
        _channel_edge_list = [(claim_id_to_channel[source], claim_id_to_channel[target]) for source, target in self.edge_list]
        channel_edge_list = [(source, target) for source, target in _channel_edge_list if source is not None and target is not None]

        # The weight of an edge is the number of times it occurs.
        c = Counter(channel_edge_list)

        self.weighted_edge_list = [(source, target, weight) for (source, target), weight in c.most_common()]

        return self.weighted_edge_list, self.claim_id_to_video
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#

View File

@@ -23,23 +23,23 @@ KWARGS_LIST = [
('get_auth_token', []),
('get_channel_info', ['channel_name']),
('get_subscribers', ['channel_id', 'auth_token']),
('get_all_videos', ['channel_id']),
('get_raw_video_info_list', ['channel_id']),
('get_views', ['video_id', 'auth_token']),
('get_video_reactions', ['video_id', 'auth_token']),
('get_all_comments', ['video_id']),
('append_comment_reactions', ['comment_info_list']),
('normalized_name_to_video_info', ['normalized_name']),
('get_streaming_url', ['canonical_url']),
('get_recommended', ['video_title', 'video_id']),]
('get_recommended', ['video_title', 'video_id']),
('normalized_names_to_video_info', ['normalized_names']),
('get_streaming_url', ['canonical_url']),]
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
@pytest.mark.parametrize('function_str,kwargs', KWARGS_LIST)
def test_minimal_init(resources, function_str, kwargs):
    """Smoke test: call each listed ``api`` function with arguments taken from ``resources``."""

    # Plain attribute lookup; no need to evaluate a code string.
    function = getattr(api, function_str)
    function_kwargs = {kwarg: resources[kwarg] for kwarg in kwargs}

    function(**function_kwargs)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#

View File

@@ -19,38 +19,40 @@ from polyphemus import base
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
class TestOdyseeChannelScraper:
    """Smoke tests for ``base.OdyseeChannelScraper``."""

    @pytest.fixture(autouse=True)
    def test_simple_init(self, resources):
        # Autouse fixture: builds a fresh scraper for every test in the class.
        self.scraper = base.OdyseeChannelScraper(channel_name = resources['channel_name'])

    def test_get_entity(self):
        self.scraper.get_entity()

    def test_get_all_videos(self):
        # get_all_videos returns a generator, so it is consumed here to make
        # sure every raw video is actually processed. The additional
        # per-video requests are skipped to keep the test cheap.
        list(self.scraper.get_all_videos(additional_fields = False))

    def test_get_all_videos_and_comments(self):
        self.scraper.get_all_videos_and_comments()
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def test_process_raw_video_info(resources):
    """Smoke test: a raw video info dict can be converted without raising."""

    base.process_raw_video_info(
        raw_video_info = resources['full_video_info'],
        auth_token = resources['auth_token'])
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
def test_process_raw_comment_info(resources):
    """Smoke test: a raw comment info dict can be converted without raising."""

    base.process_raw_comment_info(raw_comment_info = resources['full_comment_info'])
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#
class TestRecommendationEngine:
    """Smoke tests for ``base.RecommendationEngine``."""

    @pytest.fixture(autouse=True)
    def test_simple_init(self, resources):
        # Autouse fixture: builds a fresh engine for every test in the class.
        self.engine = base.RecommendationEngine(channel_list = [resources['channel_name']])

    def test_generate(self):
        self.engine.generate(iterations = 1)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++#

View File

@@ -89,9 +89,10 @@ def resources():
video_id = VIDEO_ID,
video_title = VIDEO_TITLE,
normalized_name = NORMALIZED_NAME,
normalized_names = [NORMALIZED_NAME],
canonical_url = CANONICAL_URL,
full_video_info = FULL_VIDEO_INFO,
full_comment_info = {**COMMENT_INFO_LIST[0], **{'likes' : 8, 'dislikes' : 0}},
full_comment_info = {**COMMENT_INFO_LIST[0], **{'likes': 8, 'dislikes': 0}},
comment_info_list = COMMENT_INFO_LIST,
auth_token = get_auth_token())