Update octosuite

This commit is contained in:
Richard Mwewa
2022-11-30 02:37:07 +02:00
committed by GitHub
parent 9984c71a57
commit 558715fdab
15 changed files with 864 additions and 830 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -1,83 +0,0 @@
Metadata-Version: 2.1
Name: octosuite
Version: 3.0.0
Summary: Advanced Github OSINT Framework
Home-page: https://github.com/bellingcat/octosuite
Author: Richard Mwewa
Author-email: rly0nheart@duck.com
License: GNU General Public License v3 (GPLv3)
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Information Technology
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Natural Language :: English
Classifier: Programming Language :: Python :: 3
Description-Content-Type: text/markdown
License-File: LICENSE
![logo](https://user-images.githubusercontent.com/74001397/175805580-fffc96d4-e0ef-48bb-a55c-80b2da3e714d.png)
A framework fro gathering osint on GitHub users, repositories and organizations
[![Upload Python Package](https://github.com/bellingcat/octosuite/actions/workflows/python-publish.yml/badge.svg)](https://github.com/bellingcat/octosuite/actions/workflows/python-publish.yml)
[![CodeQL](https://github.com/bellingcat/octosuite/actions/workflows/codeql.yml/badge.svg)](https://github.com/bellingcat/octosuite/actions/workflows/codeql.yml)
![GitHub](https://img.shields.io/github/license/bellingcat/octosuite?style=flat)
![PyPI](https://img.shields.io/pypi/v/octosuite?style=flat&logo=pypi)
![PyPI - Downloads](https://img.shields.io/pypi/dw/octosuite?style=flat&logo=pypi)
![PyPI - Status](https://img.shields.io/pypi/status/octosuite?style=flat&logo=pypi)
![GitHub repo size](https://img.shields.io/github/repo-size/bellingcat/octosuite?style=flat&logo=github)
![octosuite_gui_exe (2)](https://user-images.githubusercontent.com/74001397/186889610-4530ee26-d3c6-46fc-8c92-8709f89617fd.png "Octosuite' about window")
![octosuite_gui_exe (4)](https://user-images.githubusercontent.com/74001397/186889897-c1c17fac-fddc-4967-9084-39cfe2d1307f.png "Octosuite user profile window")
# Wiki
[Refer to the Wiki](https://github.com/bellingcat/octosuite/wiki) for installation instructions, in addition to all other documentation.
# Features
- [x] Fetches an organization's profile information
- [x] Fetches an oganization's events
- [x] Returns an organization's repositories
- [x] Returns an organization's public members
- [x] Fetches a repository's information
- [x] Returns a repository's contributors
- [x] Returns a repository's languages
- [x] Fetches a repository's stargazers
- [x] Fetches a repository's forks
- [x] Fetches a repository's releases
- [x] Returns a list of files in a specified path of a repository
- [x] Fetches a user's profile information
- [x] Returns a user's gists
- [x] Returns organizations that a user owns/belongs to
- [x] Fetches a user's events
- [x] Fetches a list of users followed by the target
- [x] Fetches a user's followers
- [x] Checks if user A follows user B
- [x] Checks if user is a public member of an organizations
- [x] Returns a user's subscriptions
- [x] Gets a user's subscriptions
- [x] Gets a user's events
- [x] Searches users
- [x] Searches repositories
- [x] Searches topics
- [x] Searches issues
- [x] Searches commits
- [x] Automatically logs network activity (.logs folder)
- [x] User can view, read and delete logs
- [x] ...And more
## Note
> Octosuite automatically logs network and user activity of each session, the logs are saved by date and time in the .logs folder
# License
![license](https://user-images.githubusercontent.com/74001397/137917929-2f2cdb0c-4d1d-4e4b-9f0d-e01589e027b5.png)
# Donations
If you like Octosuite and would like to show support, you can Buy A Coffee for the developer using the button below
<a href="https://www.buymeacoffee.com/189381184" target="_blank"><img src="https://cdn.buymeacoffee.com/buttons/default-orange.png" alt="Buy Me A Coffee" height="41" width="174"></a>
Your support will be much appreciated😊

View File

@@ -1,18 +0,0 @@
LICENSE
README.md
setup.py
octosuite/__init__.py
octosuite/banners.py
octosuite/colors.py
octosuite/csv_loggers.py
octosuite/helper.py
octosuite/log_roller.py
octosuite/main.py
octosuite/message_prefixes.py
octosuite/octosuite.py
octosuite.egg-info/PKG-INFO
octosuite.egg-info/SOURCES.txt
octosuite.egg-info/dependency_links.txt
octosuite.egg-info/entry_points.txt
octosuite.egg-info/requires.txt
octosuite.egg-info/top_level.txt

View File

@@ -1 +0,0 @@

View File

@@ -1,2 +0,0 @@
[console_scripts]
octosuite = octosuite.main:octosuite

View File

@@ -1,3 +0,0 @@
requests
rich
psutil

View File

@@ -1 +0,0 @@
octosuite

View File

@@ -29,7 +29,7 @@ xprint(system_tree)
print("\n")
while True:
try:
color_chooser = input(f"[?] Welcome, would you like to enable colors for this session? (yes/no) ").lower()
color_chooser = input(f"[PROMPT] Welcome, would you like to enable colors for this session? (yes/no) ").lower()
if color_chooser == "yes":
header_title = "bold white"
red = "[red]"
@@ -44,8 +44,8 @@ while True:
header_title = red = white = green = red_bold = white_bold = green_bold = reset = ""
break
else:
print(f"\n[!] Your response '{color_chooser}' is invalid (expected yes or no)")
print(f"\n[INVALID] Your response '{color_chooser}' is invalid (expected yes or no)")
except KeyboardInterrupt:
exit(f"[!] Process interrupted with Ctrl+C.")
exit(f"[WARNING] Process interrupted with Ctrl+C.")

File diff suppressed because it is too large Load Diff

View File

@@ -2,163 +2,168 @@ from rich.table import Table
from rich import print as xprint
from octosuite.colors import white, green, white_bold, green_bold, header_title, reset
# Help
# This class holds the help text for available commands.
class Help:
usage_text = 'Use syntax {} to get started with %s{}%s.' % (green_bold, reset)
usage_text_1 = '%sUse {} to view all available subcommands.%s' % (white, reset)
usage_text_2 = "%sThe {} command works with subcommands. %s" % (white, reset)
# helper.py
# This file holds the help text for available commands.
usage_text = 'Use syntax {} to get started with %s{}%s.' % (green_bold, reset)
usage_text_1 = '%sUse {} to view all available subcommands.%s' % (white, reset)
usage_text_2 = "%sThe {} command works with subcommands. %s" % (white, reset)
def org():
xprint(
Help.usage_text_2.format(f"{green_bold}org{reset}") + Help.usage_text_1.format(
f"{green_bold}help:org{reset}"))
def repo():
xprint(Help.usage_text_2.format(f"{green_bold}repo{reset}") + Help.usage_text_1.format(
f"{green_bold}help:repo{reset}"))
def org():
xprint(usage_text_2.format(f"{green_bold}org{reset}") + usage_text_1.format(f"{green_bold}help:org{reset}"))
def user():
xprint(Help.usage_text_2.format(f"{green_bold}user{reset}") + Help.usage_text_1.format(
f"{green_bold}help:user{reset}"))
def search():
xprint(Help.usage_text_2.format(f"{green_bold}search{reset}") + Help.usage_text_1.format(
f"{green_bold}help:search{reset}"))
def repo():
xprint(usage_text_2.format(f"{green_bold}repo{reset}") + usage_text_1.format(f"{green_bold}help:repo{reset}"))
def source():
xprint(Help.usage_text_2.format(f"{green_bold}source{reset}") + Help.usage_text_1.format(
f"{green_bold}help:source{reset}"))
def logs():
xprint(Help.usage_text_2.format(f"{green_bold}logs{reset}") + Help.usage_text_1.format(
f"{green_bold}help:logs{reset}"))
def user():
xprint(usage_text_2.format(f"{green_bold}user{reset}") + usage_text_1.format(f"{green_bold}help:user{reset}"))
def csv():
xprint(
Help.usage_text_2.format(f"{green_bold}csv{reset}") + Help.usage_text_1.format(
f"{green_bold}help:csv{reset}"))
def source_command():
source_cmd_table = Table(show_header=True, header_style=header_title)
source_cmd_table.add_column("Command", style="dim")
source_cmd_table.add_column("Description")
source_cmd_table.add_row("zipball", "Download source code Zipball")
source_cmd_table.add_row("tarball", "Download source code Tarball")
def search():
xprint(usage_text_2.format(f"{green_bold}search{reset}") + usage_text_1.format(f"{green_bold}help:search{reset}"))
syntax = f"{green}source:<command>{reset}"
xprint(f"{Help.usage_text.format(syntax, 'source code downloads')}")
xprint(source_cmd_table)
def search_command():
search_cmd_table = Table(show_header=True, header_style=header_title)
search_cmd_table.add_column("Command", style="dim")
search_cmd_table.add_column("Description")
search_cmd_table.add_row("users", "Search user(s)")
search_cmd_table.add_row("repos", "Search repositor[y][ies]")
search_cmd_table.add_row("topics", "Search topic(s)")
search_cmd_table.add_row("issues", "Search issue(s)")
search_cmd_table.add_row("commits", "Search commit(s)")
def source():
xprint(usage_text_2.format(f"{green_bold}source{reset}") + usage_text_1.format(f"{green_bold}help:source{reset}"))
syntax = f"{green}search:<command>{reset}"
xprint(f"{Help.usage_text.format(syntax, 'target discovery')}")
xprint(search_cmd_table)
def user_command():
user_cmd_table = Table(show_header=True, header_style=header_title)
user_cmd_table.add_column("Command", style="dim")
user_cmd_table.add_column("Description")
user_cmd_table.add_row("profile", "Get a target's profile info")
user_cmd_table.add_row("gists", "Return a users's gists")
user_cmd_table.add_row("org", "Return organizations that a target belongs to/owns")
user_cmd_table.add_row("repos", "Return a target's repositories")
user_cmd_table.add_row("events", "Return a target's events")
user_cmd_table.add_row("follows", "Check if user(A) follows user(B)")
user_cmd_table.add_row("followers", "Return a target's followers")
user_cmd_table.add_row("following", "Return a list of users the target is following")
user_cmd_table.add_row("subscriptions", "Return a target's subscriptions")
def logs():
xprint(usage_text_2.format(f"{green_bold}logs{reset}") + usage_text_1.format(f"{green_bold}help:logs{reset}"))
syntax = f"{green}user:<command>{reset}"
xprint(f"{Help.usage_text.format(syntax, 'user investigation(s)')}")
xprint(user_cmd_table)
def org_command():
org_cmd_table = Table(show_header=True, header_style=header_title)
org_cmd_table.add_column("Command", style="dim")
org_cmd_table.add_column("Description")
org_cmd_table.add_row("profile", "Get a target organization' profile info")
org_cmd_table.add_row("repos", "Return a target organization' repositories")
org_cmd_table.add_row("events", "Return a target organization' events")
org_cmd_table.add_row("member", "Check if a specified user is a public member of the target organization")
def csv():
xprint(usage_text_2.format(f"{green_bold}csv{reset}") + usage_text_1.format(f"{green_bold}help:csv{reset}"))
syntax = f"{green}org:<command>{reset}"
xprint(f"{Help.usage_text.format(syntax, 'organization investigation(s)')}")
xprint(org_cmd_table)
def repo_command():
repo_cmd_table = Table(show_header=True, header_style=header_title)
repo_cmd_table.add_column("Command", style="dim")
repo_cmd_table.add_column("Description")
repo_cmd_table.add_row("profile", "Get a repository's info")
repo_cmd_table.add_row("issues", "Return a repository's issues")
repo_cmd_table.add_row("forks", "Return a repository's forks")
repo_cmd_table.add_row("releases", "Return a repository's releases")
repo_cmd_table.add_row("stargazers", "Return a repository's stargazers")
repo_cmd_table.add_row("contributors", "Return a repository's contributors")
repo_cmd_table.add_row("path_contents", "List contents in a path of a repository")
def source_command():
source_cmd_table = Table(show_header=True, header_style=header_title)
source_cmd_table.add_column("Command", style="dim")
source_cmd_table.add_column("Description")
source_cmd_table.add_row("zipball", "Download source code Zipball")
source_cmd_table.add_row("tarball", "Download source code Tarball")
syntax = f"{green}repo:<command>{reset}"
xprint(f"{Help.usage_text.format(syntax, 'repository investigation(s)')}")
xprint(repo_cmd_table)
syntax = f"{green}source:<command>{reset}"
xprint(f"{usage_text.format(syntax, 'source code downloads')}")
xprint(source_cmd_table)
def logs_command():
logs_cmd_table = Table(show_header=True, header_style=header_title)
logs_cmd_table.add_column("Command", style="dim")
logs_cmd_table.add_column("Description")
logs_cmd_table.add_row("view", "View logs")
logs_cmd_table.add_row("read", "Read log")
logs_cmd_table.add_row("delete", "Delete log")
logs_cmd_table.add_row("clear", "clear logs")
syntax = f"{green}logs:<command>{reset}"
xprint(f"{Help.usage_text.format(syntax, 'log(s) management')}")
xprint(logs_cmd_table)
def search_command():
search_cmd_table = Table(show_header=True, header_style=header_title)
search_cmd_table.add_column("Command", style="dim")
search_cmd_table.add_column("Description")
search_cmd_table.add_row("users", "Search user(s)")
search_cmd_table.add_row("repos", "Search repositor[y][ies]")
search_cmd_table.add_row("topics", "Search topic(s)")
search_cmd_table.add_row("issues", "Search issue(s)")
search_cmd_table.add_row("commits", "Search commit(s)")
def csv_command():
csv_cmd_table = Table(show_header=True, header_style=header_title)
csv_cmd_table.add_column("Command", style="dim")
csv_cmd_table.add_column("Description")
csv_cmd_table.add_row("view", "View csv files")
csv_cmd_table.add_row("read", "Read csv")
csv_cmd_table.add_row("delete", "Delete csv")
csv_cmd_table.add_row("clear", "clear csv files")
syntax = f"{green}search:<command>{reset}"
xprint(f"{usage_text.format(syntax, 'target discovery')}")
xprint(search_cmd_table)
syntax = f"{green}csv:<command>{reset}"
xprint(f"{Help.usage_text.format(syntax, 'csv management')}")
xprint(csv_cmd_table)
def help_command():
core_cmd_table = Table(show_header=True, header_style=header_title)
core_cmd_table.add_column("Command", style="dim", width=12)
core_cmd_table.add_column("Description")
core_cmd_table.add_row("help", "Help menu")
core_cmd_table.add_row("exit", "Close session")
core_cmd_table.add_row("clear", "Clear screen")
core_cmd_table.add_row("about", "Program's info")
core_cmd_table.add_row("author", "Developer's info")
def user_command():
user_cmd_table = Table(show_header=True, header_style=header_title)
user_cmd_table.add_column("Command", style="dim")
user_cmd_table.add_column("Description")
user_cmd_table.add_row("profile", "Get a target's profile info")
user_cmd_table.add_row("gists", "Return a users's gists")
user_cmd_table.add_row("org", "Return organizations that a target belongs to/owns")
user_cmd_table.add_row("repos", "Return a target's repositories")
user_cmd_table.add_row("events", "Return a target's events")
user_cmd_table.add_row("follows", "Check if user(A) follows user(B)")
user_cmd_table.add_row("followers", "Return a target's followers")
user_cmd_table.add_row("following", "Return a list of users the target is following")
user_cmd_table.add_row("subscriptions", "Return a target's subscriptions")
help_sub_cmd_table = Table(show_header=True, header_style=header_title)
help_sub_cmd_table.add_column("Command", style="dim", width=12)
help_sub_cmd_table.add_column("Description")
help_sub_cmd_table.add_row("csv", "List all csv management commands")
help_sub_cmd_table.add_row("logs", "List all logs management commands")
help_sub_cmd_table.add_row("org", "List all organization investigation commands")
help_sub_cmd_table.add_row("user", "List all users investigation commands")
help_sub_cmd_table.add_row("repo", "List all repository investigation commands")
help_sub_cmd_table.add_row("search", "List all target discovery commands")
help_sub_cmd_table.add_row("source", "List all source code download commands (for developers)")
syntax = f"{green}user:<command>{reset}"
xprint(f"{usage_text.format(syntax, 'user investigation(s)')}")
xprint(user_cmd_table)
syntax = f"{green}help:<command>{reset}"
xprint(core_cmd_table)
xprint(f"\n\n{Help.usage_text.format(syntax, 'octosuite')}")
xprint(help_sub_cmd_table)
def org_command():
org_cmd_table = Table(show_header=True, header_style=header_title)
org_cmd_table.add_column("Command", style="dim")
org_cmd_table.add_column("Description")
org_cmd_table.add_row("profile", "Get a target organization' profile info")
org_cmd_table.add_row("repos", "Return a target organization' repositories")
org_cmd_table.add_row("events", "Return a target organization' events")
org_cmd_table.add_row("member", "Check if a specified user is a public member of the target organization")
syntax = f"{green}org:<command>{reset}"
xprint(f"{usage_text.format(syntax, 'organization investigation(s)')}")
xprint(org_cmd_table)
def repo_command():
repo_cmd_table = Table(show_header=True, header_style=header_title)
repo_cmd_table.add_column("Command", style="dim")
repo_cmd_table.add_column("Description")
repo_cmd_table.add_row("profile", "Get a repository's info")
repo_cmd_table.add_row("issues", "Return a repository's issues")
repo_cmd_table.add_row("forks", "Return a repository's forks")
repo_cmd_table.add_row("releases", "Return a repository's releases")
repo_cmd_table.add_row("stargazers", "Return a repository's stargazers")
repo_cmd_table.add_row("contributors", "Return a repository's contributors")
repo_cmd_table.add_row("path_contents", "List contents in a path of a repository")
syntax = f"{green}repo:<command>{reset}"
xprint(f"{usage_text.format(syntax, 'repository investigation(s)')}")
xprint(repo_cmd_table)
def logs_command():
logs_cmd_table = Table(show_header=True, header_style=header_title)
logs_cmd_table.add_column("Command", style="dim")
logs_cmd_table.add_column("Description")
logs_cmd_table.add_row("view", "View logs")
logs_cmd_table.add_row("read", "Read log")
logs_cmd_table.add_row("delete", "Delete log")
logs_cmd_table.add_row("clear", "clear logs")
syntax = f"{green}logs:<command>{reset}"
xprint(f"{usage_text.format(syntax, 'log(s) management')}")
xprint(logs_cmd_table)
def csv_command():
csv_cmd_table = Table(show_header=True, header_style=header_title)
csv_cmd_table.add_column("Command", style="dim")
csv_cmd_table.add_column("Description")
csv_cmd_table.add_row("view", "View csv files")
csv_cmd_table.add_row("read", "Read csv")
csv_cmd_table.add_row("delete", "Delete csv")
csv_cmd_table.add_row("clear", "clear csv files")
syntax = f"{green}csv:<command>{reset}"
xprint(f"{usage_text.format(syntax, 'csv management')}")
xprint(csv_cmd_table)
def help_command():
core_cmd_table = Table(show_header=True, header_style=header_title)
core_cmd_table.add_column("Command", style="dim", width=12)
core_cmd_table.add_column("Description")
core_cmd_table.add_row("help", "Help menu")
core_cmd_table.add_row("exit", "Close session")
core_cmd_table.add_row("clear", "Clear screen")
core_cmd_table.add_row("about", "Program's info")
core_cmd_table.add_row("author", "Developer's info")
help_sub_cmd_table = Table(show_header=True, header_style=header_title)
help_sub_cmd_table.add_column("Command", style="dim", width=12)
help_sub_cmd_table.add_column("Description")
help_sub_cmd_table.add_row("csv", "List all csv management commands")
help_sub_cmd_table.add_row("logs", "List all logs management commands")
help_sub_cmd_table.add_row("org", "List all organization investigation commands")
help_sub_cmd_table.add_row("user", "List all users investigation commands")
help_sub_cmd_table.add_row("repo", "List all repository investigation commands")
help_sub_cmd_table.add_row("search", "List all target discovery commands")
help_sub_cmd_table.add_row("source", "List all source code download commands (for developers)")
syntax = f"{green}help:<command>{reset}"
xprint(core_cmd_table)
xprint(f"\n\n{usage_text.format(syntax, 'octosuite')}")
xprint(help_sub_cmd_table)

View File

@@ -2,24 +2,23 @@
# cases (they're being used by logging to be written to log files, and being printed out to the screen).
class LogRoller:
ctrl_c = "Session terminated with Ctrl+C."
error = "An error occurred: {}"
session_opened = "Opened new session on {}:{}"
session_closed = "Session closed at {}."
viewing_logs = "Viewing logs"
viewing_csv = "Viewing CSV file(s)..."
deleted_log = "Deleted log: {}"
reading_log = "Reading log: {}"
reading_csv = 'Reading csv: {}'
deleted_csv = 'Deleted csv: {}'
file_downloading = "Downloading: {}"
file_downloaded = "Downloaded: downloads/{}"
info_not_found = "Information not found: {}, {}, {}"
user_not_found = "User not found: @{}"
org_not_found = "Organization not found: @{}"
repo_or_user_not_found = "Repository or User not found: {}, @{}"
prompt_log_csv = "Do you wish to log this output to a CSV file? (yes/no) "
logged_to_csv = "Output logged: {}"
logging_skipped = "Logging skipped: {}"
limit_output = "Limit '{}' output to how many? (1-100) "
ctrl_c = "Session terminated with Ctrl+C."
error = "An error occurred: {}"
session_opened = "Opened new session on {}:{}"
session_closed = "Session closed at {}."
viewing_logs = "Viewing logs"
viewing_csv = "Viewing CSV file(s)..."
deleted_log = "Deleted log: {}"
reading_log = "Reading log: {}"
reading_csv = 'Reading csv: {}'
deleted_csv = 'Deleted csv: {}'
file_downloading = "Downloading: {}"
file_downloaded = "Downloaded: downloads/{}"
info_not_found = "Information not found: {}, {}, {}"
user_not_found = "User not found: @{}"
org_not_found = "Organization not found: @{}"
repo_or_user_not_found = "Repository or User not found: {}, @{}"
prompt_log_csv = "Do you wish to log this output to a CSV file? (yes/no) "
logged_to_csv = "Output logged: {}"
logging_skipped = "Logging skipped: {}"
limit_output = "Limit '{}' output to how many? (1-100) "

View File

@@ -1,5 +1,5 @@
# import everything from the octosuite.py file
from octosuite.octosuite import *
from octosuite.octosuite import * # I drifted away from the 'pythonic way' here
def octosuite():
@@ -8,9 +8,9 @@ def octosuite():
run.on_start()
except KeyboardInterrupt:
logging.warning(LogRoller.ctrl_c)
xprint(f"{MessagePrefix.warning} {LogRoller.ctrl_c}")
logging.warning(ctrl_c)
xprint(f"{WARNING} {ctrl_c}")
except Exception as e:
logging.error(LogRoller.error.format(e))
xprint(f"{MessagePrefix.error} {LogRoller.error.format(e)}")
logging.error(error.format(e))
xprint(f"{ERROR} {error.format(e)}")

View File

@@ -1,14 +1,13 @@
from octosuite.colors import red, white, green, reset
"""
MessagePrefix *Even here, I couldn't think of a good name.* The Attributes class holds the signs/symbols that show what
message prefixes that show what
a notification in OctoSuite might be all about. This might not be very important or necessary in some cases,
but I think it's better to know the severity of the notifications you get in a program.
but I think it's better to know the severity of the notifications you get in a program.
"""
class MessagePrefix:
prompt = f"{white}[{green}?{white}]{reset}"
warning = f"{white}[{red}!{white}]{reset}"
error = f"{white}[{red}x{white}]{reset}"
positive = f"{white}[{green}+{white}]{reset}"
negative = f"{white}[{red}-{white}]{reset}"
info = f"{white}[{green}*{white}]{reset}"
PROMPT = f"{white}[{green}PROMPT{white}]{reset}"
WARNING = f"{white}[{red}WARNING{white}]{reset}"
ERROR = f"{white}[{red}ERROR{white}]{reset}"
POSITIVE = f"{white}[{green}POSITIVE{white}]{reset}"
NEGATIVE = f"{white}[{red}NEGATIVE{white}]{reset}"
INFO = f"{white}[{green}INFO{white}]{reset}"

View File

@@ -12,12 +12,13 @@ from rich.tree import Tree
from rich.table import Table
from datetime import datetime
from rich import print as xprint
from octosuite.helper import Help
from octosuite.log_roller import LogRoller
from octosuite.csv_loggers import CsvLogger
from octosuite.message_prefixes import MessagePrefix
from octosuite.banners import version_tag, ascii_banner
from octosuite.message_prefixes import PROMPT, WARNING, ERROR, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol
from octosuite.colors import red, white, green, white_bold, green_bold, header_title, reset
# seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :)
from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, logs_command, csv_command, org_command, source, org, repo, user, search, logs, csv
from octosuite.log_roller import ctrl_c, error, session_opened, session_closed, viewing_logs, viewing_csv, deleted_log, reading_log, reading_csv, deleted_csv, file_downloading, file_downloaded, info_not_found, user_not_found, org_not_found, repo_or_user_not_found, prompt_log_csv, logged_to_csv, logging_skipped, limit_output
from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_profile, log_repo_path_contents, log_repo_contributors, log_repo_startgazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, log_org_events, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, log_user_following, log_user_followers, log_repos_search, log_users_search, log_topics_search, log_issues_search, log_commits_search
class Octosuite:
@@ -30,23 +31,23 @@ class Octosuite:
("clear", self.clear_screen),
("about", self.about),
("author", self.author),
("help", Help.help_command),
("help:source", Help.source_command),
("help:search", Help.search_command),
("help:user", Help.user_command),
("help:repo", Help.repo_command),
("help:logs", Help.logs_command),
("help:csv", Help.csv_command),
("help:org", Help.org_command),
("source", Help.source),
("help", help_command),
("help:source", source_command),
("help:search", search_command),
("help:user", user_command),
("help:repo", repo_command),
("help:logs", logs_command),
("help:csv", csv_command),
("help:org", org_command),
("source", source),
("source:tarball", self.download_tarball),
("source:zipball", self.download_zipball),
("org", Help.org),
("org", org),
("org:events", self.org_events),
("org:profile", self.org_profile),
("org:repos", self.org_repos),
("org:member", self.org_member),
("repo", Help.repo),
("repo", repo),
("repo:path_contents", self.path_contents),
("repo:profile", self.repo_profile),
("repo:contributors", self.repo_contributors),
@@ -54,7 +55,7 @@ class Octosuite:
("repo:forks", self.repo_forks),
("repo:issues", self.repo_issues),
("repo:releases", self.repo_releases),
("user", Help.user),
("user", user),
("user:repos", self.user_repos),
("user:gists", self.user_gists),
("user:orgs", self.user_orgs),
@@ -64,18 +65,18 @@ class Octosuite:
("user:follows", self.user_follows),
("user:following", self.user_following),
("user:subscriptions", self.user_subscriptions),
("search", Help.search),
("search", search),
("search:users", self.users_search),
("search:repos", self.repos_search),
("search:topics", self.topics_search),
("search:issues", self.issues_search),
("search:commits", self.commits_search),
("logs", Help.logs),
("logs", logs),
("logs:view", self.view_logs),
("logs:read", self.read_log),
("logs:delete", self.delete_log),
("logs:clear", self.clear_logs),
("csv", Help.csv),
("csv", csv),
("csv:view", self.view_csv),
("csv:read", self.read_csv),
("csv:delete", self.delete_csv),
@@ -313,7 +314,7 @@ class Octosuite:
logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG)
# Log the start of a session
logging.info(LogRoller.session_opened.format(platform.node(), getpass.getuser()))
logging.info(session_opened.format(platform.node(), getpass.getuser()))
# Check for updates
@@ -365,13 +366,13 @@ class Octosuite:
organization = input()
response = requests.get(f"{self.endpoint}/orgs/{organization}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
xprint(f"{NEGATIVE} {org_not_found.format(organization)}")
elif response.status_code == 200:
org_profile_tree = Tree("\n" + response.json()['name'])
for attr in self.org_attrs:
org_profile_tree.add(f"{self.org_attr_dict[attr]}: {response.json()[attr]}")
xprint(org_profile_tree)
CsvLogger.log_org_profile(response)
log_org_profile(response)
else:
xprint(response.json())
@@ -382,13 +383,13 @@ class Octosuite:
username = input()
response = requests.get(f"{self.endpoint}/users/{username}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
user_profile_tree = Tree("\n" + response.json()['name'])
for attr in self.profile_attrs:
user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}")
xprint(user_profile_tree)
CsvLogger.log_user_profile(response)
log_user_profile(response)
else:
xprint(response.json())
@@ -401,13 +402,13 @@ class Octosuite:
username = input()
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
elif response.status_code == 200:
repo_profile_tree = Tree("\n" + response.json()['full_name'])
for attr in self.repo_attrs:
repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}")
xprint(repo_profile_tree)
CsvLogger.log_repo_profile(response)
log_repo_profile(response)
else:
xprint(response.json())
@@ -422,15 +423,15 @@ class Octosuite:
path_name = input()
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.info_not_found.format(repo_name, username, path_name)}")
xprint(f"{NEGATIVE} {info_not_found.format(repo_name, username, path_name)}")
elif response.status_code == 200:
for content_count, content in enumerate(response.json(), start=1):
path_contents_tree = Tree("\n" + content['name'])
for attr in self.path_attrs:
path_contents_tree.add(f"{self.path_attr_dict[attr]}: {content[attr]}")
xprint(path_contents_tree)
CsvLogger.log_repo_path_contents(content, repo_name)
xprint(MessagePrefix.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.")
log_repo_path_contents(content, repo_name)
xprint(INFO, f"Found {content_count} file(s) in {repo_name}/{path_name}.")
else:
xprint(response.json())
@@ -441,18 +442,18 @@ class Octosuite:
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username) ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("contributors"), end="")
xprint(PROMPT, limit_output.format("contributors"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
elif response.status_code == 200:
for contributor in response.json():
contributor_tree = Tree("\n" + contributor['login'])
for attr in self.user_attrs:
contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}")
xprint(contributor_tree)
CsvLogger.log_repo_contributors(contributor, repo_name)
log_repo_contributors(contributor, repo_name)
else:
xprint(response.json())
@@ -463,20 +464,20 @@ class Octosuite:
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository stargazers"), end="")
xprint(PROMPT, limit_output.format("repository stargazers"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == {}:
xprint(f"{MessagePrefix.negative} Repository does not have any stargazers -> ({repo_name})")
xprint(f"{NEGATIVE} Repository does not have any stargazers -> ({repo_name})")
elif response.status_code == 200:
for stargazer in response.json():
stargazer_tree = Tree("\n" + stargazer['login'])
for attr in self.user_attrs:
stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}")
xprint(stargazer_tree)
CsvLogger.log_repo_stargazers(stargazer, repo_name)
log_repo_stargazers(stargazer, repo_name)
else:
xprint(response.json())
@@ -487,20 +488,20 @@ class Octosuite:
repo_name = input()
xprint(f"{white}@{green}Owner{white} (username):{reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository forks"), end="")
xprint(PROMPT, limit_output.format("repository forks"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == {}:
xprint(f"{MessagePrefix.negative} Repository does not have forks -> ({repo_name})")
xprint(f"{NEGATIVE} Repository does not have forks -> ({repo_name})")
elif response.status_code == 200:
for count, fork in enumerate(response.json()):
fork_tree = Tree("\n" + fork['full_name'])
for attr in self.repo_attrs:
fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}")
xprint(fork_tree)
CsvLogger.log_repo_forks(fork, count)
log_repo_forks(fork, count)
else:
xprint(response.json())
@@ -510,13 +511,13 @@ class Octosuite:
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository issues"), end="")
xprint(PROMPT, limit_output.format("repository issues"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == []:
xprint(f"{MessagePrefix.negative} Repository does not have open issues -> ({repo_name})")
xprint(f"{NEGATIVE} Repository does not have open issues -> ({repo_name})")
elif response.status_code == 200:
for issue in response.json():
issues_tree = Tree("\n" + issue['title'])
@@ -524,7 +525,7 @@ class Octosuite:
issues_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}")
xprint(issues_tree)
xprint(issue['body'])
CsvLogger.log_repo_issues(issue, repo_name)
log_repo_issues(issue, repo_name)
else:
xprint(response.json())
@@ -535,13 +536,13 @@ class Octosuite:
repo_name = input()
xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository releases"), end="")
xprint(PROMPT, limit_output.format("repository releases"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}")
xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}")
elif response.json() == []:
xprint(f"{MessagePrefix.negative} Repository does not have releases -> ({repo_name})")
xprint(f"{NEGATIVE} Repository does not have releases -> ({repo_name})")
elif response.status_code == 200:
for release in response.json():
releases_tree = Tree("\n" + release['name'])
@@ -549,7 +550,7 @@ class Octosuite:
releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}")
xprint(releases_tree)
xprint(release['body'])
CsvLogger.log_repo_releases(release, repo_name)
log_repo_releases(release, repo_name)
else:
xprint(response.json())
@@ -558,18 +559,18 @@ class Octosuite:
def org_repos(self):
xprint(f"{white}@{green}organization{white} (username):{reset} ", end="")
organization = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="")
xprint(PROMPT, limit_output.format("organization repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
xprint(f"{NEGATIVE} {org_not_found.format(organization)}")
elif response.status_code == 200:
for repository in response.json():
repos_tree = Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
CsvLogger.log_org_repos(repository, organization)
log_org_repos(repository, organization)
else:
xprint(response.json())
@@ -578,11 +579,11 @@ class Octosuite:
def org_events(self):
xprint(f"{white}@{green}organization{white} (username):{reset} ", end="")
organization = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="")
xprint(PROMPT, limit_output.format("organization repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}")
xprint(f"{NEGATIVE} {org_not_found.format(organization)}")
elif response.status_code == 200:
for event in response.json():
events_tree = Tree("\n" + event['id'])
@@ -590,7 +591,7 @@ class Octosuite:
events_tree.add(f"Created at: {event['created_at']}")
xprint(events_tree)
xprint(event['payload'])
CsvLogger.log_org_events(event, organization)
log_org_events(event, organization)
else:
xprint(response.json())
@@ -603,27 +604,27 @@ class Octosuite:
username = input()
response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}")
if response.status_code == 204:
xprint(f"{MessagePrefix.positive} User ({username}) is a public member of the organization -> ({organization})")
xprint(f"{POSITIVE} User ({username}) is a public member of the organization -> ({organization})")
else:
xprint(f"{MessagePrefix.negative} {response.json()['message']}")
xprint(f"{NEGATIVE} {response.json()['message']}")
# Fetching user repositories
def user_repos(self):
xprint(f"{white}@{green}username:{reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositories"), end="")
xprint(PROMPT, limit_output.format("repositories"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
for repository in response.json():
repos_tree = Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_tree)
CsvLogger.log_user_repos(repository, username)
log_user_repos(repository, username)
else:
xprint(response.json())
@@ -632,20 +633,20 @@ class Octosuite:
def user_gists(self):
xprint(f"{white}@{green}username:{reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format('gists'), end="")
xprint(PROMPT, limit_output.format('gists'), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}")
if response.json() == []:
xprint(f"{MessagePrefix.negative} User does not have gists.")
xprint(f"{NEGATIVE} User does not have gists.")
elif response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
for gist in response.json():
gists_tree = Tree("\n" + gist['id'])
for attr in self.gists_attrs:
gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}")
xprint(gists_tree)
CsvLogger.log_user_gists(gist)
log_user_gists(gist)
else:
xprint(response.json())
@@ -654,20 +655,20 @@ class Octosuite:
def user_orgs(self):
xprint(f"{white}@{green}username:{reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user organizations"), end="")
xprint(PROMPT, limit_output.format("user organizations"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}")
if response.json() == []:
xprint(f"{MessagePrefix.negative} User ({username}) does not (belong to/own) any organizations.")
xprint(f"{NEGATIVE} User ({username}) does not (belong to/own) any organizations.")
elif response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
for organization in response.json():
org_tree = Tree("\n" + organization['login'])
for attr in self.user_orgs_attrs:
org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}")
xprint(org_tree)
CsvLogger.log_user_orgs(organization, username)
log_user_orgs(organization, username)
else:
xprint(response.json())
@@ -676,11 +677,11 @@ class Octosuite:
def user_events(self):
xprint(f"{white}@{green}username:{reset} ", end="")
username = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("events"), end="")
xprint(PROMPT, limit_output.format("events"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}")
if response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
for event in response.json():
events_tree = Tree("\n" + event['id'])
@@ -690,7 +691,7 @@ class Octosuite:
events_tree.add(f"Created at: {event['created_at']}")
xprint(events_tree)
xprint(event['payload'])
CsvLogger.log_user_events(event)
log_user_events(event)
else:
xprint(response.json())
@@ -699,20 +700,20 @@ class Octosuite:
def user_subscriptions(self):
xprint(f"{white}@{green}username:{reset} ", end="")
username = input().lower()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user subscriptions"), end="")
xprint(PROMPT, limit_output.format("user subscriptions"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}")
if response.json() == []:
xprint(f"{MessagePrefix.negative} User does not have any subscriptions.")
xprint(f"{NEGATIVE} User does not have any subscriptions.")
elif response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
for repository in response.json():
subscriptions_tree =Tree("\n" + repository['full_name'])
for attr in self.repo_attrs:
subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(subscriptions_tree)
CsvLogger.log_user_subscriptions(repository, username)
log_user_subscriptions(repository, username)
else:
xprint(response.json())
@@ -721,20 +722,20 @@ class Octosuite:
def user_following(self):
xprint(f"{white}@{green}username:{reset} ", end="")
username = input().lower()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user' following"), end="")
xprint(PROMPT, limit_output.format("user' following"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}")
if response.json() == []:
xprint(f"{MessagePrefix.negative} User ({username})does not follow anyone.")
xprint(f"{NEGATIVE} User ({username})does not follow anyone.")
elif response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
for user in response.json():
following_tree = Tree("\n" + user['login'])
for attr in self.user_attrs:
following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(following_tree)
CsvLogger.log_user_following(user, username)
log_user_following(user, username)
else:
xprint(response.json())
@@ -743,20 +744,20 @@ class Octosuite:
def user_followers(self):
xprint(f"{white}@{green}username:{reset} ", end="")
username = input().lower()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user followers"), end="")
xprint(PROMPT, limit_output.format("user followers"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}")
if response.json() == []:
xprint(f"{MessagePrefix.negative} User ({username})does not have followers.")
xprint(f"{NEGATIVE} User ({username})does not have followers.")
elif response.status_code == 404:
xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}")
xprint(f"{NEGATIVE} {user_not_found.format(username)}")
elif response.status_code == 200:
for follower in response.json():
followers_tree = Tree("\n" + follower['login'])
for attr in self.user_attrs:
followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}")
xprint(followers_tree)
CsvLogger.log_user_followers(follower, username)
log_user_followers(follower, username)
else:
xprint(response.json())
@@ -769,16 +770,16 @@ class Octosuite:
user_b = input()
response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}")
if response.status_code == 204:
xprint(f"{MessagePrefix.positive} @{user_a} FOLLOWS @{user_b}")
xprint(f"{POSITIVE} @{user_a} FOLLOWS @{user_b}")
else:
xprint(f"{MessagePrefix.negative} @{user_a} DOES NOT FOLLOW @{user_b}")
xprint(f"{NEGATIVE} @{user_a} DOES NOT FOLLOW @{user_b}")
# User search
def users_search(self):
xprint(f"{white}@{green}query{white} (eg. john):{reset} ", end="")
query = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user search"), end="")
xprint(PROMPT, limit_output.format("user search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json()
for user in response['items']:
@@ -786,14 +787,14 @@ class Octosuite:
for attr in self.user_attrs:
users_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}")
xprint(users_search_tree)
CsvLogger.log_users_search(user, query)
log_users_search(user, query)
# Repository search
def repos_search(self):
xprint(f"{white}%{green}query{white} (eg. git):{reset} ", end="")
query = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositor[y][ies] search"), end="")
xprint(PROMPT, limit_output.format("repositor[y][ies] search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json()
for repository in response['items']:
@@ -801,14 +802,14 @@ class Octosuite:
for attr in self.repo_attrs:
repos_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}")
xprint(repos_search_tree)
CsvLogger.log_repos_search(repository, query)
log_repos_search(repository, query)
# Topics search
def topics_search(self):
xprint(f"{white}#{green}query{white} (eg. osint):{reset} ", end="")
query = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("topic(s) search"), end="")
xprint(PROMPT, limit_output.format("topic(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json()
for topic in response['items']:
@@ -816,14 +817,14 @@ class Octosuite:
for attr in self.topic_attrs:
topics_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}")
xprint(topics_search_tree)
CsvLogger.log_topics_search(topic, query)
log_topics_search(topic, query)
# Issue search
def issues_search(self):
xprint(f"{white}!{green}Query{white} (eg. error):{reset} ", end="")
query = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("issue(s) search"), end="")
xprint(PROMPT, limit_output.format("issue(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json()
for issue in response['items']:
@@ -832,14 +833,14 @@ class Octosuite:
issues_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}")
xprint(issues_search_tree)
xprint(issue['body'])
CsvLogger.log_issues_search(issue, query)
log_issues_search(issue, query)
# Commits search
def commits_search(self):
xprint(f"{white}:{green}Query{white} (eg. filename:index.php):{reset} ", end="")
query = input()
xprint(MessagePrefix.prompt, LogRoller.limit_output.format("commit(s) search"), end="")
xprint(PROMPT, limit_output.format("commit(s) search"), end="")
limit = int(input())
response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json()
for commit in response['items']:
@@ -852,12 +853,12 @@ class Octosuite:
commits_search_tree.add(f"URL: {commit['html_url']}")
xprint(commits_search_tree)
xprint(commit['commit']['message'])
CsvLogger.log_commits_search(commit, query)
log_commits_search(commit, query)
# View csv files
def view_csv(self):
logging.info(LogRoller.viewing_csv)
logging.info(viewing_csv)
csv_files = os.listdir("output")
csv_table = Table(show_header=True, header_style=header_title)
csv_table.add_column("CSV", style="dim")
@@ -872,7 +873,7 @@ class Octosuite:
xprint(f"{green}csv {white}(filename):{reset} ", end="")
csv_file = input()
with open(os.path.join("output", csv_file + ".csv"), "r") as file:
logging.info(LogRoller.reading_csv.format(csv_file))
logging.info(reading_csv.format(csv_file))
text = Text(file.read())
xprint(text)
@@ -882,24 +883,24 @@ class Octosuite:
xprint(f"{green}csv {white}(filename):{reset} ", end="")
csv_file = input()
os.remove(os.path.join("output", csv_file))
logging.info(LogRoller.deleted_csv.format(csv_file))
xprint(f"{MessagePrefix.positive} {LogRoller.deleted_csv.format(csv_file)}")
logging.info(deleted_csv.format(csv_file))
xprint(f"{POSITIVE} {deleted_csv.format(csv_file)}")
# Clear csv
def clear_csv(self):
xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="")
xprint(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "yes":
shutil.rmtree("output", ignore_errors=True)
xprint(f"{MessagePrefix.info} CSV files cleared successfully!")
xprint(f"{INFO} CSV files cleared successfully!")
else:
pass
# View octosuite log files
def view_logs(self):
logging.info(LogRoller.viewing_logs)
logging.info(viewing_logs)
logs = os.listdir(".logs")
logs_table = Table(show_header=True, header_style=header_title)
logs_table.add_column("Log", style="dim")
@@ -914,7 +915,7 @@ class Octosuite:
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
log_file = input()
with open(os.path.join(".logs", log_file + ".log"), "r") as log:
logging.info(LogRoller.reading_log.format(log_file))
logging.info(reading_log.format(log_file))
xprint("\n" + log.read())
@@ -923,18 +924,18 @@ class Octosuite:
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
log_file = input()
os.remove(os.path.join(".logs", log_file))
logging.info(LogRoller.deleted_log.format(log_file))
xprint(f"{MessagePrefix.positive} {LogRoller.deleted_log.format(log_file)}")
logging.info(deleted_log.format(log_file))
xprint(f"{POSITIVE} {deleted_log.format(log_file)}")
# Clear logs
def clear_logs(self):
xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="")
xprint(f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "yes":
shutil.rmtree(".logs", ignore_errors=True)
xprint(f"{MessagePrefix.info} Logs cleared successfully!")
xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}")
xprint(f"{INFO} Logs cleared successfully!")
xprint(f"{INFO} {session_closed.format(datetime.now())}")
exit()
else:
pass
@@ -942,28 +943,28 @@ class Octosuite:
# Downloading release tarball
def download_tarball(self):
logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar"))
xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar"))
logging.info(file_downloading.format(f"octosuite.v{version_tag}.tar"))
xprint(INFO, file_downloading.format(f"octosuite.v{version_tag}.tar"))
data = requests.get(f"{self.endpoint}/repos/bellingcat/octosuite/tarball/{version_tag}")
with open(os.path.join("downloads", f"octosuite.v{version_tag}.tar"), "wb") as file:
file.write(data.content)
file.close()
logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar"))
xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar"))
logging.info(file_downloaded.format(f"octosuite.v{version_tag}.tar"))
xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.tar"))
# Downloading release zipball
def download_zipball(self):
logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip"))
xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip"))
logging.info(file_downloading.format(f"octosuite.v{version_tag}.zip"))
xprint(INFO, file_downloading.format(f"octosuite.v{version_tag}.zip"))
data = requests.get(f"{self.endpoint}/repos/rly0nheart/octosuite/zipball/{version_tag}")
with open(os.path.join("downloads", f"octosuite.v{version_tag}.zip"), "wb") as file:
file.write(data.content)
file.close()
logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip"))
xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip"))
logging.info(file_downloaded.format(f"octosuite.v{version_tag}.zip"))
xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.zip"))
# Author info
@@ -997,11 +998,11 @@ GitHub REST API documentation: https://docs.github.com/rest
# Close session
def exit_session(self):
xprint(f"{MessagePrefix.prompt} This will close the current session, continue? (yes/no) ", end="")
xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="")
prompt = input().lower()
if prompt == "yes":
logging.info(LogRoller.session_closed.format(datetime.now()))
xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}")
logging.info(session_closed.format(datetime.now()))
xprint(f"{INFO} {session_closed.format(datetime.now())}")
exit()
else:
pass