Update octosuite.py

This commit is contained in:
Richard Mwewa
2022-11-25 01:46:42 +02:00
committed by GitHub
parent 29d40ec27e
commit d7af8602fe

View File

@@ -13,9 +13,9 @@ from rich.table import Table
from datetime import datetime
from rich import print as xprint
from octosuite.helper import Help
from octosuite.sign_vars import SignVar
from octosuite.log_roller import logRoller
from octosuite.csv_loggers import csvLogger
from octosuite.log_roller import LogRoller
from octosuite.csv_loggers import CsvLogger
from octosuite.message_prefixes import MessagePrefix
from octosuite.banners import version_tag, ascii_banner
from octosuite.colors import red, white, green, white_bold, green_bold, header_title, reset
@@ -26,60 +26,60 @@ class Octosuite:
self.endpoint = 'https://api.github.com'
# A list of tuples mapping commands to their methods
self.command_map = [("exit", self.exitSession),
("clear", self.clearScreen),
self.command_map = [("exit", self.exit_session),
("clear", self.clear_screen),
("about", self.about),
("author", self.author),
("help", Help.helpCommand),
("help:source", Help.sourceCommand),
("help:search", Help.searchCommand),
("help:user", Help.userCommand),
("help:repo", Help.repoCommand),
("help:logs", Help.logsCommand),
("help:csv", Help.csvCommand),
("help:org", Help.orgCommand),
("source", Help.Source),
("source:tarball", self.downloadTarball),
("source:zipball", self.downloadZipball),
("org", Help.Org),
("org:events", self.orgEvents),
("org:profile", self.orgProfile),
("org:repos", self.orgRepos),
("org:member", self.orgMember),
("repo", Help.Repo),
("repo:path_contents", self.pathContents),
("repo:profile", self.repoProfile),
("repo:contributors", self.repoContributors),
("repo:stargazers", self.repoStargazers),
("repo:forks", self.repoForks),
("repo:issues", self.repoIssues),
("repo:releases", self.repoReleases),
("user", Help.User),
("user:repos", self.userRepos),
("user:gists", self.userGists),
("user:orgs", self.userOrgs),
("user:profile", self.userProfile),
("user:events", self.userEvents),
("user:followers", self.userFollowers),
("user:follows", self.userFollows),
("user:following", self.userFollowing),
("user:subscriptions", self.userSubscriptions),
("search", Help.Search),
("search:users", self.userSearch),
("search:repos", self.repoSearch),
("search:topics", self.topicSearch),
("search:issues", self.issueSearch),
("search:commits", self.commitsSearch),
("logs", Help.Logs),
("logs:view", self.viewLogs),
("logs:read", self.readLog),
("logs:delete", self.deleteLog),
("logs:clear", self.clearLogs),
("csv", Help.Csv),
("csv:view", self.viewCsv),
("csv:read", self.readCsv),
("csv:delete", self.deleteCsv),
("csv:clear", self.clearCsv)]
("help", Help.help_command),
("help:source", Help.source_command),
("help:search", Help.search_command),
("help:user", Help.user_command),
("help:repo", Help.repo_command),
("help:logs", Help.logs_command),
("help:csv", Help.csv_command),
("help:org", Help.org_command),
("source", Help.cource),
("source:tarball", self.download_tarball),
("source:zipball", self.download_zipball),
("org", Help.org),
("org:events", self.org_events),
("org:profile", self.org_profile),
("org:repos", self.org_repos),
("org:member", self.org_member),
("repo", Help.repo),
("repo:path_contents", self.path_contents),
("repo:profile", self.repo_profile),
("repo:contributors", self.repo_contributors),
("repo:stargazers", self.repo_stargazers),
("repo:forks", self.repo_forks),
("repo:issues", self.repo_issues),
("repo:releases", self.repo_releases),
("user", Help.user),
("user:repos", self.user_repos),
("user:gists", self.user_gists),
("user:orgs", self.user_orgs),
("user:profile", self.user_profile),
("user:events", self.user_events),
("user:followers", self.user_followers),
("user:follows", self.user_follows),
("user:following", self.user_following),
("user:subscriptions", self.user_subscriptions),
("search", Help.search),
("search:users", self.user_search),
("search:repos", self.repo_search),
("search:topics", self.topic_search),
("search:issues", self.issue_search),
("search:commits", self.commits_search),
("logs", Help.logs),
("logs:view", self.view_logs),
("logs:read", self.read_log),
("logs:delete", self.delete_log),
("logs:clear", self.clear_logs),
("csv", Help.csv),
("csv:view", self.view_csv),
("csv:read", self.read_csv),
("csv:delete", self.delete_csv),
("csv:clear", self.clear_csv)]
# Path attribute
self.path_attrs = ['size', 'type', 'path', 'sha', 'html_url']
@@ -286,12 +286,12 @@ class Octosuite:
"""
pathFinder()
path_finder()
This method is responsible for creating/checking the availability of the (.logs, output, downloads) folders,
enabling logging to automatically log network/user activity to a file,
and logging the start of a session.
"""
def pathFinder(self):
def path_finder(self):
"""
Check 3 directories (.logs, output, downloads) on startup
If they exist, ignore, otherwise, create them
@@ -313,12 +313,10 @@ class Octosuite:
logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG)
# Log the start of a session
logging.info(logRoller.sessionOpened.format(platform.node(), getpass.getuser()))
logging.info(LogRoller.session_opened.format(platform.node(), getpass.getuser()))
"""
Check for updates
"""
# Check for updates
def check_updates(self):
response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json()
if response['tag_name'] == version_tag:
@@ -327,19 +325,19 @@ class Octosuite:
"""
pass
else:
xprint(f"{SignVar.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n")
xprint(f"{MessagePrefix.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n")
"""
onStart()
on_start()
This is the main method, responsible for mapping commands, calling other methods, and catching exceptions
"""
def onStart(self):
self.pathFinder()
self.clearScreen()
def on_start(self):
self.path_finder()
self.clear_screen()
self.configure_logging()
self.check_updates()
xprint(ascii_banner()[1], ascii_banner()[0})
xprint(ascii_banner()[1], ascii_banner()[0])
"""
Main loop keeps octosuite running, this will break if Octosuite detects a KeyboardInterrupt (Ctrl+C)
@@ -362,60 +360,60 @@ class Octosuite:
# Fetching organization info
def orgProfile(self):
def org_profile(self):
xprint(f"{white}>> @{green}Organization {white}(username){reset} ", end="")
organization = input()
response = requests.get(f"{self.endpoint}/orgs/{organization}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}")
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
elif response.status_code == 200:
org_profile_tree = Tree("\n" + response.json()['name'])
for attr in self.org_attrs:
org_profile_tree.add(f"{self.org_attr_dict[attr]}: {response.json()[attr]}")
xprint(org_profile_tree)
csvLogger.logOrgProfile(response)
CsvLogger.log_org_profile(response)
else:
xprint(response.json())
# Fetching user information
def userProfile(self):
def user_profile(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
response = requests.get(f"{self.endpoint}/users/{username}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
user_profile_tree = Tree("\n" + response.json()['name'])
for attr in self.profile_attrs:
user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}")
xprint(user_profile_tree)
csvLogger.logUserProfile(response)
CsvLogger.log_user_profile(response)
else:
xprint(response.json())
# Fetching repository information
def repoProfile(self):
def repo_profile(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username) ", end="")
username = input()
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.status_code == 200:
repo_profile_tree = Tree("\n" + response.json()['full_name'])
for attr in self.repo_attrs:
repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}")
xprint(repo_profile_tree)
csvLogger.logRepoProfile(response)
CsvLogger.log_repo_profile(response)
else:
xprint(response.json())
# Get path contents
def pathContents(self):
def path_contents(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username) ", end="")
@@ -424,101 +422,101 @@ class Octosuite:
path_name = input()
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.infoNotFound.format(repo_name, username, path_name)}")
xprint(f"{MessagePrefix.negative} {LogRoller.info_not_found.format(repo_name, username, path_name)}")
elif response.status_code == 200:
for content_count, content in enumerate(response.json(), start=1):
path_contents_tree = Tree("\n" + content['name'])
for attr in self.path_attrs:
path_contents_tree.add(f"{self.path_attr_dict[attr]}: {content[attr]}")
xprint(path_contents_tree)
csvLogger.logRepoPathContents(content, repo_name)
xprint(SignVar.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.")
CsvLogger.log_repo_path_contents(content, repo_name)
xprint(MessagePrefix.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.")
else:
xprint(response.json())
# repo contributors
def repoContributors(self):
def repo_contributors(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username) ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("contributors"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("contributors"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.status_code == 200:
for contributor in response.json():
contributor_tree = Tree("\n" + contributor['login'])
for attr in self.user_attrs:
contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}")
xprint(contributor_tree)
csvLogger.logRepoContributors(contributor, repo_name)
CsvLogger.log_repo_contributors(contributor, repo_name)
else:
xprint(response.json())
# repo stargazers
def repoStargazers(self):
def repo_stargazers(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository stargazers"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository stargazers"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == {}:
xprint(f"{SignVar.negative} Repository does not have any stargazers -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have any stargazers -> ({repo_name})")
elif response.status_code == 200:
for stargazer in response.json():
stargazer_tree = Tree("\n" + stargazer['login'])
for attr in self.user_attrs:
stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}")
xprint(stargazer_tree)
csvLogger.logRepoStargazers(stargazer, repo_name)
CsvLogger.log_repo_stargazers(stargazer, repo_name)
else:
xprint(response.json())
# repo forks
def repoForks(self):
def repo_forks(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository forks"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository forks"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == {}:
xprint(f"{SignVar.negative} Repository does not have forks -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have forks -> ({repo_name})")
elif response.status_code == 200:
for count, fork in enumerate(response.json()):
fork_tree = Tree("\n" + fork['full_name'])
for attr in self.repo_attrs:
fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}")
xprint(fork_tree)
csvLogger.logRepoForks(fork, count)
CsvLogger.log_repo_forks(fork, count)
else:
xprint(response.json())
# Repo issues
def repoIssues(self):
def repo_issues(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository issues"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository issues"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == []:
xprint(f"{SignVar.negative} Repository does not have open issues -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have open issues -> ({repo_name})")
elif response.status_code == 200:
for issue in response.json():
issues_tree = Tree("\n" + issue['title'])
@@ -526,24 +524,24 @@ class Octosuite:
issues_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}")
xprint(issues_tree)
xprint(issue['body'])
csvLogger.logRepoIssues(issue, repo_name)
CsvLogger.log_repo_issues(issue, repo_name)
else:
xprint(response.json())
# Repo releases
def repoReleases(self):
def repo_releases(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository releases"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository releases"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == []:
xprint(f"{SignVar.negative} Repository does not have releases -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have releases -> ({repo_name})")
elif response.status_code == 200:
for release in response.json():
releases_tree = Tree("\n" + release['name'])
@@ -551,40 +549,40 @@ class Octosuite:
releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}")
xprint(releases_tree)
xprint(release['body'])
csvLogger.logRepoReleases(release, repo_name)
CsvLogger.log_repo_releases(release, repo_name)
else:
xprint(response.json())
# Fetching organization repositories
def orgRepos(self):
def org_repos(self):
xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="")
organization = input()
xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}")
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
elif response.status_code == 200:
for repository in response.json():
repos_tree = Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
csvLogger.logOrgRepos(repository, organization)
CsvLogger.log_org_repos(repository, organization)
else:
xprint(response.json())
# organization events
def orgEvents(self):
def org_events(self):
xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="")
organization = input()
xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}")
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
elif response.status_code == 200:
for event in response.json():
events_tree = Tree("\n" + event['id'])
@@ -592,97 +590,97 @@ class Octosuite:
events_tree.add(f"Created at: {event['created_at']}")
xprint(events_tree)
xprint(event['payload'])
csvLogger.logOrgEvents(event, organization)
CsvLogger.log_org_events(event, organization)
else:
xprint(response.json())
# organization member
def orgMember(self):
def org_member(self):
xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="")
organization = input()
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}")
if response.status_code == 204:
xprint(f"{SignVar.positive} User ({username}) is a public member of the organization -> ({organization})")
xprint(f"{MessagePrefix.positive} User ({username}) is a public member of the organization -> ({organization})")
else:
xprint(f"{SignVar.negative} {response.json()['message']}")
xprint(f"{MessagePrefix.negative} {response.json()['message']}")
# Fetching user repositories
def userRepos(self):
def user_repos(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repositories"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for repository in response.json():
repos_tree = Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
csvLogger.logUserRepos(repository, username)
CsvLogger.log_user_repos(repository, username)
else:
xprint(response.json())
# Fetching user's gists
def userGists(self):
def user_gists(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format('gists'), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format('gists'), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User does not have gists.")
xprint(f"{MessagePrefix.negative} User does not have gists.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for gist in response.json():
gists_tree = Tree("\n" + gist['id'])
for attr in self.gists_attrs:
gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}")
xprint(gists_tree)
csvLogger.logUserGists(gist)
CsvLogger.log_user_gists(gist)
else:
xprint(response.json())
# Fetching a list of organizations that a user owns or belongs to
def userOrgs(self):
def user_orgs(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("user organizations"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user organizations"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User ({username}) does not (belong to/own) any organizations.")
xprint(f"{MessagePrefix.negative} User ({username}) does not (belong to/own) any organizations.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for organization in response.json():
org_tree = Tree("\n" + organization['login'])
for attr in self.user_orgs_attrs:
org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}")
xprint(org_tree)
csvLogger.logUserOrgs(organization, username)
CsvLogger.log_user_orgs(organization, username)
else:
xprint(response.json())
# Fetching a users events
def userEvents(self):
def user_events(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("events"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("events"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for event in response.json():
events_tree = Tree("\n" + event['id'])
@@ -692,95 +690,95 @@ class Octosuite:
events_tree.add(f"Created at: {event['created_at']}")
xprint(events_tree)
xprint(event['payload'])
csvLogger.logUserEvents(event)
CsvLogger.log_user_events(event)
else:
xprint(response.json())
# Fetching a target user's subscriptions
def userSubscriptions(self):
def user_subscriptions(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input().lower()
xprint(SignVar.prompt, logRoller.limitInput.format("user subscriptions"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user subscriptions"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User does not have any subscriptions.")
xprint(f"{MessagePrefix.negative} User does not have any subscriptions.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for repository in response.json():
subscriptions_tree =Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(subscriptions_tree)
csvLogger.logUserSubscriptions(repository, username)
CsvLogger.log_user_subscriptions(repository, username)
else:
xprint(response.json())
# Fetching a list of users the target follows
def userFollowing(self):
def user_following(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input().lower()
xprint(SignVar.prompt, logRoller.limitInput.format("user' following"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user' following"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User ({username})does not follow anyone.")
xprint(f"{MessagePrefix.negative} User ({username})does not follow anyone.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for user in response.json():
following_tree = Tree("\n" + user['login'])
for attr in self.user_attrs:
following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(following_tree)
csvLogger.logUserFollowing(user, username)
CsvLogger.log_user_following(user, username)
else:
xprint(response.json())
# Fetching user's followers
def userFollowers(self):
def user_followers(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input().lower()
xprint(SignVar.prompt, logRoller.limitInput.format("user followers"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user followers"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User ({username})does not have followers.")
xprint(f"{MessagePrefix.negative} User ({username})does not have followers.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for follower in response.json():
followers_tree = Tree("\n" + follower['login'])
for attr in self.user_attrs:
followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}")
xprint(followers_tree)
csvLogger.logUserFollowers(follower, username)
CsvLogger.log_user_followers(follower, username)
else:
xprint(response.json())
# Checking whether user[A] follows user[B]
def userFollows(self):
def user_follows(self):
xprint(f"{white}>> @{green}user{white}(A) (username){reset} ", end="")
user_a = input()
xprint(f"{white}>> @{green}user{white}(B) (username){reset} ", end="")
user_b = input()
response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}")
if response.status_code == 204:
xprint(f"{SignVar.positive} @{user_a} FOLLOWS @{user_b}")
xprint(f"{MessagePrefix.positive} @{user_a} FOLLOWS @{user_b}")
else:
xprint(f"{SignVar.negative} @{user_a} DOES NOT FOLLOW @{user_b}")
xprint(f"{MessagePrefix.negative} @{user_a} DOES NOT FOLLOW @{user_b}")
# User search
def userSearch(self):
def users_search(self):
xprint(f"{white}>> @{green}Query{white} (eg. john){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("user search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json()
for user in response['items']:
@@ -788,14 +786,14 @@ class Octosuite:
for attr in self.user_attrs:
user_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(user_search_tree)
csvLogger.logUserSearch(user, query)
CsvLogger.log_users_search(user, query)
# Repository search
def repoSearch(self):
def repos_search(self):
xprint(f"{white}>> %{green}Query{white} (eg. git){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repositor[y][ies] search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositor[y][ies] search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json()
for repository in response['items']:
@@ -803,14 +801,14 @@ class Octosuite:
for attr in self.repo_attrs:
repo_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repo_search_tree)
csvLogger.logRepoSearch(repository, query)
CsvLogger.log_repos_search(repository, query)
# Topics search
def topicSearch(self):
def topics_search(self):
xprint(f"{white}>> #{green}Query{white} (eg. osint){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("topic(s) search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("topic(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json()
for topic in response['items']:
@@ -818,14 +816,14 @@ class Octosuite:
for attr in self.topic_attrs:
topic_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}")
xprint(topic_search_tree)
csvLogger.logTopicSearch(topic, query)
CsvLogger.log_topics_search(topic, query)
# Issue search
def issueSearch(self):
def issues_search(self):
xprint(f"{white}>> !{green}Query{white} (eg. error){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("issue(s) search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("issue(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json()
for issue in response['items']:
@@ -834,14 +832,14 @@ class Octosuite:
issue_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}")
xprint(issue_search_tree)
xprint(issue['body'])
csvLogger.logIssueSearch(issue, query)
CsvLogger.log_issues_search(issue, query)
# Commits search
def commitsSearch(self):
def commits_search(self):
xprint(f"{white}>> :{green}Query{white} (eg. filename:index.php){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("commit(s) search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("commit(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json()
for commit in response['items']:
@@ -854,12 +852,12 @@ class Octosuite:
commits_search_tree.add(f"URL: {commit['html_url']}")
xprint(commits_search_tree)
xprint(commit['commit']['message'])
csvLogger.logCommitsSearch(commit, query)
CsvLogger.log_commits_search(commit, query)
# View csv files
def viewCsv(self):
logging.info(logRoller.viewingCsv)
def view_csv(self):
logging.info(LogRoller.viewing_csv)
csv_files = os.listdir("output")
csv_table = Table(show_header=True, header_style=header_title)
csv_table.add_column("CSV", style="dim")
@@ -870,39 +868,38 @@ class Octosuite:
# Read a specified csv file
def readCsv(self):
def read_csv(self):
xprint(f"{white}>> {green}.csv {reset}(filename) ", end="")
csv_file = input()
with open(os.path.join("output", csv_file + ".csv"), "r") as file:
logging.info(logRoller.readingCsv.format(csv_file))
logging.info(LogRoller.reading_csv.format(csv_file))
text = Text(file.read())
xprint(text)
# xprint("\n" + file.read())
# Delete a specified csv file
def deleteCsv(self):
def delete_csv(self):
xprint(f"{white}>> {green}.csv {reset}filename{reset} ", end="")
csv_file = input()
os.remove(os.path.join("output", csv_file))
logging.info(logRoller.deletedCsv.format(csv_file))
xprint(f"{SignVar.positive} {logRoller.deletedCsv.format(csv_file)}")
logging.info(LogRoller.deleted_csv.format(csv_file))
xprint(f"{MessagePrefix.positive} {LogRoller.deleted_csv.format(csv_file)}")
# Clear csv
def clearCsv(self):
xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (y/n) ", end="")
def clear_csv(self):
xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "y":
if prompt == "yes":
shutil.rmtree("output", ignore_errors=True)
xprint(f"{SignVar.info} CSV files cleared successfully!")
xprint(f"{MessagePrefix.info} CSV files cleared successfully!")
else:
pass
# View octosuite log files
def viewLogs(self):
logging.info(logRoller.viewingLogs)
def view_logs(self):
logging.info(LogRoller.viewing_logs)
logs = os.listdir(".logs")
logs_table = Table(show_header=True, header_style=header_title)
logs_table.add_column("Log", style="dim")
@@ -913,60 +910,60 @@ class Octosuite:
# Read a specified log file
def readLog(self):
def read_log(self):
xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="")
log_file = input()
with open(os.path.join(".logs", log_file + ".log"), "r") as log:
logging.info(logRoller.readingLog.format(log_file))
logging.info(LogRoller.reading_log.format(log_file))
xprint("\n" + log.read())
# Delete a specified log file
def deleteLog(self):
def delete_log(self):
xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="")
log_file = input()
os.remove(os.path.join(".logs", log_file))
logging.info(logRoller.deletedLog.format(log_file))
xprint(f"{SignVar.positive} {logRoller.deletedLog.format(log_file)}")
logging.info(LogRoller.deleted_log.format(log_file))
xprint(f"{MessagePrefix.positive} {LogRoller.deleted_log.format(log_file)}")
# Clear logs
def clearLogs(self):
xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (y/n) ", end="")
def clear_logs(self):
xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "y":
if prompt == "yes":
shutil.rmtree(".logs", ignore_errors=True)
xprint(f"{SignVar.info} Logs cleared successfully!")
xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}")
xprint(f"{MessagePrefix.info} Logs cleared successfully!")
xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}")
exit()
else:
pass
# Downloading release tarball
def downloadTarball(self):
logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar"))
xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar"))
def download_tarball(self):
logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar"))
xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar"))
data = requests.get(f"{self.endpoint}/repos/bellingcat/octosuite/tarball/{version_tag}")
with open(os.path.join("downloads", f"octosuite.v{version_tag}.tar"), "wb") as file:
file.write(data.content)
file.close()
logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar"))
xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar"))
logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar"))
xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar"))
# Downloading release zipball
def downloadZipball(self):
logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip"))
xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip"))
def download_zipball(self):
logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip"))
xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip"))
data = requests.get(f"{self.endpoint}/repos/rly0nheart/octosuite/zipball/{version_tag}")
with open(os.path.join("downloads", f"octosuite.v{version_tag}.zip"), "wb") as file:
file.write(data.content)
file.close()
logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip"))
xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip"))
logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip"))
xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip"))
# Author info
@@ -999,25 +996,22 @@ GitHub REST API documentation: https://docs.github.com/rest
# Close session
def exitSession(self):
xprint(f"{SignVar.prompt} This will close the current session, continue? (Y/n) ", end="")
def exit_session(self):
xprint(f"{MessagePrefix.prompt} This will close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == 'y':
logging.info(logRoller.sessionClosed.format(datetime.now()))
xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}")
if prompt == "yes":
logging.info(LogRoller.session_closed.format(datetime.now()))
xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}")
exit()
else:
pass
# Clear screen
def clearScreen(self):
def clear_screen(self):
"""
using 'cls' on Windows machines to clear the screen,
otherwise, use 'clear'
"""
if sys.platform.lower().startswith("win"):
os.system('cls')
else:
os.system('clear')
os.system('cls' if os.name == 'nt' else 'clear')