Update octosuite.py

This commit is contained in:
Richard Mwewa
2022-11-25 02:04:54 +02:00
committed by GitHub
parent 94c1e0a737
commit 597ad07490

View File

@@ -7,15 +7,14 @@ import logging
import getpass
import requests
import platform
from rich.text import Text
from rich.tree import Tree
from rich.table import Table
from datetime import datetime
from rich import print as xprint
from octosuite.helper import Help
from octosuite.sign_vars import SignVar
from octosuite.log_roller import logRoller
from octosuite.csv_loggers import csvLogger
from octosuite.log_roller import LogRoller
from octosuite.csv_loggers import CsvLogger
from octosuite.message_prefixes import MessagePrefix
from octosuite.banners import version_tag, ascii_banner
from octosuite.colors import red, white, green, white_bold, green_bold, header_title, reset
@@ -26,60 +25,60 @@ class Octosuite:
self.endpoint = 'https://api.github.com'
# A list of tuples mapping commands to their methods
self.command_map = [("exit", self.exitSession),
("clear", self.clearScreen),
self.command_map = [("exit", self.exit_session),
("clear", self.clear_screen),
("about", self.about),
("author", self.author),
("help", Help.helpCommand),
("help:source", Help.sourceCommand),
("help:search", Help.searchCommand),
("help:user", Help.userCommand),
("help:repo", Help.repoCommand),
("help:logs", Help.logsCommand),
("help:csv", Help.csvCommand),
("help:org", Help.orgCommand),
("source", Help.Source),
("source:tarball", self.downloadTarball),
("source:zipball", self.downloadZipball),
("org", Help.Org),
("org:events", self.orgEvents),
("org:profile", self.orgProfile),
("org:repos", self.orgRepos),
("org:member", self.orgMember),
("repo", Help.Repo),
("repo:path_contents", self.pathContents),
("repo:profile", self.repoProfile),
("repo:contributors", self.repoContributors),
("repo:stargazers", self.repoStargazers),
("repo:forks", self.repoForks),
("repo:issues", self.repoIssues),
("repo:releases", self.repoReleases),
("user", Help.User),
("user:repos", self.userRepos),
("user:gists", self.userGists),
("user:orgs", self.userOrgs),
("user:profile", self.userProfile),
("user:events", self.userEvents),
("user:followers", self.userFollowers),
("user:follows", self.userFollows),
("user:following", self.userFollowing),
("user:subscriptions", self.userSubscriptions),
("search", Help.Search),
("search:users", self.userSearch),
("search:repos", self.repoSearch),
("search:topics", self.topicSearch),
("search:issues", self.issueSearch),
("search:commits", self.commitsSearch),
("logs", Help.Logs),
("logs:view", self.viewLogs),
("logs:read", self.readLog),
("logs:delete", self.deleteLog),
("logs:clear", self.clearLogs),
("csv", Help.Csv),
("csv:view", self.viewCsv),
("csv:read", self.readCsv),
("csv:delete", self.deleteCsv),
("csv:clear", self.clearCsv)]
("help", Help.help_command),
("help:source", Help.source_command),
("help:search", Help.search_command),
("help:user", Help.user_command),
("help:repo", Help.repo_command),
("help:logs", Help.logs_command),
("help:csv", Help.csv_command),
("help:org", Help.org_command),
("source", Help.cource),
("source:tarball", self.download_tarball),
("source:zipball", self.download_zipball),
("org", Help.org),
("org:events", self.org_events),
("org:profile", self.org_profile),
("org:repos", self.org_repos),
("org:member", self.org_member),
("repo", Help.repo),
("repo:path_contents", self.path_contents),
("repo:profile", self.repo_profile),
("repo:contributors", self.repo_contributors),
("repo:stargazers", self.repo_stargazers),
("repo:forks", self.repo_forks),
("repo:issues", self.repo_issues),
("repo:releases", self.repo_releases),
("user", Help.user),
("user:repos", self.user_repos),
("user:gists", self.user_gists),
("user:orgs", self.user_orgs),
("user:profile", self.user_profile),
("user:events", self.user_events),
("user:followers", self.user_followers),
("user:follows", self.user_follows),
("user:following", self.user_following),
("user:subscriptions", self.user_subscriptions),
("search", Help.search),
("search:users", self.user_search),
("search:repos", self.repo_search),
("search:topics", self.topic_search),
("search:issues", self.issue_search),
("search:commits", self.commits_search),
("logs", Help.logs),
("logs:view", self.view_logs),
("logs:read", self.read_log),
("logs:delete", self.delete_log),
("logs:clear", self.clear_logs),
("csv", Help.csv),
("csv:view", self.view_csv),
("csv:read", self.read_csv),
("csv:delete", self.delete_csv),
("csv:clear", self.clear_csv)]
# Path attribute
self.path_attrs = ['size', 'type', 'path', 'sha', 'html_url']
@@ -286,12 +285,12 @@ class Octosuite:
"""
pathFinder()
path_finder()
This method is responsible for creating/checking the availability of the (.logs, output, downloads) folders,
enabling logging to automatically log network/user activity to a file,
and logging the start of a session.
"""
def pathFinder(self):
def path_finder(self):
"""
Check 3 directories (.logs, output, downloads) on startup
If they exist, ignore, otherwise, create them
@@ -313,12 +312,10 @@ class Octosuite:
logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG)
# Log the start of a session
logging.info(logRoller.sessionOpened.format(platform.node(), getpass.getuser()))
logging.info(LogRoller.session_opened.format(platform.node(), getpass.getuser()))
"""
Check for updates
"""
# Check for updates
def check_updates(self):
response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json()
if response['tag_name'] == version_tag:
@@ -327,16 +324,16 @@ class Octosuite:
"""
pass
else:
xprint(f"{SignVar.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n")
xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n")
"""
onStart()
on_start()
This is the main method, responsible for mapping commands, calling other methods, and catching exceptions
"""
def onStart(self):
self.pathFinder()
self.clearScreen()
def on_start(self):
self.path_finder()
self.clear_screen()
self.configure_logging()
self.check_updates()
xprint(ascii_banner())
@@ -362,60 +359,60 @@ class Octosuite:
# Fetching organization info
def orgProfile(self):
def org_profile(self):
xprint(f"{white}>> @{green}Organization {white}(username){reset} ", end="")
organization = input()
response = requests.get(f"{self.endpoint}/orgs/{organization}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}")
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
elif response.status_code == 200:
org_profile_tree = Tree("\n" + response.json()['name'])
for attr in self.org_attrs:
org_profile_tree.add(f"{self.org_attr_dict[attr]}: {response.json()[attr]}")
xprint(org_profile_tree)
csvLogger.logOrgProfile(response)
CsvLogger.log_org_profile(response)
else:
xprint(response.json())
# Fetching user information
def userProfile(self):
def user_profile(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
response = requests.get(f"{self.endpoint}/users/{username}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
user_profile_tree = Tree("\n" + response.json()['name'])
for attr in self.profile_attrs:
user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}")
xprint(user_profile_tree)
csvLogger.logUserProfile(response)
CsvLogger.log_user_profile(response)
else:
xprint(response.json())
# Fetching repository information
def repoProfile(self):
def repo_profile(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username) ", end="")
username = input()
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.status_code == 200:
repo_profile_tree = Tree("\n" + response.json()['full_name'])
for attr in self.repo_attrs:
repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}")
xprint(repo_profile_tree)
csvLogger.logRepoProfile(response)
CsvLogger.log_repo_profile(response)
else:
xprint(response.json())
# Get path contents
def pathContents(self):
def path_contents(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username) ", end="")
@@ -424,101 +421,101 @@ class Octosuite:
path_name = input()
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.infoNotFound.format(repo_name, username, path_name)}")
xprint(f"{MessagePrefix.negative} {LogRoller.info_not_found.format(repo_name, username, path_name)}")
elif response.status_code == 200:
for content_count, content in enumerate(response.json(), start=1):
path_contents_tree = Tree("\n" + content['name'])
for attr in self.path_attrs:
path_contents_tree.add(f"{self.path_attr_dict[attr]}: {content[attr]}")
xprint(path_contents_tree)
csvLogger.logRepoPathContents(content, repo_name)
xprint(SignVar.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.")
CsvLogger.log_repo_path_contents(content, repo_name)
xprint(MessagePrefix.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.")
else:
xprint(response.json())
# repo contributors
def repoContributors(self):
def repo_contributors(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username) ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("contributors"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("contributors"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.status_code == 200:
for contributor in response.json():
contributor_tree = Tree("\n" + contributor['login'])
for attr in self.user_attrs:
contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}")
xprint(contributor_tree)
csvLogger.logRepoContributors(contributor, repo_name)
CsvLogger.log_repo_contributors(contributor, repo_name)
else:
xprint(response.json())
# repo stargazers
def repoStargazers(self):
def repo_stargazers(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository stargazers"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository stargazers"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == {}:
xprint(f"{SignVar.negative} Repository does not have any stargazers -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have any stargazers -> ({repo_name})")
elif response.status_code == 200:
for stargazer in response.json():
stargazer_tree = Tree("\n" + stargazer['login'])
for attr in self.user_attrs:
stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}")
xprint(stargazer_tree)
csvLogger.logRepoStargazers(stargazer, repo_name)
CsvLogger.log_repo_stargazers(stargazer, repo_name)
else:
xprint(response.json())
# repo forks
def repoForks(self):
def repo_forks(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository forks"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository forks"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == {}:
xprint(f"{SignVar.negative} Repository does not have forks -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have forks -> ({repo_name})")
elif response.status_code == 200:
for count, fork in enumerate(response.json()):
fork_tree = Tree("\n" + fork['full_name'])
for attr in self.repo_attrs:
fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}")
xprint(fork_tree)
csvLogger.logRepoForks(fork, count)
CsvLogger.log_repo_forks(fork, count)
else:
xprint(response.json())
# Repo issues
def repoIssues(self):
def repo_issues(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository issues"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository issues"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == []:
xprint(f"{SignVar.negative} Repository does not have open issues -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have open issues -> ({repo_name})")
elif response.status_code == 200:
for issue in response.json():
issues_tree = Tree("\n" + issue['title'])
@@ -526,24 +523,24 @@ class Octosuite:
issues_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}")
xprint(issues_tree)
xprint(issue['body'])
csvLogger.logRepoIssues(issue, repo_name)
CsvLogger.log_repo_issues(issue, repo_name)
else:
xprint(response.json())
# Repo releases
def repoReleases(self):
def repo_releases(self):
xprint(f"{white}>> %{green}Repository{reset} ", end="")
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repository releases"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository releases"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == []:
xprint(f"{SignVar.negative} Repository does not have releases -> ({repo_name})")
xprint(f"{MessagePrefix.negative} Repository does not have releases -> ({repo_name})")
elif response.status_code == 200:
for release in response.json():
releases_tree = Tree("\n" + release['name'])
@@ -551,40 +548,40 @@ class Octosuite:
releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}")
xprint(releases_tree)
xprint(release['body'])
csvLogger.logRepoReleases(release, repo_name)
CsvLogger.log_repo_releases(release, repo_name)
else:
xprint(response.json())
# Fetching organization repositories
def orgRepos(self):
def org_repos(self):
xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="")
organization = input()
xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}")
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
elif response.status_code == 200:
for repository in response.json():
repos_tree = Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
csvLogger.logOrgRepos(repository, organization)
CsvLogger.log_org_repos(repository, organization)
else:
xprint(response.json())
# organization events
def orgEvents(self):
def org_events(self):
xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="")
organization = input()
xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}")
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
elif response.status_code == 200:
for event in response.json():
events_tree = Tree("\n" + event['id'])
@@ -592,97 +589,97 @@ class Octosuite:
events_tree.add(f"Created at: {event['created_at']}")
xprint(events_tree)
xprint(event['payload'])
csvLogger.logOrgEvents(event, organization)
CsvLogger.log_org_events(event, organization)
else:
xprint(response.json())
# organization member
def orgMember(self):
def org_member(self):
xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="")
organization = input()
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}")
if response.status_code == 204:
xprint(f"{SignVar.positive} User ({username}) is a public member of the organization -> ({organization})")
xprint(f"{MessagePrefix.positive} User ({username}) is a public member of the organization -> ({organization})")
else:
xprint(f"{SignVar.negative} {response.json()['message']}")
xprint(f"{MessagePrefix.negative} {response.json()['message']}")
# Fetching user repositories
def userRepos(self):
def user_repos(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repositories"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for repository in response.json():
repos_tree = Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
csvLogger.logUserRepos(repository, username)
CsvLogger.log_user_repos(repository, username)
else:
xprint(response.json())
# Fetching user's gists
def userGists(self):
def user_gists(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format('gists'), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format('gists'), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User does not have gists.")
xprint(f"{MessagePrefix.negative} User does not have gists.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for gist in response.json():
gists_tree = Tree("\n" + gist['id'])
for attr in self.gists_attrs:
gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}")
xprint(gists_tree)
csvLogger.logUserGists(gist)
CsvLogger.log_user_gists(gist)
else:
xprint(response.json())
# Fetching a list of organizations that a user owns or belongs to
def userOrgs(self):
def user_orgs(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("user organizations"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user organizations"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User ({username}) does not (belong to/own) any organizations.")
xprint(f"{MessagePrefix.negative} User ({username}) does not (belong to/own) any organizations.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for organization in response.json():
org_tree = Tree("\n" + organization['login'])
for attr in self.user_orgs_attrs:
org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}")
xprint(org_tree)
csvLogger.logUserOrgs(organization, username)
CsvLogger.log_user_orgs(organization, username)
else:
xprint(response.json())
# Fetching a users events
def userEvents(self):
def user_events(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input()
xprint(SignVar.prompt, logRoller.limitInput.format("events"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("events"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}")
if response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for event in response.json():
events_tree = Tree("\n" + event['id'])
@@ -692,95 +689,95 @@ class Octosuite:
events_tree.add(f"Created at: {event['created_at']}")
xprint(events_tree)
xprint(event['payload'])
csvLogger.logUserEvents(event)
CsvLogger.log_user_events(event)
else:
xprint(response.json())
# Fetching a target user's subscriptions
def userSubscriptions(self):
def user_subscriptions(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input().lower()
xprint(SignVar.prompt, logRoller.limitInput.format("user subscriptions"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user subscriptions"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User does not have any subscriptions.")
xprint(f"{MessagePrefix.negative} User does not have any subscriptions.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for repository in response.json():
subscriptions_tree =Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(subscriptions_tree)
csvLogger.logUserSubscriptions(repository, username)
CsvLogger.log_user_subscriptions(repository, username)
else:
xprint(response.json())
# Fetching a list of users the target follows
def userFollowing(self):
def user_following(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input().lower()
xprint(SignVar.prompt, logRoller.limitInput.format("user' following"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user' following"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User ({username})does not follow anyone.")
xprint(f"{MessagePrefix.negative} User ({username})does not follow anyone.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for user in response.json():
following_tree = Tree("\n" + user['login'])
for attr in self.user_attrs:
following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(following_tree)
csvLogger.logUserFollowing(user, username)
CsvLogger.log_user_following(user, username)
else:
xprint(response.json())
# Fetching user's followers
def userFollowers(self):
def user_followers(self):
xprint(f"{white}>> @{green}Username{reset} ", end="")
username = input().lower()
xprint(SignVar.prompt, logRoller.limitInput.format("user followers"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user followers"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}")
if response.json() == []:
xprint(f"{SignVar.negative} User ({username})does not have followers.")
xprint(f"{MessagePrefix.negative} User ({username})does not have followers.")
elif response.status_code == 404:
xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}")
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
elif response.status_code == 200:
for follower in response.json():
followers_tree = Tree("\n" + follower['login'])
for attr in self.user_attrs:
followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}")
xprint(followers_tree)
csvLogger.logUserFollowers(follower, username)
CsvLogger.log_user_followers(follower, username)
else:
xprint(response.json())
# Checking whether user[A] follows user[B]
def userFollows(self):
def user_follows(self):
xprint(f"{white}>> @{green}user{white}(A) (username){reset} ", end="")
user_a = input()
xprint(f"{white}>> @{green}user{white}(B) (username){reset} ", end="")
user_b = input()
response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}")
if response.status_code == 204:
xprint(f"{SignVar.positive} @{user_a} FOLLOWS @{user_b}")
xprint(f"{MessagePrefix.positive} @{user_a} FOLLOWS @{user_b}")
else:
xprint(f"{SignVar.negative} @{user_a} DOES NOT FOLLOW @{user_b}")
xprint(f"{MessagePrefix.negative} @{user_a} DOES NOT FOLLOW @{user_b}")
# User search
def userSearch(self):
def users_search(self):
xprint(f"{white}>> @{green}Query{white} (eg. john){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("user search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json()
for user in response['items']:
@@ -788,14 +785,14 @@ class Octosuite:
for attr in self.user_attrs:
user_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(user_search_tree)
csvLogger.logUserSearch(user, query)
CsvLogger.log_users_search(user, query)
# Repository search
def repoSearch(self):
def repos_search(self):
xprint(f"{white}>> %{green}Query{white} (eg. git){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("repositor[y][ies] search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositor[y][ies] search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json()
for repository in response['items']:
@@ -803,14 +800,14 @@ class Octosuite:
for attr in self.repo_attrs:
repo_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repo_search_tree)
csvLogger.logRepoSearch(repository, query)
CsvLogger.log_repos_search(repository, query)
# Topics search
def topicSearch(self):
def topics_search(self):
xprint(f"{white}>> #{green}Query{white} (eg. osint){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("topic(s) search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("topic(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json()
for topic in response['items']:
@@ -818,14 +815,14 @@ class Octosuite:
for attr in self.topic_attrs:
topic_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}")
xprint(topic_search_tree)
csvLogger.logTopicSearch(topic, query)
CsvLogger.log_topics_search(topic, query)
# Issue search
def issueSearch(self):
def issues_search(self):
xprint(f"{white}>> !{green}Query{white} (eg. error){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("issue(s) search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("issue(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json()
for issue in response['items']:
@@ -834,14 +831,14 @@ class Octosuite:
issue_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}")
xprint(issue_search_tree)
xprint(issue['body'])
csvLogger.logIssueSearch(issue, query)
CsvLogger.log_issues_search(issue, query)
# Commits search
def commitsSearch(self):
def commits_search(self):
xprint(f"{white}>> :{green}Query{white} (eg. filename:index.php){reset} ", end="")
query = input()
xprint(SignVar.prompt, logRoller.limitInput.format("commit(s) search"), end="")
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("commit(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json()
for commit in response['items']:
@@ -854,12 +851,12 @@ class Octosuite:
commits_search_tree.add(f"URL: {commit['html_url']}")
xprint(commits_search_tree)
xprint(commit['commit']['message'])
csvLogger.logCommitsSearch(commit, query)
CsvLogger.log_commits_search(commit, query)
# View csv files
def viewCsv(self):
logging.info(logRoller.viewingCsv)
def view_csv(self):
logging.info(LogRoller.viewing_csv)
csv_files = os.listdir("output")
csv_table = Table(show_header=True, header_style=header_title)
csv_table.add_column("CSV", style="dim")
@@ -870,39 +867,38 @@ class Octosuite:
# Read a specified csv file
def readCsv(self):
def read_csv(self):
xprint(f"{white}>> {green}.csv {reset}(filename) ", end="")
csv_file = input()
with open(os.path.join("output", csv_file + ".csv"), "r") as file:
logging.info(logRoller.readingCsv.format(csv_file))
logging.info(LogRoller.reading_csv.format(csv_file))
text = Text(file.read())
xprint(text)
# xprint("\n" + file.read())
# Delete a specified csv file
def deleteCsv(self):
def delete_csv(self):
xprint(f"{white}>> {green}.csv {reset}filename{reset} ", end="")
csv_file = input()
os.remove(os.path.join("output", csv_file))
logging.info(logRoller.deletedCsv.format(csv_file))
xprint(f"{SignVar.positive} {logRoller.deletedCsv.format(csv_file)}")
logging.info(LogRoller.deleted_csv.format(csv_file))
xprint(f"{MessagePrefix.positive} {LogRoller.deleted_csv.format(csv_file)}")
# Clear csv
def clearCsv(self):
xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (y/n) ", end="")
def clear_csv(self):
xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "y":
if prompt == "yes":
shutil.rmtree("output", ignore_errors=True)
xprint(f"{SignVar.info} CSV files cleared successfully!")
xprint(f"{MessagePrefix.info} CSV files cleared successfully!")
else:
pass
# View octosuite log files
def viewLogs(self):
logging.info(logRoller.viewingLogs)
def view_logs(self):
logging.info(LogRoller.viewing_logs)
logs = os.listdir(".logs")
logs_table = Table(show_header=True, header_style=header_title)
logs_table.add_column("Log", style="dim")
@@ -913,60 +909,60 @@ class Octosuite:
# Read a specified log file
def readLog(self):
def read_log(self):
xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="")
log_file = input()
with open(os.path.join(".logs", log_file + ".log"), "r") as log:
logging.info(logRoller.readingLog.format(log_file))
logging.info(LogRoller.reading_log.format(log_file))
xprint("\n" + log.read())
# Delete a specified log file
def deleteLog(self):
def delete_log(self):
xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="")
log_file = input()
os.remove(os.path.join(".logs", log_file))
logging.info(logRoller.deletedLog.format(log_file))
xprint(f"{SignVar.positive} {logRoller.deletedLog.format(log_file)}")
logging.info(LogRoller.deleted_log.format(log_file))
xprint(f"{MessagePrefix.positive} {LogRoller.deleted_log.format(log_file)}")
# Clear logs
def clearLogs(self):
xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (y/n) ", end="")
def clear_logs(self):
xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "y":
if prompt == "yes":
shutil.rmtree(".logs", ignore_errors=True)
xprint(f"{SignVar.info} Logs cleared successfully!")
xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}")
xprint(f"{MessagePrefix.info} Logs cleared successfully!")
xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}")
exit()
else:
pass
# Downloading release tarball
def downloadTarball(self):
logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar"))
xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar"))
def download_tarball(self):
logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar"))
xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar"))
data = requests.get(f"{self.endpoint}/repos/bellingcat/octosuite/tarball/{version_tag}")
with open(os.path.join("downloads", f"octosuite.v{version_tag}.tar"), "wb") as file:
file.write(data.content)
file.close()
logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar"))
xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar"))
logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar"))
xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar"))
# Downloading release zipball
def downloadZipball(self):
logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip"))
xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip"))
def download_zipball(self):
logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip"))
xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip"))
data = requests.get(f"{self.endpoint}/repos/rly0nheart/octosuite/zipball/{version_tag}")
with open(os.path.join("downloads", f"octosuite.v{version_tag}.zip"), "wb") as file:
file.write(data.content)
file.close()
logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip"))
xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip"))
logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip"))
xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip"))
# Author info
@@ -979,45 +975,41 @@ class Octosuite:
# About program
def about(self):
about_text = Text(f"""
OCTOSUITE © 2022 Richard Mwewa
about_text = f"""
OCTOSUITE © 2023 Richard Mwewa
An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations.
With over 20+ features, Octosuite only runs on 2 external dependencies, and returns the gathered intelligence in a highly readable format.
An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations, with over 20+ features.
Whats new in v{version_tag}?
[fixed] Minor fixes
[improved] Removed width from tables, so that they can auto adjust
[added] Added the 'log:clear' command, which will be used to clear all logs
[added] Added the 'csv:clear' command, which will be used to clear all csv files
[{green}fixed{reset}] Minor fixes
[{green}improved{reset}] Removed width from tables, so that they can auto adjust
[{green}added{reset}] Added the 'log:clear' command, which will be used to clear all logs
[{green}added{reset}] Added the 'csv:clear' command, which will be used to clear all csv files
Read the wiki: https://github.com/bellingcat/octosuite/wiki
GitHub REST API documentation: https://docs.github.com/rest
""")
"""
xprint(about_text)
# Close session
def exitSession(self):
xprint(f"{SignVar.prompt} This will close the current session, continue? (Y/n) ", end="")
def exit_session(self):
xprint(f"{MessagePrefix.prompt} This will close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == 'y':
logging.info(logRoller.sessionClosed.format(datetime.now()))
xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}")
if prompt == "yes":
logging.info(LogRoller.session_closed.format(datetime.now()))
xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}")
exit()
else:
pass
# Clear screen
def clearScreen(self):
def clear_screen(self):
"""
using 'cls' on Windows machines to clear the screen,
otherwise, use 'clear'
"""
if sys.platform.lower().startswith("win"):
os.system('cls')
else:
os.system('clear')
os.system('cls' if os.name == 'nt' else 'clear')