Update octosuite.py

This commit is contained in:
Richard Mwewa
2022-12-25 02:51:56 +02:00
committed by GitHub
parent 3bd6af4f28
commit ee4af7d266

View File

@@ -8,20 +8,16 @@ import getpass
import requests
import platform
import subprocess
from rich.text import Text
from rich.tree import Tree
from rich.table import Table
from datetime import datetime
from rich import print as xprint
from octosuite.banner import version_tag, banner
from octosuite.config import red, white, green, header_title, reset, create_parser, args
from octosuite.config import Tree, Text, Table, Prompt, xprint, create_parser, args, red, white, green, yellow, header_title, reset
from octosuite.message_prefixes import ERROR, WARNING, PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol
# seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :)
from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, \
logs_command, csv_command, org_command, source, org, repo, user, search, logs, csv
from octosuite.log_roller import ctrl_c, error, session_opened, session_closed, viewing_logs, viewing_csv, \
deleted, reading, file_downloading, file_downloaded, info_not_found, \
user_not_found, org_not_found, repo_or_user_not_found, limit_output
user_not_found, org_not_found, repo_or_user_not_found, limit_output, prompt_log_csv
from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_profile, log_repo_path_contents, \
log_repo_contributors, log_repo_stargazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, \
log_org_profile, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, \
@@ -29,7 +25,6 @@ from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_pr
log_commits_search
if os.name == "nt":
try:
from pyreadline3 import Readline
@@ -74,15 +69,14 @@ def configure_logging():
# Check if the remote tag_name from the latest release matches the one in the program
# if it does, it means the program is up to date.
# if it does, it means the program is up-to-date.
# If it doesn't match, notify the user about a new release
def check_updates():
response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json()
if response['tag_name'] == version_tag:
pass
else:
xprint(
f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n")
xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n")
def list_dir_and_files():
@@ -94,8 +88,7 @@ def delete_csv():
if args.csv_file:
csv_file = args.csv_file
else:
xprint(f"{green}csv {white}(filename):{reset} ", end="")
csv_file = input()
csv_file = Prompt.ask(f"{green}csv {white}(filename){reset}")
os.remove(os.path.join("output", csv_file))
logging.info(deleted.format(csv_file))
xprint(f"{POSITIVE} {deleted.format(csv_file)}")
@@ -103,11 +96,10 @@ def delete_csv():
# Clear csv files
def clear_csv():
xprint(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="")
prompt = input().lower()
prompt = Prompt.ask(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue?", choices=['yes', 'no'])
if prompt == "yes":
shutil.rmtree('output', ignore_errors=True)
xprint(f"{INFO} .csv files cleared successfully!")
xprint(f"{INFO} csv files cleared successfully!")
else:
pass
@@ -129,8 +121,7 @@ def read_csv():
if args.csv_file:
csv_file = args.csv_file
else:
xprint(f"{green}csv {white}(filename.csv):{reset} ", end="")
csv_file = input()
csv_file = Prompt.ask(f"{green}csv {white}(filename){reset}")
with open(os.path.join("output", csv_file), "r") as file:
logging.info(reading.format(csv_file))
text = Text(file.read())
@@ -153,8 +144,8 @@ def view_logs():
def read_log():
if args.log_file:
log_file = args.log_file
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
log_file = input()
else:
log_file = Prompt.ask(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM){reset}")
with open(os.path.join(".logs", log_file + ".log"), "r") as log:
logging.info(reading.format(log_file))
xprint("\n" + log.read())
@@ -165,8 +156,7 @@ def delete_log():
if args.log_file:
log_file = args.log_file
else:
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
log_file = input()
log_file = Prompt.ask(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM){reset}")
os.remove(os.path.join(".logs", log_file))
logging.info(deleted.format(log_file))
xprint(f"{POSITIVE} {deleted.format(log_file)}")
@@ -174,8 +164,7 @@ def delete_log():
# Clear logs
def clear_logs():
xprint(f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
prompt = Prompt.ask(f"{PROMPT} This will clear all {len(os.listdir('output'))} log files, continue?", choices=['yes', 'no'])
if prompt == "yes":
shutil.rmtree('.logs', ignore_errors=True)
xprint(f"{INFO} .log files cleared successfully!")
@@ -186,9 +175,8 @@ def clear_logs():
# Exit session
def exit_session():
xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "yes":
exit_prompt = Prompt.ask(f"{PROMPT} This will close the current session, continue?", choices=['yes', 'no'])
if exit_prompt == "yes":
logging.info(session_closed.format(datetime.now()))
xprint(f"{INFO} {session_closed.format(datetime.now())}")
exit()
@@ -226,8 +214,7 @@ GitHub REST API documentation: https://docs.github.com/rest
class Octosuite:
def __init__(self, args):
self.args = args
def __init__(self):
# API endpoint
self.endpoint = 'https://api.github.com'
@@ -531,10 +518,10 @@ class Octosuite:
# Fetching organization info
def org_profile(self):
if self.args.organization:
organization = self.args.organization
if args.organization:
organization = args.organization
else:
organization = input("@Organization: ")
organization = Prompt.ask(f"{white}@{green}Organization{reset}")
response = requests.get(f"{self.endpoint}/orgs/{organization}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {org_not_found.format(organization)}")
@@ -549,10 +536,10 @@ class Octosuite:
# Fetching user information
def user_profile(self):
if self.args.username:
username = self.args.username
if args.username:
username = args.username
else:
username = input("@Username: ")
username = Prompt.ask(f"{white}@{green}Username{reset}")
response = requests.get(f"{self.endpoint}/users/{username}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
@@ -561,19 +548,22 @@ class Octosuite:
for attr in self.profile_attrs:
user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}")
xprint(user_profile_tree)
log_user_profile(response)
# Logging output to a csv file
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_user_profile(response)
else:
xprint(response.json())
# Fetching repository information
def repo_profile(self):
if self.args.repository and self.args.username and self.args.limit:
repo_name = self.args.repository
username = self.args.username
if args.repository and args.username and args.limit:
repo_name = args.repository
username = args.username
else:
repo_name = input("%Repository: ")
username = input("@Username: ")
response = requests.get(f"{self.endpoint}/repos/{self.username}/{repo_name}")
repo_name = Prompt.ask(f"{white}%{green}Repository{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
elif response.status_code == 200:
@@ -581,20 +571,22 @@ class Octosuite:
for attr in self.repo_attrs:
repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}")
xprint(repo_profile_tree)
log_repo_profile(response)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_repo_profile(response)
else:
xprint(response.json())
# Get path contents
def path_contents(self):
if self.args.repository and self.args.username and self.args.path_name:
repo_name = self.args.repository
username = self.args.username
path_name = self.args.path_name
if args.repository and args.username and args.path_name:
repo_name = args.repository
username = args.username
path_name = args.path_name
else:
repo_name = input("%Repository: ")
username = input("@Username: ")
path_name = input("~/path/name ")
repo_name = Prompt.ask(f"{white}%{green}Repository{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
path_name = Prompt.ask("~/path/name ")
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {info_not_found.format(repo_name, username, path_name)}")
@@ -611,14 +603,14 @@ class Octosuite:
# repo contributors
def repo_contributors(self):
if self.args.repository and self.args.username and self.args.limit:
repo_name = self.args.repository
username = self.args.username
limit = self.args.limit
if args.repository and args.username and args.limit:
repo_name = args.repository
username = args.username
limit = args.limit
else:
repo_name = input("%Repository: ")
username = input("@Username: ")
limit = input(limit_output.format("contributors"))
repo_name = Prompt.ask(f"{white}%{green}Repository{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("contributors"))
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
@@ -628,20 +620,22 @@ class Octosuite:
for attr in self.user_attrs:
contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}")
xprint(contributor_tree)
log_repo_contributors(contributor, repo_name)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_repo_contributors(contributor, repo_name)
else:
xprint(response.json())
# repo stargazers
def repo_stargazers(self):
if self.args.repository and self.args.username and self.args.limit:
repo_name = self.args.repository
username = self.args.username
limit = self.args.limit
if args.repository and args.username and args.limit:
repo_name = args.repository
username = args.username
limit = args.limit
else:
repo_name = input("%Repository: ")
username = input("@Username: ")
limit = input(limit_output.format("stargazers"))
repo_name = Prompt.ask(f"{white}%{green}Repository{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("stargazers"))
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
@@ -653,20 +647,22 @@ class Octosuite:
for attr in self.user_attrs:
stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}")
xprint(stargazer_tree)
log_repo_stargazers(stargazer, repo_name)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_repo_stargazers(stargazer, repo_name)
else:
xprint(response.json())
# repo forks
def repo_forks(self):
if self.args.repository and self.args.username and self.args.limit:
repo_name = self.args.repository
username = self.args.username
limit = self.args.limit
if args.repository and args.username and args.limit:
repo_name = args.repository
username = args.username
limit = args.limit
else:
repo_name = input("%Repository: ")
username = input("@Username: ")
limit = input(limit_output.format("forks"))
repo_name = Prompt.ask(f"{white}%{green}Repository{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("forks"))
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
@@ -678,20 +674,22 @@ class Octosuite:
for attr in self.repo_attrs:
fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}")
xprint(fork_tree)
log_repo_forks(fork, count)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_repo_forks(fork, count)
else:
xprint(response.json())
# Repo issues
def repo_issues(self):
if self.args.repository and self.args.username and self.args.limit:
repo_name = self.args.repository
username = self.args.username
limit = self.args.limit
if args.repository and args.username and args.limit:
repo_name = args.repository
username = args.username
limit = args.limit
else:
repo_name = input("%Repository: ")
username = input("@Username: ")
limit = input(limit_output.format("issues"))
repo_name = Prompt.ask(f"{white}%{green}Repository{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("issues"))
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
@@ -710,14 +708,14 @@ class Octosuite:
# Repo releases
def repo_releases(self):
if self.args.repository and self.args.username and self.args.limit:
repo_name = self.args.repository
username = self.args.username
limit = self.args.limit
if args.repository and args.username and args.limit:
repo_name = args.repository
username = args.username
limit = args.limit
else:
repo_name = input("%Repository: ")
username = input("@Username: ")
limit = input(limit_output.format("repository releases"))
repo_name = Prompt.ask(f"{white}%{green}Repository{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("repository releases"))
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
@@ -730,18 +728,20 @@ class Octosuite:
releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}")
xprint(releases_tree)
xprint(release['body'])
log_repo_releases(release, repo_name)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_repo_releases(release, repo_name)
else:
xprint(response.json())
# Fetching organization repositories
def org_repos(self):
if self.args.organization and self.args.limit:
organization = self.args.organization
limit = self.args.limit
if args.organization and args.limit:
organization = args.organization
limit = args.limit
else:
organization = input("@Organization: ")
limit = input(limit_output.format("organization repositories"))
organization = Prompt.ask(f"{white}@{green}Organization{reset}")
limit = Prompt.ask(limit_output.format("organization repositories"))
response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {org_not_found.format(organization)}")
@@ -751,18 +751,20 @@ class Octosuite:
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
log_org_repos(repository, organization)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_org_repos(repository, organization)
else:
xprint(response.json())
# organization events
def org_events(self):
if self.args.organization and self.args.limit:
organization = self.args.organization
limit = self.args.limit
if args.organization and args.limit:
organization = args.organization
limit = args.limit
else:
organization = input("@Organization: ")
limit = input(limit_output.format("organization events"))
organization = Prompt.ask(f"{white}@{green}Organization{reset}")
limit = Prompt.ask(limit_output.format("organization events"))
response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {org_not_found.format(organization)}")
@@ -779,12 +781,12 @@ class Octosuite:
# organization member
def org_member(self):
if self.args.organization and self.args.username:
organization = self.args.organization
username = self.args.username
if args.organization and args.username:
organization = args.organization
username = args.username
else:
organization = input("@Organization: ")
username = input("@Username: ")
organization = Prompt.ask(f"{white}@{green}Organization{reset}")
username = Prompt.ask(f"{white}@{green}Username{reset}")
response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}")
if response.status_code == 204:
xprint(f"{POSITIVE} User ({username}) is a public member of the organization -> ({organization})")
@@ -793,12 +795,12 @@ class Octosuite:
# Fetching user repositories
def user_repos(self):
if self.args.username and self.args.limit:
username = self.args.username
limit = self.args.limit
if args.username and args.limit:
username = args.username
limit = args.limit
else:
username = input("@Username: ")
limit = input(limit_output.format("repositories"))
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("repositories"))
response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
@@ -808,18 +810,20 @@ class Octosuite:
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
log_user_repos(repository, username)
else:
xprint(response.json())
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_user_repos(repository, username)
else:
xprint(response.json())
# Fetching user's gists
def user_gists(self):
if self.args.username and self.args.limit:
username = self.args.username
limit = self.args.limit
if args.username and args.limit:
username = args.username
limit = args.limit
else:
username = input("@Username: ")
limit = input(limit_output.format('gists'))
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format('gists'))
response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}")
if not response.json():
xprint(f"{NEGATIVE} User does not have gists.")
@@ -831,18 +835,20 @@ class Octosuite:
for attr in self.gists_attrs:
gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}")
xprint(gists_tree)
log_user_gists(gist)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_user_gists(gist)
else:
xprint(response.json())
# Fetching a list of organizations that a user owns or belongs to
def user_orgs(self):
if self.args.username and self.args.limit:
username = self.args.username
limit = self.args.limit
if args.username and args.limit:
username = args.username
limit = args.limit
else:
username = input("@Username: ")
limit = input(limit_output.format("user organizations"))
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("user organizations"))
response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}")
if not response.json():
xprint(f"{NEGATIVE} User ({username}) does not (belong to/own) any organizations.")
@@ -854,18 +860,20 @@ class Octosuite:
for attr in self.user_orgs_attrs:
org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}")
xprint(org_tree)
log_user_orgs(organization, username)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_user_orgs(organization, username)
else:
xprint(response.json())
# Fetching a users events
def user_events(self):
if self.args.username and self.args.limit:
username = self.args.username
limit = self.args.limit
if args.username and args.limit:
username = args.username
limit = args.limit
else:
username = input("@Username: ")
limit = input(limit_output.format("events"))
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("events"))
response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}")
if response.status_code == 404:
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
@@ -884,12 +892,12 @@ class Octosuite:
# Fetching a target user's subscriptions
def user_subscriptions(self):
if self.args.username and self.args.limit:
username = self.args.username
limit = self.args.limit
if args.username and args.limit:
username = args.username
limit = args.limit
else:
username = input("@Username: ")
limit = input(limit_output.format("user subscriptions"))
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("user subscriptions"))
response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}")
if not response.json():
xprint(f"{NEGATIVE} User does not have any subscriptions.")
@@ -901,18 +909,20 @@ class Octosuite:
for attr in self.repo_attrs:
subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(subscriptions_tree)
log_user_subscriptions(repository, username)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_user_subscriptions(repository, username)
else:
xprint(response.json())
# Fetching a list of users the target follows
def user_following(self):
if self.args.username and self.args.limit:
username = self.args.username
limit = self.args.limit
if args.username and args.limit:
username = args.username
limit = args.limit
else:
username = input("@Username: ")
limit = input(limit_output.format("user' following"))
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("user' following"))
response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}")
if not response.json():
xprint(f"{NEGATIVE} User ({username})does not follow anyone.")
@@ -924,18 +934,20 @@ class Octosuite:
for attr in self.user_attrs:
following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(following_tree)
log_user_following(user, username)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_user_following(user, username)
else:
xprint(response.json())
# Fetching user's followers
def user_followers(self):
if self.args.username and self.args.limit:
username = self.args.username
limit = self.args.limit
if args.username and args.limit:
username = args.username
limit = args.limit
else:
username = input("@Username: ")
limit = input(limit_output.format("user followers"))
username = Prompt.ask(f"{white}@{green}Username{reset}")
limit = Prompt.ask(limit_output.format("user followers"))
response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}")
if not response.json():
xprint(f"{NEGATIVE} User ({username})does not have followers.")
@@ -947,18 +959,20 @@ class Octosuite:
for attr in self.user_attrs:
followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}")
xprint(followers_tree)
log_user_followers(follower, username)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_user_followers(follower, username)
else:
xprint(response.json())
# Checking whether user[A] follows user[B]
def user_follows(self):
if self.args.username and self.args.username_b:
user_a = self.args.username
user_b = self.args.username_b
if args.username and args.username_b:
user_a = args.username
user_b = args.username_b
else:
user_a = input("@User_A: ")
user_b = input("@User_B")
user_a = Prompt.ask(f"{white}@{green}User_A{reset}")
user_b = Prompt.ask(f"{white}@{green}User_B{reset}")
response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}")
if response.status_code == 204:
xprint(f"{POSITIVE} @{user_a} FOLLOWS @{user_b}")
@@ -967,60 +981,66 @@ class Octosuite:
# User search
def users_search(self):
if self.args.query and self.args.limit and self.args.limit:
query = self.args.query
limit = self.args.limit
if args.query and args.limit and args.limit:
query = args.query
limit = args.limit
else:
query = input("@Username (search): ")
limit = input(limit_output.format("user search"))
query = Prompt.ask(f"{white}@{green}Username{reset} (search)")
limit = Prompt.ask(limit_output.format("user search"))
response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json()
for user in response['items']:
users_search_tree = Tree("\n" + user['login'])
for attr in self.user_attrs:
users_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(users_search_tree)
log_users_search(user, query)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_users_search(user, query)
# Repository search
def repos_search(self):
if self.args.query and self.args.limit:
query = self.args.query
limit = self.args.limit
if args.query and args.limit:
query = args.query
limit = args.limit
else:
query = input("%Repository (search): ")
limit = input(limit_output.format("repositor[y][ies] search"))
query = Prompt.ask(f"{white}%{green}Repository{reset} (search)")
limit = Prompt.ask(limit_output.format("repositor[y][ies] search"))
response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json()
for repository in response['items']:
repos_search_tree = Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
repos_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_search_tree)
log_repos_search(repository, query)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_repos_search(repository, query)
# Topics search
def topics_search(self):
if self.args.query and self.args.limit:
query = self.args.query
limit = self.args.limit
if args.query and args.limit:
query = args.query
limit = args.limit
else:
query = input(":Topic (search): ")
limit = input(limit_output.format("topic(s) search"))
query = Prompt.ask(f"{white}:{green}Topics{reset} (search)")
limit = Prompt.ask(limit_output.format("topic(s) search"))
response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json()
for topic in response['items']:
topics_search_tree = Tree("\n" + topic['name'])
for attr in self.topic_attrs:
topics_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}")
xprint(topics_search_tree)
log_topics_search(topic, query)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_topics_search(topic, query)
# Issue search
def issues_search(self):
if self.args.query and self.args.limit:
query = self.args.query
limit = self.args.limit
if args.query and args.limit:
query = args.query
limit = args.limit
else:
query = input("!Issue (search): ")
limit = input(limit_output.format("issue(s) search"))
query = Prompt.ask(f"{white}!{green}Issues{reset} (search)")
limit = Prompt.ask(limit_output.format("issue(s) search"))
response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json()
for issue in response['items']:
issues_search_tree = Tree("\n" + issue['title'])
@@ -1028,16 +1048,18 @@ class Octosuite:
issues_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}")
xprint(issues_search_tree)
xprint(issue['body'])
log_issues_search(issue, query)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_issues_search(issue, query)
# Commits search
def commits_search(self):
if self.args.query and self.args.limit:
query = self.args.query
limit = self.args.limit
if args.query and args.limit:
query = args.query
limit = args.limit
else:
query = input(";Commit (search): ")
limit = input(limit_output.format("commit(s) search"))
query = Prompt.ask(f"{white};{green}Commits{reset} (search)")
limit = Prompt.ask(limit_output.format("commit(s) search"))
response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json()
for commit in response['items']:
commits_search_tree = Tree("\n" + commit['commit']['tree']['sha'])
@@ -1049,7 +1071,9 @@ class Octosuite:
commits_search_tree.add(f"URL: {commit['html_url']}")
xprint(commits_search_tree)
xprint(commit['commit']['message'])
log_commits_search(commit, query)
if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes":
log_commits_search(commit, query)
# Downloading release tarball
def download_tarball(self):
@@ -1063,7 +1087,7 @@ class Octosuite:
logging.info(file_downloaded.format(f"octosuite.v{version_tag}.tar"))
xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.tar"))
# Downloading release zipball
# Downloading release zip ball
def download_zipball(self):
logging.info(file_downloading.format(f"octosuite.v{version_tag}.zip"))
xprint(INFO, file_downloading.format(f"octosuite.v{version_tag}.zip"))