diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index 18bdfef..e6fd82c 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -8,20 +8,16 @@ import getpass import requests import platform import subprocess -from rich.text import Text -from rich.tree import Tree -from rich.table import Table from datetime import datetime -from rich import print as xprint from octosuite.banner import version_tag, banner -from octosuite.config import red, white, green, header_title, reset, create_parser, args +from octosuite.config import Tree, Text, Table, Prompt, xprint, create_parser, args, red, white, green, yellow, header_title, reset from octosuite.message_prefixes import ERROR, WARNING, PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol # seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :) from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, \ logs_command, csv_command, org_command, source, org, repo, user, search, logs, csv from octosuite.log_roller import ctrl_c, error, session_opened, session_closed, viewing_logs, viewing_csv, \ deleted, reading, file_downloading, file_downloaded, info_not_found, \ - user_not_found, org_not_found, repo_or_user_not_found, limit_output + user_not_found, org_not_found, repo_or_user_not_found, limit_output, prompt_log_csv from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_profile, log_repo_path_contents, \ log_repo_contributors, log_repo_stargazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, \ log_org_profile, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, \ @@ -29,7 +25,6 @@ from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_pr log_commits_search - if os.name == "nt": try: from pyreadline3 import Readline @@ -74,15 +69,14 @@ def configure_logging(): # Check if the remote tag_name from the latest release matches the one in the program -# if it does, it means the program is up to date. +# if it does, it means the program is up-to-date. # If it doesn't match, notify the user about a new release def check_updates(): response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json() if response['tag_name'] == version_tag: pass else: - xprint( - f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") + xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") def list_dir_and_files(): @@ -94,8 +88,7 @@ def delete_csv(): if args.csv_file: csv_file = args.csv_file else: - xprint(f"{green}csv {white}(filename):{reset} ", end="") - csv_file = input() + csv_file = Prompt.ask(f"{green}csv {white}(filename){reset}") os.remove(os.path.join("output", csv_file)) logging.info(deleted.format(csv_file)) xprint(f"{POSITIVE} {deleted.format(csv_file)}") @@ -103,11 +96,10 @@ def delete_csv(): # Clear csv files def clear_csv(): - xprint(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="") - prompt = input().lower() + prompt = Prompt.ask(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue?", choices=['yes', 'no']) if prompt == "yes": shutil.rmtree('output', ignore_errors=True) - xprint(f"{INFO} .csv files cleared successfully!") + xprint(f"{INFO} csv files cleared successfully!") else: pass @@ -129,8 +121,7 @@ def read_csv(): if args.csv_file: csv_file = args.csv_file else: - xprint(f"{green}csv {white}(filename.csv):{reset} ", end="") - csv_file = input() + csv_file = Prompt.ask(f"{green}csv {white}(filename){reset}") with open(os.path.join("output", csv_file), "r") as file: logging.info(reading.format(csv_file)) text = Text(file.read()) @@ -153,8 +144,8 @@ def view_logs(): def read_log(): if args.log_file: log_file = args.log_file - xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") - log_file = input() + else: + log_file = Prompt.ask(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM){reset}") with open(os.path.join(".logs", log_file + ".log"), "r") as log: logging.info(reading.format(log_file)) xprint("\n" + log.read()) @@ -165,8 +156,7 @@ def delete_log(): if args.log_file: log_file = args.log_file else: - xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") - log_file = input() + log_file = Prompt.ask(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM){reset}") os.remove(os.path.join(".logs", log_file)) logging.info(deleted.format(log_file)) xprint(f"{POSITIVE} {deleted.format(log_file)}") @@ -174,8 +164,7 @@ def delete_log(): # Clear logs def clear_logs(): - xprint(f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="") - prompt = input().lower() + prompt = Prompt.ask(f"{PROMPT} This will clear all {len(os.listdir('output'))} log files, continue?", choices=['yes', 'no']) if prompt == "yes": shutil.rmtree('.logs', ignore_errors=True) xprint(f"{INFO} .log files cleared successfully!") @@ -186,9 +175,8 @@ def clear_logs(): # Exit session def exit_session(): - xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="") - prompt = input().lower() - if prompt == "yes": + exit_prompt = Prompt.ask(f"{PROMPT} This will close the current session, continue?", choices=['yes', 'no']) + if exit_prompt == "yes": logging.info(session_closed.format(datetime.now())) xprint(f"{INFO} {session_closed.format(datetime.now())}") exit() @@ -226,8 +214,7 @@ GitHub REST API documentation: https://docs.github.com/rest class Octosuite: - def __init__(self, args): - self.args = args + def __init__(self): # API endpoint self.endpoint = 'https://api.github.com' @@ -531,10 +518,10 @@ class Octosuite: # Fetching organization info def org_profile(self): - if self.args.organization: - organization = self.args.organization + if args.organization: + organization = args.organization else: - organization = input("@Organization: ") + organization = Prompt.ask(f"{white}@{green}Organization{reset}") response = requests.get(f"{self.endpoint}/orgs/{organization}") if response.status_code == 404: xprint(f"{NEGATIVE} {org_not_found.format(organization)}") @@ -549,10 +536,10 @@ class Octosuite: # Fetching user information def user_profile(self): - if self.args.username: - username = self.args.username + if args.username: + username = args.username else: - username = input("@Username: ") + username = Prompt.ask(f"{white}@{green}Username{reset}") response = requests.get(f"{self.endpoint}/users/{username}") if response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -561,19 +548,22 @@ class Octosuite: for attr in self.profile_attrs: user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}") xprint(user_profile_tree) - log_user_profile(response) + + # Logging output to a csv file + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_profile(response) else: xprint(response.json()) # Fetching repository information def repo_profile(self): - if self.args.repository and self.args.username and self.args.limit: - repo_name = self.args.repository - username = self.args.username + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username else: - repo_name = input("%Repository: ") - username = input("@Username: ") - response = requests.get(f"{self.endpoint}/repos/{self.username}/{repo_name}") + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") elif response.status_code == 200: @@ -581,20 +571,22 @@ class Octosuite: for attr in self.repo_attrs: repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}") xprint(repo_profile_tree) - log_repo_profile(response) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_profile(response) else: xprint(response.json()) # Get path contents def path_contents(self): - if self.args.repository and self.args.username and self.args.path_name: - repo_name = self.args.repository - username = self.args.username - path_name = self.args.path_name + if args.repository and args.username and args.path_name: + repo_name = args.repository + username = args.username + path_name = args.path_name else: - repo_name = input("%Repository: ") - username = input("@Username: ") - path_name = input("~/path/name ") + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + path_name = Prompt.ask("~/path/name ") response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}") if response.status_code == 404: xprint(f"{NEGATIVE} {info_not_found.format(repo_name, username, path_name)}") @@ -611,14 +603,14 @@ class Octosuite: # repo contributors def repo_contributors(self): - if self.args.repository and self.args.username and self.args.limit: - repo_name = self.args.repository - username = self.args.username - limit = self.args.limit + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit else: - repo_name = input("%Repository: ") - username = input("@Username: ") - limit = input(limit_output.format("contributors")) + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("contributors")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -628,20 +620,22 @@ class Octosuite: for attr in self.user_attrs: contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}") xprint(contributor_tree) - log_repo_contributors(contributor, repo_name) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_contributors(contributor, repo_name) else: xprint(response.json()) # repo stargazers def repo_stargazers(self): - if self.args.repository and self.args.username and self.args.limit: - repo_name = self.args.repository - username = self.args.username - limit = self.args.limit + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit else: - repo_name = input("%Repository: ") - username = input("@Username: ") - limit = input(limit_output.format("stargazers")) + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("stargazers")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -653,20 +647,22 @@ class Octosuite: for attr in self.user_attrs: stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}") xprint(stargazer_tree) - log_repo_stargazers(stargazer, repo_name) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_stargazers(stargazer, repo_name) else: xprint(response.json()) # repo forks def repo_forks(self): - if self.args.repository and self.args.username and self.args.limit: - repo_name = self.args.repository - username = self.args.username - limit = self.args.limit + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit else: - repo_name = input("%Repository: ") - username = input("@Username: ") - limit = input(limit_output.format("forks")) + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("forks")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -678,20 +674,22 @@ class Octosuite: for attr in self.repo_attrs: fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}") xprint(fork_tree) - log_repo_forks(fork, count) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_forks(fork, count) else: xprint(response.json()) # Repo issues def repo_issues(self): - if self.args.repository and self.args.username and self.args.limit: - repo_name = self.args.repository - username = self.args.username - limit = self.args.limit + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit else: - repo_name = input("%Repository: ") - username = input("@Username: ") - limit = input(limit_output.format("issues")) + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("issues")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -710,14 +708,14 @@ class Octosuite: # Repo releases def repo_releases(self): - if self.args.repository and self.args.username and self.args.limit: - repo_name = self.args.repository - username = self.args.username - limit = self.args.limit + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit else: - repo_name = input("%Repository: ") - username = input("@Username: ") - limit = input(limit_output.format("repository releases")) + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("repository releases")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -730,18 +728,20 @@ class Octosuite: releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}") xprint(releases_tree) xprint(release['body']) - log_repo_releases(release, repo_name) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_releases(release, repo_name) else: xprint(response.json()) # Fetching organization repositories def org_repos(self): - if self.args.organization and self.args.limit: - organization = self.args.organization - limit = self.args.limit + if args.organization and args.limit: + organization = args.organization + limit = args.limit else: - organization = input("@Organization: ") - limit = input(limit_output.format("organization repositories")) + organization = Prompt.ask(f"{white}@{green}Organization{reset}") + limit = Prompt.ask(limit_output.format("organization repositories")) response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {org_not_found.format(organization)}") @@ -751,18 +751,20 @@ class Octosuite: for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - log_org_repos(repository, organization) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_org_repos(repository, organization) else: xprint(response.json()) # organization events def org_events(self): - if self.args.organization and self.args.limit: - organization = self.args.organization - limit = self.args.limit + if args.organization and args.limit: + organization = args.organization + limit = args.limit else: - organization = input("@Organization: ") - limit = input(limit_output.format("organization events")) + organization = Prompt.ask(f"{white}@{green}Organization{reset}") + limit = Prompt.ask(limit_output.format("organization events")) response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {org_not_found.format(organization)}") @@ -779,12 +781,12 @@ class Octosuite: # organization member def org_member(self): - if self.args.organization and self.args.username: - organization = self.args.organization - username = self.args.username + if args.organization and args.username: + organization = args.organization + username = args.username else: - organization = input("@Organization: ") - username = input("@Username: ") + organization = Prompt.ask(f"{white}@{green}Organization{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}") if response.status_code == 204: xprint(f"{POSITIVE} User ({username}) is a public member of the organization -> ({organization})") @@ -793,12 +795,12 @@ class Octosuite: # Fetching user repositories def user_repos(self): - if self.args.username and self.args.limit: - username = self.args.username - limit = self.args.limit + if args.username and args.limit: + username = args.username + limit = args.limit else: - username = input("@Username: ") - limit = input(limit_output.format("repositories")) + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("repositories")) response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -808,18 +810,20 @@ class Octosuite: for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - log_user_repos(repository, username) - else: - xprint(response.json()) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_repos(repository, username) + else: + xprint(response.json()) # Fetching user's gists def user_gists(self): - if self.args.username and self.args.limit: - username = self.args.username - limit = self.args.limit + if args.username and args.limit: + username = args.username + limit = args.limit else: - username = input("@Username: ") - limit = input(limit_output.format('gists')) + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format('gists')) response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}") if not response.json(): xprint(f"{NEGATIVE} User does not have gists.") @@ -831,18 +835,20 @@ class Octosuite: for attr in self.gists_attrs: gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}") xprint(gists_tree) - log_user_gists(gist) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_gists(gist) else: xprint(response.json()) # Fetching a list of organizations that a user owns or belongs to def user_orgs(self): - if self.args.username and self.args.limit: - username = self.args.username - limit = self.args.limit + if args.username and args.limit: + username = args.username + limit = args.limit else: - username = input("@Username: ") - limit = input(limit_output.format("user organizations")) + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user organizations")) response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}") if not response.json(): xprint(f"{NEGATIVE} User ({username}) does not (belong to/own) any organizations.") @@ -854,18 +860,20 @@ class Octosuite: for attr in self.user_orgs_attrs: org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}") xprint(org_tree) - log_user_orgs(organization, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_orgs(organization, username) else: xprint(response.json()) # Fetching a users events def user_events(self): - if self.args.username and self.args.limit: - username = self.args.username - limit = self.args.limit + if args.username and args.limit: + username = args.username + limit = args.limit else: - username = input("@Username: ") - limit = input(limit_output.format("events")) + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("events")) response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -884,12 +892,12 @@ class Octosuite: # Fetching a target user's subscriptions def user_subscriptions(self): - if self.args.username and self.args.limit: - username = self.args.username - limit = self.args.limit + if args.username and args.limit: + username = args.username + limit = args.limit else: - username = input("@Username: ") - limit = input(limit_output.format("user subscriptions")) + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user subscriptions")) response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}") if not response.json(): xprint(f"{NEGATIVE} User does not have any subscriptions.") @@ -901,18 +909,20 @@ class Octosuite: for attr in self.repo_attrs: subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(subscriptions_tree) - log_user_subscriptions(repository, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_subscriptions(repository, username) else: xprint(response.json()) # Fetching a list of users the target follows def user_following(self): - if self.args.username and self.args.limit: - username = self.args.username - limit = self.args.limit + if args.username and args.limit: + username = args.username + limit = args.limit else: - username = input("@Username: ") - limit = input(limit_output.format("user' following")) + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user' following")) response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}") if not response.json(): xprint(f"{NEGATIVE} User ({username})does not follow anyone.") @@ -924,18 +934,20 @@ class Octosuite: for attr in self.user_attrs: following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(following_tree) - log_user_following(user, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_following(user, username) else: xprint(response.json()) # Fetching user's followers def user_followers(self): - if self.args.username and self.args.limit: - username = self.args.username - limit = self.args.limit + if args.username and args.limit: + username = args.username + limit = args.limit else: - username = input("@Username: ") - limit = input(limit_output.format("user followers")) + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user followers")) response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}") if not response.json(): xprint(f"{NEGATIVE} User ({username})does not have followers.") @@ -947,18 +959,20 @@ class Octosuite: for attr in self.user_attrs: followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}") xprint(followers_tree) - log_user_followers(follower, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_followers(follower, username) else: xprint(response.json()) # Checking whether user[A] follows user[B] def user_follows(self): - if self.args.username and self.args.username_b: - user_a = self.args.username - user_b = self.args.username_b + if args.username and args.username_b: + user_a = args.username + user_b = args.username_b else: - user_a = input("@User_A: ") - user_b = input("@User_B") + user_a = Prompt.ask(f"{white}@{green}User_A{reset}") + user_b = Prompt.ask(f"{white}@{green}User_B{reset}") response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}") if response.status_code == 204: xprint(f"{POSITIVE} @{user_a} FOLLOWS @{user_b}") @@ -967,60 +981,66 @@ class Octosuite: # User search def users_search(self): - if self.args.query and self.args.limit and self.args.limit: - query = self.args.query - limit = self.args.limit + if args.query and args.limit and args.limit: + query = args.query + limit = args.limit else: - query = input("@Username (search): ") - limit = input(limit_output.format("user search")) + query = Prompt.ask(f"{white}@{green}Username{reset} (search)") + limit = Prompt.ask(limit_output.format("user search")) response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json() for user in response['items']: users_search_tree = Tree("\n" + user['login']) for attr in self.user_attrs: users_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(users_search_tree) - log_users_search(user, query) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_users_search(user, query) # Repository search def repos_search(self): - if self.args.query and self.args.limit: - query = self.args.query - limit = self.args.limit + if args.query and args.limit: + query = args.query + limit = args.limit else: - query = input("%Repository (search): ") - limit = input(limit_output.format("repositor[y][ies] search")) + query = Prompt.ask(f"{white}%{green}Repository{reset} (search)") + limit = Prompt.ask(limit_output.format("repositor[y][ies] search")) response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json() for repository in response['items']: repos_search_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_search_tree) - log_repos_search(repository, query) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repos_search(repository, query) # Topics search def topics_search(self): - if self.args.query and self.args.limit: - query = self.args.query - limit = self.args.limit + if args.query and args.limit: + query = args.query + limit = args.limit else: - query = input(":Topic (search): ") - limit = input(limit_output.format("topic(s) search")) + query = Prompt.ask(f"{white}:{green}Topics{reset} (search)") + limit = Prompt.ask(limit_output.format("topic(s) search")) response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json() for topic in response['items']: topics_search_tree = Tree("\n" + topic['name']) for attr in self.topic_attrs: topics_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}") xprint(topics_search_tree) - log_topics_search(topic, query) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_topics_search(topic, query) # Issue search def issues_search(self): - if self.args.query and self.args.limit: - query = self.args.query - limit = self.args.limit + if args.query and args.limit: + query = args.query + limit = args.limit else: - query = input("!Issue (search): ") - limit = input(limit_output.format("issue(s) search")) + query = Prompt.ask(f"{white}!{green}Issues{reset} (search)") + limit = Prompt.ask(limit_output.format("issue(s) search")) response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json() for issue in response['items']: issues_search_tree = Tree("\n" + issue['title']) @@ -1028,16 +1048,18 @@ class Octosuite: issues_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issues_search_tree) xprint(issue['body']) - log_issues_search(issue, query) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_issues_search(issue, query) # Commits search def commits_search(self): - if self.args.query and self.args.limit: - query = self.args.query - limit = self.args.limit + if args.query and args.limit: + query = args.query + limit = args.limit else: - query = input(";Commit (search): ") - limit = input(limit_output.format("commit(s) search")) + query = Prompt.ask(f"{white};{green}Commits{reset} (search)") + limit = Prompt.ask(limit_output.format("commit(s) search")) response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json() for commit in response['items']: commits_search_tree = Tree("\n" + commit['commit']['tree']['sha']) @@ -1049,7 +1071,9 @@ class Octosuite: commits_search_tree.add(f"URL: {commit['html_url']}") xprint(commits_search_tree) xprint(commit['commit']['message']) - log_commits_search(commit, query) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_commits_search(commit, query) # Downloading release tarball def download_tarball(self): @@ -1063,7 +1087,7 @@ class Octosuite: logging.info(file_downloaded.format(f"octosuite.v{version_tag}.tar")) xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.tar")) - # Downloading release zipball + # Downloading release zip ball def download_zipball(self): logging.info(file_downloading.format(f"octosuite.v{version_tag}.zip")) xprint(INFO, file_downloading.format(f"octosuite.v{version_tag}.zip"))