From d7af8602fe76fe0215bebd458739095f8205c656 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 01:46:42 +0200 Subject: [PATCH] Update octosuite.py --- octosuite/octosuite.py | 414 ++++++++++++++++++++--------------------- 1 file changed, 204 insertions(+), 210 deletions(-) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index f0088c2..5324633 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -13,9 +13,9 @@ from rich.table import Table from datetime import datetime from rich import print as xprint from octosuite.helper import Help -from octosuite.sign_vars import SignVar -from octosuite.log_roller import logRoller -from octosuite.csv_loggers import csvLogger +from octosuite.log_roller import LogRoller +from octosuite.csv_loggers import CsvLogger +from octosuite.message_prefixes import MessagePrefix from octosuite.banners import version_tag, ascii_banner from octosuite.colors import red, white, green, white_bold, green_bold, header_title, reset @@ -26,60 +26,60 @@ class Octosuite: self.endpoint = 'https://api.github.com' # A list of tuples mapping commands to their methods - self.command_map = [("exit", self.exitSession), - ("clear", self.clearScreen), + self.command_map = [("exit", self.exit_session), + ("clear", self.clear_screen), ("about", self.about), ("author", self.author), - ("help", Help.helpCommand), - ("help:source", Help.sourceCommand), - ("help:search", Help.searchCommand), - ("help:user", Help.userCommand), - ("help:repo", Help.repoCommand), - ("help:logs", Help.logsCommand), - ("help:csv", Help.csvCommand), - ("help:org", Help.orgCommand), - ("source", Help.Source), - ("source:tarball", self.downloadTarball), - ("source:zipball", self.downloadZipball), - ("org", Help.Org), - ("org:events", self.orgEvents), - ("org:profile", self.orgProfile), - ("org:repos", self.orgRepos), - ("org:member", self.orgMember), - ("repo", Help.Repo), - ("repo:path_contents", self.pathContents), - ("repo:profile", self.repoProfile), - ("repo:contributors", self.repoContributors), - ("repo:stargazers", self.repoStargazers), - ("repo:forks", self.repoForks), - ("repo:issues", self.repoIssues), - ("repo:releases", self.repoReleases), - ("user", Help.User), - ("user:repos", self.userRepos), - ("user:gists", self.userGists), - ("user:orgs", self.userOrgs), - ("user:profile", self.userProfile), - ("user:events", self.userEvents), - ("user:followers", self.userFollowers), - ("user:follows", self.userFollows), - ("user:following", self.userFollowing), - ("user:subscriptions", self.userSubscriptions), - ("search", Help.Search), - ("search:users", self.userSearch), - ("search:repos", self.repoSearch), - ("search:topics", self.topicSearch), - ("search:issues", self.issueSearch), - ("search:commits", self.commitsSearch), - ("logs", Help.Logs), - ("logs:view", self.viewLogs), - ("logs:read", self.readLog), - ("logs:delete", self.deleteLog), - ("logs:clear", self.clearLogs), - ("csv", Help.Csv), - ("csv:view", self.viewCsv), - ("csv:read", self.readCsv), - ("csv:delete", self.deleteCsv), - ("csv:clear", self.clearCsv)] + ("help", Help.help_command), + ("help:source", Help.source_command), + ("help:search", Help.search_command), + ("help:user", Help.user_command), + ("help:repo", Help.repo_command), + ("help:logs", Help.logs_command), + ("help:csv", Help.csv_command), + ("help:org", Help.org_command), + ("source", Help.cource), + ("source:tarball", self.download_tarball), + ("source:zipball", self.download_zipball), + ("org", Help.org), + ("org:events", self.org_events), + ("org:profile", self.org_profile), + ("org:repos", self.org_repos), + ("org:member", self.org_member), + ("repo", Help.repo), + ("repo:path_contents", self.path_contents), + ("repo:profile", self.repo_profile), + ("repo:contributors", self.repo_contributors), + ("repo:stargazers", self.repo_stargazers), + ("repo:forks", self.repo_forks), + ("repo:issues", self.repo_issues), + ("repo:releases", self.repo_releases), + ("user", Help.user), + ("user:repos", self.user_repos), + ("user:gists", self.user_gists), + ("user:orgs", self.user_orgs), + ("user:profile", self.user_profile), + ("user:events", self.user_events), + ("user:followers", self.user_followers), + ("user:follows", self.user_follows), + ("user:following", self.user_following), + ("user:subscriptions", self.user_subscriptions), + ("search", Help.search), + ("search:users", self.user_search), + ("search:repos", self.repo_search), + ("search:topics", self.topic_search), + ("search:issues", self.issue_search), + ("search:commits", self.commits_search), + ("logs", Help.logs), + ("logs:view", self.view_logs), + ("logs:read", self.read_log), + ("logs:delete", self.delete_log), + ("logs:clear", self.clear_logs), + ("csv", Help.csv), + ("csv:view", self.view_csv), + ("csv:read", self.read_csv), + ("csv:delete", self.delete_csv), + ("csv:clear", self.clear_csv)] # Path attribute self.path_attrs = ['size', 'type', 'path', 'sha', 'html_url'] @@ -286,12 +286,12 @@ class Octosuite: """ - pathFinder() + path_finder() This method is responsible for creating/checking the availability of the (.logs, output, downloads) folders, enabling logging to automatically log network/user activity to a file, and logging the start of a session. """ - def pathFinder(self): + def path_finder(self): """ Check 3 directories (.logs, output, downloads) on startup If they exist, ignore, otherwise, create them @@ -313,12 +313,10 @@ class Octosuite: logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG) # Log the start of a session - logging.info(logRoller.sessionOpened.format(platform.node(), getpass.getuser())) + logging.info(LogRoller.session_opened.format(platform.node(), getpass.getuser())) - """ - Check for updates - """ + # Check for updates def check_updates(self): response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json() if response['tag_name'] == version_tag: @@ -327,19 +325,19 @@ class Octosuite: """ pass else: - xprint(f"{SignVar.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") + xprint(f"{MessagePrefix.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") """ - onStart() + on_start() This is the main method, responsible for mapping commands, calling other methods, and catching exceptions """ - def onStart(self): - self.pathFinder() - self.clearScreen() + def on_start(self): + self.path_finder() + self.clear_screen() self.configure_logging() self.check_updates() - xprint(ascii_banner()[1], ascii_banner()[0}) + xprint(ascii_banner()[1], ascii_banner()[0]) """ Main loop keeps octosuite running, this will break if Octosuite detects a KeyboardInterrupt (Ctrl+C) @@ -362,60 +360,60 @@ class Octosuite: # Fetching organization info - def orgProfile(self): + def org_profile(self): xprint(f"{white}>> @{green}Organization {white}(username){reset} ", end="") organization = input() response = requests.get(f"{self.endpoint}/orgs/{organization}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}") + xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") elif response.status_code == 200: org_profile_tree = Tree("\n" + response.json()['name']) for attr in self.org_attrs: org_profile_tree.add(f"{self.org_attr_dict[attr]}: {response.json()[attr]}") xprint(org_profile_tree) - csvLogger.logOrgProfile(response) + CsvLogger.log_org_profile(response) else: xprint(response.json()) # Fetching user information - def userProfile(self): + def user_profile(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() response = requests.get(f"{self.endpoint}/users/{username}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: user_profile_tree = Tree("\n" + response.json()['name']) for attr in self.profile_attrs: user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}") xprint(user_profile_tree) - csvLogger.logUserProfile(response) + CsvLogger.log_user_profile(response) else: xprint(response.json()) # Fetching repository information - def repoProfile(self): + def repo_profile(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username) ", end="") username = input() response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.status_code == 200: repo_profile_tree = Tree("\n" + response.json()['full_name']) for attr in self.repo_attrs: repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}") xprint(repo_profile_tree) - csvLogger.logRepoProfile(response) + CsvLogger.log_repo_profile(response) else: xprint(response.json()) # Get path contents - def pathContents(self): + def path_contents(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username) ", end="") @@ -424,101 +422,101 @@ class Octosuite: path_name = input() response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.infoNotFound.format(repo_name, username, path_name)}") + xprint(f"{MessagePrefix.negative} {LogRoller.info_not_found.format(repo_name, username, path_name)}") elif response.status_code == 200: for content_count, content in enumerate(response.json(), start=1): path_contents_tree = Tree("\n" + content['name']) for attr in self.path_attrs: path_contents_tree.add(f"{self.path_attr_dict[attr]}: {content[attr]}") xprint(path_contents_tree) - csvLogger.logRepoPathContents(content, repo_name) - xprint(SignVar.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.") + CsvLogger.log_repo_path_contents(content, repo_name) + xprint(MessagePrefix.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.") else: xprint(response.json()) # repo contributors - def repoContributors(self): + def repo_contributors(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username) ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("contributors"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("contributors"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.status_code == 200: for contributor in response.json(): contributor_tree = Tree("\n" + contributor['login']) for attr in self.user_attrs: contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}") xprint(contributor_tree) - csvLogger.logRepoContributors(contributor, repo_name) + CsvLogger.log_repo_contributors(contributor, repo_name) else: xprint(response.json()) # repo stargazers - def repoStargazers(self): + def repo_stargazers(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository stargazers"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository stargazers"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == {}: - xprint(f"{SignVar.negative} Repository does not have any stargazers -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have any stargazers -> ({repo_name})") elif response.status_code == 200: for stargazer in response.json(): stargazer_tree = Tree("\n" + stargazer['login']) for attr in self.user_attrs: stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}") xprint(stargazer_tree) - csvLogger.logRepoStargazers(stargazer, repo_name) + CsvLogger.log_repo_stargazers(stargazer, repo_name) else: xprint(response.json()) # repo forks - def repoForks(self): + def repo_forks(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository forks"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository forks"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == {}: - xprint(f"{SignVar.negative} Repository does not have forks -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have forks -> ({repo_name})") elif response.status_code == 200: for count, fork in enumerate(response.json()): fork_tree = Tree("\n" + fork['full_name']) for attr in self.repo_attrs: fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}") xprint(fork_tree) - csvLogger.logRepoForks(fork, count) + CsvLogger.log_repo_forks(fork, count) else: xprint(response.json()) # Repo issues - def repoIssues(self): + def repo_issues(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository issues"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository issues"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == []: - xprint(f"{SignVar.negative} Repository does not have open issues -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have open issues -> ({repo_name})") elif response.status_code == 200: for issue in response.json(): issues_tree = Tree("\n" + issue['title']) @@ -526,24 +524,24 @@ class Octosuite: issues_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issues_tree) xprint(issue['body']) - csvLogger.logRepoIssues(issue, repo_name) + CsvLogger.log_repo_issues(issue, repo_name) else: xprint(response.json()) # Repo releases - def repoReleases(self): + def repo_releases(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository releases"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository releases"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == []: - xprint(f"{SignVar.negative} Repository does not have releases -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have releases -> ({repo_name})") elif response.status_code == 200: for release in response.json(): releases_tree = Tree("\n" + release['name']) @@ -551,40 +549,40 @@ class Octosuite: releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}") xprint(releases_tree) xprint(release['body']) - csvLogger.logRepoReleases(release, repo_name) + CsvLogger.log_repo_releases(release, repo_name) else: xprint(response.json()) # Fetching organization repositories - def orgRepos(self): + def org_repos(self): xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") organization = input() - xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}") + xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") elif response.status_code == 200: for repository in response.json(): repos_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - csvLogger.logOrgRepos(repository, organization) + CsvLogger.log_org_repos(repository, organization) else: xprint(response.json()) # organization events - def orgEvents(self): + def org_events(self): xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") organization = input() - xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}") + xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") elif response.status_code == 200: for event in response.json(): events_tree = Tree("\n" + event['id']) @@ -592,97 +590,97 @@ class Octosuite: events_tree.add(f"Created at: {event['created_at']}") xprint(events_tree) xprint(event['payload']) - csvLogger.logOrgEvents(event, organization) + CsvLogger.log_org_events(event, organization) else: xprint(response.json()) # organization member - def orgMember(self): + def org_member(self): xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") organization = input() xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}") if response.status_code == 204: - xprint(f"{SignVar.positive} User ({username}) is a public member of the organization -> ({organization})") + xprint(f"{MessagePrefix.positive} User ({username}) is a public member of the organization -> ({organization})") else: - xprint(f"{SignVar.negative} {response.json()['message']}") + xprint(f"{MessagePrefix.negative} {response.json()['message']}") # Fetching user repositories - def userRepos(self): + def user_repos(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repositories"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for repository in response.json(): repos_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - csvLogger.logUserRepos(repository, username) + CsvLogger.log_user_repos(repository, username) else: xprint(response.json()) # Fetching user's gists - def userGists(self): + def user_gists(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format('gists'), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format('gists'), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User does not have gists.") + xprint(f"{MessagePrefix.negative} User does not have gists.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for gist in response.json(): gists_tree = Tree("\n" + gist['id']) for attr in self.gists_attrs: gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}") xprint(gists_tree) - csvLogger.logUserGists(gist) + CsvLogger.log_user_gists(gist) else: xprint(response.json()) # Fetching a list of organizations that a user owns or belongs to - def userOrgs(self): + def user_orgs(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("user organizations"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user organizations"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User ({username}) does not (belong to/own) any organizations.") + xprint(f"{MessagePrefix.negative} User ({username}) does not (belong to/own) any organizations.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for organization in response.json(): org_tree = Tree("\n" + organization['login']) for attr in self.user_orgs_attrs: org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}") xprint(org_tree) - csvLogger.logUserOrgs(organization, username) + CsvLogger.log_user_orgs(organization, username) else: xprint(response.json()) # Fetching a users events - def userEvents(self): + def user_events(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("events"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("events"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for event in response.json(): events_tree = Tree("\n" + event['id']) @@ -692,95 +690,95 @@ class Octosuite: events_tree.add(f"Created at: {event['created_at']}") xprint(events_tree) xprint(event['payload']) - csvLogger.logUserEvents(event) + CsvLogger.log_user_events(event) else: xprint(response.json()) # Fetching a target user's subscriptions - def userSubscriptions(self): + def user_subscriptions(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input().lower() - xprint(SignVar.prompt, logRoller.limitInput.format("user subscriptions"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user subscriptions"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User does not have any subscriptions.") + xprint(f"{MessagePrefix.negative} User does not have any subscriptions.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for repository in response.json(): subscriptions_tree =Tree("\n" + repository['full_name']) for attr in self.repo_attrs: subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(subscriptions_tree) - csvLogger.logUserSubscriptions(repository, username) + CsvLogger.log_user_subscriptions(repository, username) else: xprint(response.json()) # Fetching a list of users the target follows - def userFollowing(self): + def user_following(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input().lower() - xprint(SignVar.prompt, logRoller.limitInput.format("user' following"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user' following"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User ({username})does not follow anyone.") + xprint(f"{MessagePrefix.negative} User ({username})does not follow anyone.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for user in response.json(): following_tree = Tree("\n" + user['login']) for attr in self.user_attrs: following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(following_tree) - csvLogger.logUserFollowing(user, username) + CsvLogger.log_user_following(user, username) else: xprint(response.json()) # Fetching user's followers - def userFollowers(self): + def user_followers(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input().lower() - xprint(SignVar.prompt, logRoller.limitInput.format("user followers"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user followers"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User ({username})does not have followers.") + xprint(f"{MessagePrefix.negative} User ({username})does not have followers.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for follower in response.json(): followers_tree = Tree("\n" + follower['login']) for attr in self.user_attrs: followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}") xprint(followers_tree) - csvLogger.logUserFollowers(follower, username) + CsvLogger.log_user_followers(follower, username) else: xprint(response.json()) # Checking whether user[A] follows user[B] - def userFollows(self): + def user_follows(self): xprint(f"{white}>> @{green}user{white}(A) (username){reset} ", end="") user_a = input() xprint(f"{white}>> @{green}user{white}(B) (username){reset} ", end="") user_b = input() response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}") if response.status_code == 204: - xprint(f"{SignVar.positive} @{user_a} FOLLOWS @{user_b}") + xprint(f"{MessagePrefix.positive} @{user_a} FOLLOWS @{user_b}") else: - xprint(f"{SignVar.negative} @{user_a} DOES NOT FOLLOW @{user_b}") + xprint(f"{MessagePrefix.negative} @{user_a} DOES NOT FOLLOW @{user_b}") # User search - def userSearch(self): + def users_search(self): xprint(f"{white}>> @{green}Query{white} (eg. john){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("user search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json() for user in response['items']: @@ -788,14 +786,14 @@ class Octosuite: for attr in self.user_attrs: user_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(user_search_tree) - csvLogger.logUserSearch(user, query) + CsvLogger.log_users_search(user, query) # Repository search - def repoSearch(self): + def repos_search(self): xprint(f"{white}>> %{green}Query{white} (eg. git){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repositor[y][ies] search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositor[y][ies] search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json() for repository in response['items']: @@ -803,14 +801,14 @@ class Octosuite: for attr in self.repo_attrs: repo_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repo_search_tree) - csvLogger.logRepoSearch(repository, query) + CsvLogger.log_repos_search(repository, query) # Topics search - def topicSearch(self): + def topics_search(self): xprint(f"{white}>> #{green}Query{white} (eg. osint){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("topic(s) search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("topic(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json() for topic in response['items']: @@ -818,14 +816,14 @@ class Octosuite: for attr in self.topic_attrs: topic_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}") xprint(topic_search_tree) - csvLogger.logTopicSearch(topic, query) + CsvLogger.log_topics_search(topic, query) # Issue search - def issueSearch(self): + def issues_search(self): xprint(f"{white}>> !{green}Query{white} (eg. error){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("issue(s) search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("issue(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json() for issue in response['items']: @@ -834,14 +832,14 @@ class Octosuite: issue_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issue_search_tree) xprint(issue['body']) - csvLogger.logIssueSearch(issue, query) + CsvLogger.log_issues_search(issue, query) # Commits search - def commitsSearch(self): + def commits_search(self): xprint(f"{white}>> :{green}Query{white} (eg. filename:index.php){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("commit(s) search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("commit(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json() for commit in response['items']: @@ -854,12 +852,12 @@ class Octosuite: commits_search_tree.add(f"URL: {commit['html_url']}") xprint(commits_search_tree) xprint(commit['commit']['message']) - csvLogger.logCommitsSearch(commit, query) + CsvLogger.log_commits_search(commit, query) # View csv files - def viewCsv(self): - logging.info(logRoller.viewingCsv) + def view_csv(self): + logging.info(LogRoller.viewing_csv) csv_files = os.listdir("output") csv_table = Table(show_header=True, header_style=header_title) csv_table.add_column("CSV", style="dim") @@ -870,39 +868,38 @@ class Octosuite: # Read a specified csv file - def readCsv(self): + def read_csv(self): xprint(f"{white}>> {green}.csv {reset}(filename) ", end="") csv_file = input() with open(os.path.join("output", csv_file + ".csv"), "r") as file: - logging.info(logRoller.readingCsv.format(csv_file)) + logging.info(LogRoller.reading_csv.format(csv_file)) text = Text(file.read()) xprint(text) - # xprint("\n" + file.read()) # Delete a specified csv file - def deleteCsv(self): + def delete_csv(self): xprint(f"{white}>> {green}.csv {reset}filename{reset} ", end="") csv_file = input() os.remove(os.path.join("output", csv_file)) - logging.info(logRoller.deletedCsv.format(csv_file)) - xprint(f"{SignVar.positive} {logRoller.deletedCsv.format(csv_file)}") + logging.info(LogRoller.deleted_csv.format(csv_file)) + xprint(f"{MessagePrefix.positive} {LogRoller.deleted_csv.format(csv_file)}") # Clear csv - def clearCsv(self): - xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (y/n) ", end="") + def clear_csv(self): + xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="") prompt = input().lower() - if prompt == "y": + if prompt == "yes": shutil.rmtree("output", ignore_errors=True) - xprint(f"{SignVar.info} CSV files cleared successfully!") + xprint(f"{MessagePrefix.info} CSV files cleared successfully!") else: pass # View octosuite log files - def viewLogs(self): - logging.info(logRoller.viewingLogs) + def view_logs(self): + logging.info(LogRoller.viewing_logs) logs = os.listdir(".logs") logs_table = Table(show_header=True, header_style=header_title) logs_table.add_column("Log", style="dim") @@ -913,60 +910,60 @@ class Octosuite: # Read a specified log file - def readLog(self): + def read_log(self): xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="") log_file = input() with open(os.path.join(".logs", log_file + ".log"), "r") as log: - logging.info(logRoller.readingLog.format(log_file)) + logging.info(LogRoller.reading_log.format(log_file)) xprint("\n" + log.read()) # Delete a specified log file - def deleteLog(self): + def delete_log(self): xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="") log_file = input() os.remove(os.path.join(".logs", log_file)) - logging.info(logRoller.deletedLog.format(log_file)) - xprint(f"{SignVar.positive} {logRoller.deletedLog.format(log_file)}") + logging.info(LogRoller.deleted_log.format(log_file)) + xprint(f"{MessagePrefix.positive} {LogRoller.deleted_log.format(log_file)}") # Clear logs - def clearLogs(self): - xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (y/n) ", end="") + def clear_logs(self): + xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="") prompt = input().lower() - if prompt == "y": + if prompt == "yes": shutil.rmtree(".logs", ignore_errors=True) - xprint(f"{SignVar.info} Logs cleared successfully!") - xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}") + xprint(f"{MessagePrefix.info} Logs cleared successfully!") + xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}") exit() else: pass # Downloading release tarball - def downloadTarball(self): - logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar")) - xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar")) + def download_tarball(self): + logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar")) + xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar")) data = requests.get(f"{self.endpoint}/repos/bellingcat/octosuite/tarball/{version_tag}") with open(os.path.join("downloads", f"octosuite.v{version_tag}.tar"), "wb") as file: file.write(data.content) file.close() - logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar")) - xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar")) + logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar")) + xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar")) # Downloading release zipball - def downloadZipball(self): - logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip")) - xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip")) + def download_zipball(self): + logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip")) + xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip")) data = requests.get(f"{self.endpoint}/repos/rly0nheart/octosuite/zipball/{version_tag}") with open(os.path.join("downloads", f"octosuite.v{version_tag}.zip"), "wb") as file: file.write(data.content) file.close() - logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip")) - xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip")) + logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip")) + xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip")) # Author info @@ -999,25 +996,22 @@ GitHub REST API documentation: https://docs.github.com/rest # Close session - def exitSession(self): - xprint(f"{SignVar.prompt} This will close the current session, continue? (Y/n) ", end="") + def exit_session(self): + xprint(f"{MessagePrefix.prompt} This will close the current session, continue? (yes/no) ", end="") prompt = input().lower() - if prompt == 'y': - logging.info(logRoller.sessionClosed.format(datetime.now())) - xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}") + if prompt == "yes": + logging.info(LogRoller.session_closed.format(datetime.now())) + xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}") exit() else: pass # Clear screen - def clearScreen(self): + def clear_screen(self): """ using 'cls' on Windows machines to clear the screen, otherwise, use 'clear' """ - if sys.platform.lower().startswith("win"): - os.system('cls') - else: - os.system('clear') + os.system('cls' if os.name == 'nt' else 'clear')