From 5a1b5378ee25018eff60e475f8a4cdd2fa321f17 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Thu, 24 Nov 2022 22:09:11 +0200 Subject: [PATCH 01/14] Update setup.py --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index f3041a3..a8d9f8f 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r", encoding="utf-8") as file: setuptools.setup( name="octosuite", - version="2.3.3", + version="3.0.0", author="Richard Mwewa", author_email="rly0nheart@duck.com", packages=["octosuite"], @@ -14,7 +14,7 @@ setuptools.setup( long_description_content_type="text/markdown", url="https://github.com/rly0nheart/octosuite", license="GNU General Public License v3 (GPLv3)", - install_requires=["requests", "rich"], + install_requires=["requests", "rich", "psutil"], classifiers=[ 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Information Technology', @@ -25,7 +25,7 @@ setuptools.setup( ], entry_points={ "console_scripts": [ - "octosuite=octosuite.main:main", + "octosuite=octosuite.main:octosuite", ] }, ) From 8d337c621c090cc5dcfab3c64e91170aac207985 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 01:52:26 +0200 Subject: [PATCH 02/14] Update setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a8d9f8f..354b5fc 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setuptools.setup( description="Advanced Github OSINT Framework", long_description=long_description, long_description_content_type="text/markdown", - url="https://github.com/rly0nheart/octosuite", + url="https://github.com/bellingcat/octosuite", license="GNU General Public License v3 (GPLv3)", install_requires=["requests", "rich", "psutil"], classifiers=[ From d42b7af59fb7ebca591b9d3969f1f68ce4cbb2f2 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 01:53:51 +0200 Subject: [PATCH 03/14] Update colors.py --- octosuite/colors.py | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/octosuite/colors.py b/octosuite/colors.py index 5a29de1..e3bfd8e 100644 --- a/octosuite/colors.py +++ b/octosuite/colors.py @@ -1,29 +1,35 @@ +import psutil import platform +from richt.tree import Tree from datetime import datetime + # This file is responsible for enabling/disabling colors in OctoSuite # This file gets called first at start up before any other file gets called # colors.py is the reason why users get to choose whether to enable/disable colors -system_info = [("Processor",platform.processor), - ("Node", platform.node), - ("Release", platform.release), - ("Architecture", platform.architecture), - ("Version", platform.version)] -banner = f""" - OCTOSUITE © 2022 Richard Mwewa +# delete this file, the entire program breaks +system_info = [("RAM", f"{str(round(psutil.virtual_memory().total / (1024.0 ** 3)))}GB"), + ("Node", platform.node()), + ("Release", platform.release()), + ("Version", platform.version()), + ("Processor", platform.processor()), + ("Architecture", platform.architecture())] +first_banner = f""" + OCTOSUITE © 2023 Richard Mwewa {datetime.now().strftime('%A %d %B %Y, %H:%M:%S%p')} - - """ + +""" -print(banner) -print(f"\t{platform.system()}") -for key, value in system_info: - print(f"\t├─ {key}: {value()}") +print(first_banner) +system_tree = Tree(platform.system()) +for system_key, system_value in system_info: + system_tree.add(f"{system_key}: {system_value}") +xprint(system_tree) print("\n") while True: try: - color_chooser = input(f"[ ? ] Welcome, would you like to enable colors for this session? (Y/n) ").lower() - if color_chooser == "y": + color_chooser = input(f"[?] Welcome, would you like to enable colors for this session? (yes/no) ").lower() + if color_chooser == "yes": header_title = "bold white" red = "[red]" white = "[white]" @@ -33,11 +39,12 @@ while True: green_bold = "[green bold]" reset = "[/]" break - elif color_chooser == "n": + elif color_chooser == "no": header_title = red = white = green = red_bold = white_bold = green_bold = reset = "" break else: - print(f"\n[ ! ] Your response '{color_chooser}' is invalid (expected y or n)") + print(f"\n[!] Your response '{color_chooser}' is invalid (expected yes or no)") except KeyboardInterrupt: - exit(f"[ ! ] Process interrupted with [Ctrl+C].") + exit(f"[!] Process interrupted with Ctrl+C.") + From 3fa3e7a18290ef1af58b8446f1d31fcb07d99e8a Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 01:54:25 +0200 Subject: [PATCH 04/14] Update banners.py --- octosuite/banners.py | 66 +++++++++++++------------------------------- 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/octosuite/banners.py b/octosuite/banners.py index fab9cc0..1041cf6 100644 --- a/octosuite/banners.py +++ b/octosuite/banners.py @@ -3,69 +3,41 @@ import getpass from octosuite.colors import red, white, green, reset -""" -banner.py -This file holds the program's banners and version tag -""" -version_tag = "2.3.3" +# banners.py +# This file holds the program's banners and version tag +version_tag = "3.0.0" def ascii_banner(): + banner_tree = Tree(getpass.getuser()) + banner_tree.add(f"use ‘{green}help{reset}’ command for usage") + banner_tree.add(f"commands are case insensitive\n") ascii_banners = [ -""" + """ _______ __ _______ __ __ | |.----.| |_.-----.| __|.--.--.|__| |_.-----. | - || __|| _| _ ||__ || | || | _| -__| |_______||____||____|_____||_______||_____||__|____|_____| - """, -""" - ▒█████ ▄████▄ ▄▄▄█████▓ ▒█████ ██████ █ ██ ██▓▄▄▄█████▓▓█████ -▒██▒ ██▒▒██▀ ▀█ ▓ ██▒ ▓▒▒██▒ ██▒▒██ ▒ ██ ▓██▒▓██▒▓ ██▒ ▓▒▓█ ▀ -▒██░ ██▒▒▓█ ▄ ▒ ▓██░ ▒░▒██░ ██▒░ ▓██▄ ▓██ ▒██░▒██▒▒ ▓██░ ▒░▒███ -▒██ ██░▒▓▓▄ ▄██▒░ ▓██▓ ░ ▒██ ██░ ▒ ██▒▓▓█ ░██░░██░░ ▓██▓ ░ ▒▓█ ▄ -░ ████▓▒░▒ ▓███▀ ░ ▒██▒ ░ ░ ████▓▒░▒██████▒▒▒▒█████▓ ░██░ ▒██▒ ░ ░▒████▒ -░ ▒░▒░▒░ ░ ░▒ ▒ ░ ▒ ░░ ░ ▒░▒░▒░ ▒ ▒▓▒ ▒ ░░▒▓▒ ▒ ▒ ░▓ ▒ ░░ ░░ ▒░ ░ - ░ ▒ ▒░ ░ ▒ ░ ░ ▒ ▒░ ░ ░▒ ░ ░░░▒░ ░ ░ ▒ ░ ░ ░ ░ ░ -░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ░ ░░░ ░ ░ ▒ ░ ░ ░ - ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ - ░ """, -""" - ▄▄· ▄▄▄▄▄ .▄▄ · ▄• ▄▌▪ ▄▄▄▄▄▄▄▄ . -▪ ▐█ ▌▪•██ ▪ ▐█ ▀. █▪██▌██ •██ ▀▄.▀· - ▄█▀▄ ██ ▄▄ ▐█.▪ ▄█▀▄ ▄▀▀▀█▄█▌▐█▌▐█· ▐█.▪▐▀▀▪▄ -▐█▌.▐▌▐███▌ ▐█▌·▐█▌.▐▌▐█▄▪▐█▐█▄█▌▐█▌ ▐█▌·▐█▄▄▌ - ▀█▄▀▪·▀▀▀ ▀▀▀ ▀█▄▀▪ ▀▀▀▀ ▀▀▀ ▀▀▀ ▀▀▀ ▀▀▀""", -""" + """, + """ ╔═╗┌─┐┌┬┐┌─┐╔═╗┬ ┬┬┌┬┐┌─┐ ║ ║│ │ │ │╚═╗│ ││ │ ├┤ -╚═╝└─┘ ┴ └─┘╚═╝└─┘┴ ┴ └─┘""", -""" +╚═╝└─┘ ┴ └─┘╚═╝└─┘┴ ┴ └─┘ + """, + """ ░▒█▀▀▀█░█▀▄░▀█▀░▄▀▀▄░▒█▀▀▀█░█░▒█░░▀░░▀█▀░█▀▀ ░▒█░░▒█░█░░░░█░░█░░█░░▀▀▀▄▄░█░▒█░░█▀░░█░░█▀▀ -░▒█▄▄▄█░▀▀▀░░▀░░░▀▀░░▒█▄▄▄█░░▀▀▀░▀▀▀░░▀░░▀▀▀""", -""" - ______ __ ______ __ __ - / \ | \ / \ | \ | \ -| ▓▓▓▓▓▓\ _______ _| ▓▓_ ______ | ▓▓▓▓▓▓\__ __ \▓▓_| ▓▓_ ______ -| ▓▓ | ▓▓/ \ ▓▓ \ / \| ▓▓___\▓▓ \ | \ \ ▓▓ \ / \ -| ▓▓ | ▓▓ ▓▓▓▓▓▓▓ \▓▓▓▓▓▓ | ▓▓▓▓▓▓\\▓▓ \| ▓▓ | ▓▓ ▓▓\▓▓▓▓▓▓ | ▓▓▓▓▓▓\ - -| ▓▓ | ▓▓ ▓▓ | ▓▓ __| ▓▓ | ▓▓_\▓▓▓▓▓▓\ ▓▓ | ▓▓ ▓▓ | ▓▓ __| ▓▓ ▓▓ -| ▓▓__/ ▓▓ ▓▓_____ | ▓▓| \ ▓▓__/ ▓▓ \__| ▓▓ ▓▓__/ ▓▓ ▓▓ | ▓▓| \ ▓▓▓▓▓▓▓▓ - \▓▓ ▓▓\▓▓ \ \▓▓ ▓▓\▓▓ ▓▓\▓▓ ▓▓\▓▓ ▓▓ ▓▓ \▓▓ ▓▓\▓▓ \ - - \▓▓▓▓▓▓ \▓▓▓▓▓▓▓ \▓▓▓▓ \▓▓▓▓▓▓ \▓▓▓▓▓▓ \▓▓▓▓▓▓ \▓▓ \▓▓▓▓ \▓▓▓▓▓▓▓ - """, -""" +░▒█▄▄▄█░▀▀▀░░▀░░░▀▀░░▒█▄▄▄█░░▀▀▀░▀▀▀░░▀░░▀▀▀ + """, + """ ▄▀▄ ▄▀▀ ▀█▀ ▄▀▄ ▄▀▀ █ █ █ ▀█▀ ██▀ - ▀▄▀ ▀▄▄ █ ▀▄▀ ▄██ ▀▄█ █ █ █▄▄"""] + ▀▄▀ ▀▄▄ █ ▀▄▀ ▄██ ▀▄█ █ █ █▄▄ + """] ascii_banner = random.choice(ascii_banners) - return f"""{ascii_banner} v{version_tag} + return banner_tree, f"""{ascii_banner} v{version_tag} {white}— Advanced Github {red}OSINT{white} Framework -.:{getpass.getuser()}:. -├─ use ‘{green}help{reset}{white}’ command for usage -└╼ commands are case insensitive{reset} """ + From a57f5d47e2578a00893e67dd38caacdc10d3049e Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 01:55:29 +0200 Subject: [PATCH 05/14] Update csv_loggers.py --- octosuite/csv_loggers.py | 516 +++++++++++++++++++-------------------- 1 file changed, 257 insertions(+), 259 deletions(-) diff --git a/octosuite/csv_loggers.py b/octosuite/csv_loggers.py index 2f8b0a1..57be48e 100644 --- a/octosuite/csv_loggers.py +++ b/octosuite/csv_loggers.py @@ -1,448 +1,446 @@ +import os import csv import logging from rich import print as xprint -from octosuite.sign_vars import SignVar -from octosuite.log_roller import logRoller +from octosuite.log_roller import LogRoller from octosuite.colors import red, white, green, reset -""" -csvLogger -This class holds the methods for creating .csv files of each functionality in main -""" -class csvLogger: +# CsvLogger +# This class holds the methods for creating .csv files of each functionality in main +class CsvLogger: # .csv for organization' profile - def logOrgProfile(response): + def log_org_profile(response): org_profile_fields = ['Profile photo', 'Name', 'Username', 'ID', 'Node ID', 'Email', 'About', 'Location', 'Blog', 'Followers', 'Following', 'Twitter handle', 'Gists', 'Repositories', 'Account type', 'Is verified?', 'Has organization projects?', 'Has repository projects?', 'Created at', 'Updated at'] org_profile_row = [response.json()['avatar_url'], response.json()['name'], response.json()['login'], response.json()['id'], response.json()['node_id'], response.json()['email'], response.json()['description'], response.json()['location'], response.json()['blog'], response.json()['followers'], response.json()['following'], response.json()['twitter_username'], response.json()['public_gists'], response.json()['public_repos'], response.json()['type'], response.json()['is_verified'], response.json()['has_organization_projects'], response.json()['has_repository_projects'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{response.json()['name']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(org_profile_fields) - writecsv.writerow(org_profile_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_profile_fields) + write_csv.writerow(org_profile_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Creating a .csv file of a user' profile - def logUserProfile(response): + def log_user_profile(response): user_profile_fields = ['Profile photo', 'Name', 'Username', 'ID', 'Node ID', 'Bio', 'Blog', 'Location', 'Followers', 'Following', 'Twitter handle', 'Gists', 'Repositories', 'Organization', 'Is hireable?', 'Is site admin?', 'Joined at', 'Updated at'] user_profile_row = [response.json()['avatar_url'], response.json()['name'], response.json()['login'], response.json()['id'], response.json()['node_id'], response.json()['bio'], response.json()['blog'], response.json()['location'], response.json()['followers'], response.json()['following'], response.json()['twitter_username'], response.json()['public_gists'], response.json()['public_repos'], response.json()['company'], response.json()['hireable'], response.json()['site_admin'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{response.json()['login']}.csv", 'w',) as file: - writecsv = csv.writer(file) - writecsv.writerow(user_profile_fields) - writecsv.writerow(user_profile_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{response.json()['login']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_profile_fields) + write_csv.writerow(user_profile_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # create .csv for repository profile - def logRepoProfile(response): + def log_repo_profile(response): repo_profile_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] repo_profile_row = [response.json()['name'], response.json()['id'], response.json()['description'], response.json()['forks'], response.json()['stargazers_count'], response.json()['watchers'], response.json()['license'], response.json()['default_branch'], response.json()['visibility'], response.json()['language'], response.json()['open_issues'], response.json()['topics'], response.json()['homepage'], response.json()['clone_url'], response.json()['ssh_url'], response.json()['fork'], response.json()['allow_forking'], response.json()['private'], response.json()['archived'], response.json()['is_template'], response.json()['has_wiki'], response.json()['has_pages'], response.json()['has_projects'], response.json()['has_issues'], response.json()['has_downloads'], response.json()['pushed_at'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{response.json()['name']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(repo_profile_fields) - writecsv.writerow(repo_profile_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_profile_fields) + write_csv.writerow(repo_profile_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # create .csv for repository path contents - def logRepoPathContents(content, repo_name): + def log_repo_path_contents(content, repo_name): path_content_fields = ['Filename', 'Size (bytes)', 'Type', 'Path', 'SHA', 'URL'] path_content_row = [content['name'], content['size'], content['type'], content['path'], content['sha'], content['html_url']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{content['name']}_content_from_{repo_name}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(path_content_fields) - writecsv.writerow(path_content_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{content['name']}_content_from_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(path_content_fields) + write_csv.writerow(path_content_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # create .csv for repository stargazer - def logRepoStargazers(stargazer, repo_name): + def log_repo_stargazers(stargazer, repo_name): user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_follower_row = [stargazer['avatar_url'], stargazer['login'], stargazer['id'], stargazer['node_id'], stargazer['gravatar_id'], stargazer['type'], stargazer['site_admin'], stargazer['html_url']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{stargazer['login']}_stargazer_of_{repo_name}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_follower_fields) - writecsv.writerow(user_follower_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{stargazer['login']}_stargazer_of_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_follower_fields) + write_csv.writerow(user_follower_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # create .csv for repository forks - def logRepoForks(fork, count): + def log_repo_forks(fork, count): repo_fork_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] repo_fork_row = [fork['full_name'], fork['id'], fork['description'], fork['forks'], fork['stargazers_count'], fork['watchers'], fork['license'], fork['default_branch'], fork['visibility'], fork['language'], fork['open_issues'], fork['topics'], fork['homepage'], fork['clone_url'], fork['ssh_url'], fork['fork'], fork['allow_forking'], fork['private'], fork['archived'], fork['is_template'], fork['has_wiki'], fork['has_pages'], fork['has_projects'], fork['has_issues'], fork['has_downloads'], fork['pushed_at'], fork['created_at'], fork['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{fork['name']}_fork_{count}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(repo_fork_fields) - writecsv.writerow(repo_fork_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{fork['name']}_fork_{count}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_fork_fields) + write_csv.writerow(repo_fork_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # create .csv for repository issues - def logRepoIssues(issue, repo_name): + def log_repo_issues(issue, repo_name): repo_issue_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', 'Assignee', 'Assignees', 'Author association', 'Labels', 'Is locked?', 'Lock reason', 'Closed at', 'Created at', 'Updated at'] repo_issue_row = [issue['title'], issue['id'], issue['node_id'], issue['number'], issue['state'], issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], issue['assignees'], issue['author_association'], issue['labels'], issue['locked'], issue['active_lock_reason'], issue['closed_at'], issue['created_at'], issue['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{repo_name}_issue_{issue['id']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(repo_issue_fields) - writecsv.writerow(repo_issue_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repo_name}_issue_{issue['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_issue_fields) + write_csv.writerow(repo_issue_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # create .csv for repository releases - def logRepoReleases(release, repo_name): + def log_repo_releases(release, repo_name): repo_release_fields = ['Name', 'ID', 'Node ID', 'Tag', 'Branch', 'Assets', 'Is draft?', 'Is prerelease?', 'Created at', 'Published at'] repo_release_row = [release['name'], release['id'], release['node_id'], release['tag_name'], release['target_commitish'], release['assets'], release['draft'], release['prerelease'], release['created_at'], release['published_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{repo_name}_release_{release['name']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(repo_release_fields) - writecsv.writerow(repo_release_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repo_name}_release_{release['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_release_fields) + write_csv.writerow(repo_release_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Create .csv file for repository contributors - def logRepoContributors(contributor, repo_name): + def log_repo_contributors(contributor, repo_name): repo_contributor_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] repo_contributor_row = [contributor['avatar_url'], contributor['login'], contributor['id'], contributor['node_id'], contributor['gravatar_id'], contributor['type'], contributor['site_admin'], contributor['html_url']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{contributor['login']}_contributor_of_{repo_name}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(repo_contributor_fields) - writecsv.writerow(repo_contributor_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{contributor['login']}_contributor_of_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_contributor_fields) + write_csv.writerow(repo_contributor_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - print(f"{SignVar.info} {logRoller.loggingSkipped}\n") + logging.info(LogRoller.logging_skipped.format(prompt)) + print(f"{MessagePrefix.info} {LogRoller.logging_skipped}\n") # Create .csv for organization' events - def logOrgEvents(event, organization): + def log_repo_events(event, organization): org_event_fields = ['ID', 'Type', 'Created at', 'Payload'] org_event_row = [event['id'], event['type'], event['created_at'], event['payload']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{organization}_event_{event['id']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(org_event_fields) - writecsv.writerow(org_event_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{organization}_event_{event['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_event_fields) + write_csv.writerow(org_event_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Create .csv for organization' repositories - def logOrgRepos(repository, organization): + def log_org_repos(repository, organization): org_repo_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] org_repo_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{repository['name']}_repository_of_{organization}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(org_repo_fields) - writecsv.writerow(org_repo_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repository['name']}_repository_of_{organization}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_repo_fields) + write_csv.writerow(org_repo_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # .csv for user' repositories - def logUserRepos(repository, username): + def log_user_repos(repository, username): user_repo_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] user_repo_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{repository['name']}_{username}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_repo_fields) - writecsv.writerow(user_repo_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repository['name']}_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_repo_fields) + write_csv.writerow(user_repo_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # .csv for user events - def logUserEvents(event): + def log_user_events(event): user_event_fields = ['Actor', 'Type', 'Repository', 'Created at', 'Payload'] user_event_row = [event['actor']['login'], event['type'], event['repo']['name'], event['created_at'], event['payload']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{event['actor']['login']}_event_{event['id']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_event_fields) - writecsv.writerow(user_event_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{event['actor']['login']}_event_{event['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_event_fields) + write_csv.writerow(user_event_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # .csv for user gists - def logUserGists(gist): + def log_user_gists(gist): user_gist_fields = ['ID', 'Node ID', 'About', 'Comments', 'Files', 'Git Push URL', 'Is public?', 'Is truncated?', 'Updated at'] user_gist_row = [gist['id'], gist['node_id'], gist['description'], gist['comments'], gist['files'], gist['git_push_url'], gist['public'], gist['truncated'], gist['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{gist['id']}_gists_{gist['owner']['login']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_gist_fields) - writecsv.writerow(user_gist_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{gist['id']}_gists_{gist['owner']['login']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_gist_fields) + write_csv.writerow(user_gist_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # .csv for user followers - def logUserFollowers(follower, username): + def log_user_followers(follower, username): user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_follower_row = [follower['avatar_url'], follower['login'], follower['id'], follower['node_id'], follower['gravatar_id'], follower['type'], follower['site_admin'], follower['html_url']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": with open(f"output/{follower['login']}_follower_of_{username}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_follower_fields) - writecsv.writerow(user_follower_row) + write_csv = csv.writer(file) + write_csv.writerow(user_follower_fields) + write_csv.writerow(user_follower_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # .csv for user following - def logUserFollowing(user, username): + def log_user_following(user, username): user_following_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_following_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], user['type'], user['site_admin'], user['html_url']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{user['login']}_followed_by_{username}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_following_fields) - writecsv.writerow(user_following_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{user['login']}_followed_by_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_following_fields) + write_csv.writerow(user_following_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # .csv for user' subscriptions - def logUserSubscriptions(repository, username): + def log_user_subscriptions(repository, username): user_subscription_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] user_subscription_row = [repository['name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{username}_subscriptions_{repository['name']}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_subscription_fields) - writecsv.writerow(user_subscription_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{username}_subscriptions_{repository['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_subscription_fields) + write_csv.writerow(user_subscription_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # .csv for user organizations - def logUserOrgs(organization, username): + def log_user_orgs(organization, username): user_org_fields = ['Profile photo', 'Name', 'ID', 'Node ID', 'URL', 'About'] user_org_row = [organization['avatar_url'], organization['login'], organization['id'], organization['node_id'], organization['url'], organization['description']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{organization['login']}_{username}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_org_fields) - writecsv.writerow(user_org_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{organization['login']}_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_org_fields) + write_csv.writerow(user_org_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Create .csv for user search - def logUserSearch(user, query): + def log_users_search(user, query): user_search_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_search_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], user['type'], user['site_admin'], user['html_url']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{user['login']}_user_search_result_for_{query}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(user_search_fields) - writecsv.writerow(user_search_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{user['login']}_user_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_search_fields) + write_csv.writerow(user_search_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Create .csv for repository search - def logRepoSearch(repository, query): + def log_repos_search(repository, query): repo_search_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] repo_search_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{repository['name']}_repository_search_result_for_{query}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(repo_search_fields) - writecsv.writerow(repo_search_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repository['name']}_repository_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_search_fields) + write_csv.writerow(repo_search_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Create .csv for topic search - def logTopicSearch(topic, query): + def log_topics_search(topic, query): topic_search_fields = ['Name', 'Score', 'Curated', 'Featured', 'Display name', 'Created by', 'Created at', 'Updated at'] topic_search_row = [topic['name'], topic['score'], topic['curated'], topic['featured'], topic['display_name'], topic['created_by'], topic['created_at'], topic['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{topic['name']}_topic_search_result_for_{query}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(topic_search_fields) - writecsv.writerow(topic_search_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{topic['name']}_topic_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(topic_search_fields) + write_csv.writerow(topic_search_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Create .csv for issues search - def logIssueSearch(issue, query): + def log_issues_search(issue, query): issue_search_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', 'Assignee', 'Assignees', 'Author association', 'Labels', 'Is locked?', 'Lock reason', 'Closed at', 'Created at', 'Updated at'] issue_search_row = [issue['title'], issue['id'], issue['node_id'], issue['number'], issue['state'], issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], issue['assignees'], issue['author_association'], issue['labels'], issue['locked'], issue['active_lock_reason'], issue['closed_at'], issue['created_at'], issue['updated_at']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{issue['id']}_issue_search_result_for_{query}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(issue_search_fields) - writecsv.writerow(issue_search_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{issue['id']}_issue_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(issue_search_fields) + write_csv.writerow(issue_search_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") # Create .csv for commits search - def logCommitsSearch(commit, query): + def log_commits_search(commit, query): commit_search_fields = ['SHA', 'Author', 'Username', 'Email', 'Committer', 'Repository', 'URL', 'Description'] commit_search_row = [commit['commit']['tree']['sha'], commit['commit']['author']['name'], commit['author']['login'], commit['commit']['author']['email'], commit['commit']['committer']['name'], commit['repository']['full_name'], commit['html_url'], commit['commit']['message']] - xprint(f"\n{SignVar.prompt} {logRoller.askLogCsv}", end="");prompt = input().lower() - if prompt == 'y': - with open(f"output/{commit['commit']['tree']['sha']}_commit_search_result_for_{query}.csv", 'w') as file: - writecsv = csv.writer(file) - writecsv.writerow(commit_search_fields) - writecsv.writerow(commit_search_row) + xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{commit['commit']['tree']['sha']}_commit_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(commit_search_fields) + write_csv.writerow(commit_search_row) - logging.info(logRoller.loggedToCsv.format(file.name)) - xprint(f"{SignVar.positive} {logRoller.loggedToCsv.format(file.name)}") + logging.info(LogRoller.logged_to_csv.format(file.name)) + xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") else: - logging.info(logRoller.loggingSkipped.format(prompt)) - xprint(f"{SignVar.info} {logRoller.loggingSkipped.format(prompt)}") + logging.info(LogRoller.logging_skipped.format(prompt)) + xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") From 6e83bcf367819686372c6fb3d743cc7e0c8c1107 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 01:58:13 +0200 Subject: [PATCH 06/14] Update log_roller.py --- octosuite/log_roller.py | 49 +++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/octosuite/log_roller.py b/octosuite/log_roller.py index 3ee5289..c0ba3cc 100644 --- a/octosuite/log_roller.py +++ b/octosuite/log_roller.py @@ -1,28 +1,25 @@ -""" -logRoller This class is where the main notification strings/messages are held, and are being used in two different -cases (they're being used by logging to be written to log files, and being printed out to the screen). -""" +# LogRoller This class is where the main notification strings/messages are held, and are being used in two different +# cases (they're being used by logging to be written to log files, and being printed out to the screen). -class logRoller: - Ctrl = "Session terminated with {}." - Error = "An error occurred: {}" - sessionOpened = "Opened new session on {}:{}" - sessionClosed = "Session closed at {}." - viewingLogs = "Viewing logs..." - viewingCsv = "Viewing CSV file(s)..." - deletedLog = "Deleted log -> {}" - readingLog = "Reading log -> {}" - readingCsv = 'Reading csv -> {}' - deletedCsv = 'Deleted csv -> {}' - fileDownloading = "Downloading -> {}..." - fileDownloaded = "Downloaded -> downloads/{}" - infoNotFound = "Information not found -> ({} - {} - {})" - repoNotFound = "Repository not found -> ({})" - userNotFound = "User not found -> (@{})" - orgNotFound = "Organization not found -> (@{})" - repoOrUserNotFound = "Repository or user not found -> ({} - @{})" - askLogCsv = "Do you wish to log this output to a .csv file? (Y/n) " - loggedToCsv = "Output logged -> ({})" - loggingSkipped = "Logging skipped -> ({})" - limitInput = "Limit '{}' output to how many? (1-100) " +class LogRoller: + ctrl_c = "Session terminated with Ctrl+C." + error = "An error occurred: {}" + session_opened = "Opened new session on {}:{}" + session_closed = "Session closed at {}." + viewing_logs = "Viewing logs" + viewing_csv = "Viewing CSV file(s)..." + deleted_log = "Deleted log: {}" + reading_log = "Reading log: {}" + reading_csv = 'Reading csv: {}' + deleted_csv = 'Deleted csv: {}' + file_downloading = "Downloading: {}" + file_downloaded = "Downloaded: downloads/{}" + info_not_found = "Information not found: {}, {}, {}" + user_not_found = "User not found: @{}" + org_not_found = "Organization not found: @{}" + repo_or_user_not_found = "Repository or User not found: {}, @{}" + prompt_log_csv = "Do you wish to log this output to a CSV file? (yes/no) " + logged_to_csv = "Output logged: {}" + logging_skipped = "Logging skipped: {}" + limit_output = "Limit '{}' output to how many? (1-100) " From 94c1e0a737a099a6de7ee55c07b74ac946e1e3c7 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 01:59:04 +0200 Subject: [PATCH 07/14] Update and rename sign_vars.py to message_prefixes.py --- octosuite/{sign_vars.py => message_prefixes.py} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename octosuite/{sign_vars.py => message_prefixes.py} (79%) diff --git a/octosuite/sign_vars.py b/octosuite/message_prefixes.py similarity index 79% rename from octosuite/sign_vars.py rename to octosuite/message_prefixes.py index dceb359..f296bb8 100644 --- a/octosuite/sign_vars.py +++ b/octosuite/message_prefixes.py @@ -1,11 +1,11 @@ from octosuite.colors import red, white, green, reset """ -SignVar *Even here, I couldn't think of a good name.* The Attributes class holds the signs/symbols that show what +MessagePrefix *Even here, I couldn't think of a good name.* The Attributes class holds the signs/symbols that show what a notification in OctoSuite might be all about. This might not be very important or necessary in some cases, but I think it's better to know the severity of the notifications you get in a program. """ -class SignVar: +class MessagePrefix: prompt = f"{white}[{green} ? {white}]{reset}" warning = f"{white}[{red} ! {white}]{reset}" error = f"{white}[{red} x {white}]{reset}" From 597ad07490163d802b3e892b9beaca172120fe30 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 02:04:54 +0200 Subject: [PATCH 08/14] Update octosuite.py --- octosuite/octosuite.py | 430 ++++++++++++++++++++--------------------- 1 file changed, 211 insertions(+), 219 deletions(-) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index defbbd9..a15afda 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -7,15 +7,14 @@ import logging import getpass import requests import platform -from rich.text import Text from rich.tree import Tree from rich.table import Table from datetime import datetime from rich import print as xprint from octosuite.helper import Help -from octosuite.sign_vars import SignVar -from octosuite.log_roller import logRoller -from octosuite.csv_loggers import csvLogger +from octosuite.log_roller import LogRoller +from octosuite.csv_loggers import CsvLogger +from octosuite.message_prefixes import MessagePrefix from octosuite.banners import version_tag, ascii_banner from octosuite.colors import red, white, green, white_bold, green_bold, header_title, reset @@ -26,60 +25,60 @@ class Octosuite: self.endpoint = 'https://api.github.com' # A list of tuples mapping commands to their methods - self.command_map = [("exit", self.exitSession), - ("clear", self.clearScreen), + self.command_map = [("exit", self.exit_session), + ("clear", self.clear_screen), ("about", self.about), ("author", self.author), - ("help", Help.helpCommand), - ("help:source", Help.sourceCommand), - ("help:search", Help.searchCommand), - ("help:user", Help.userCommand), - ("help:repo", Help.repoCommand), - ("help:logs", Help.logsCommand), - ("help:csv", Help.csvCommand), - ("help:org", Help.orgCommand), - ("source", Help.Source), - ("source:tarball", self.downloadTarball), - ("source:zipball", self.downloadZipball), - ("org", Help.Org), - ("org:events", self.orgEvents), - ("org:profile", self.orgProfile), - ("org:repos", self.orgRepos), - ("org:member", self.orgMember), - ("repo", Help.Repo), - ("repo:path_contents", self.pathContents), - ("repo:profile", self.repoProfile), - ("repo:contributors", self.repoContributors), - ("repo:stargazers", self.repoStargazers), - ("repo:forks", self.repoForks), - ("repo:issues", self.repoIssues), - ("repo:releases", self.repoReleases), - ("user", Help.User), - ("user:repos", self.userRepos), - ("user:gists", self.userGists), - ("user:orgs", self.userOrgs), - ("user:profile", self.userProfile), - ("user:events", self.userEvents), - ("user:followers", self.userFollowers), - ("user:follows", self.userFollows), - ("user:following", self.userFollowing), - ("user:subscriptions", self.userSubscriptions), - ("search", Help.Search), - ("search:users", self.userSearch), - ("search:repos", self.repoSearch), - ("search:topics", self.topicSearch), - ("search:issues", self.issueSearch), - ("search:commits", self.commitsSearch), - ("logs", Help.Logs), - ("logs:view", self.viewLogs), - ("logs:read", self.readLog), - ("logs:delete", self.deleteLog), - ("logs:clear", self.clearLogs), - ("csv", Help.Csv), - ("csv:view", self.viewCsv), - ("csv:read", self.readCsv), - ("csv:delete", self.deleteCsv), - ("csv:clear", self.clearCsv)] + ("help", Help.help_command), + ("help:source", Help.source_command), + ("help:search", Help.search_command), + ("help:user", Help.user_command), + ("help:repo", Help.repo_command), + ("help:logs", Help.logs_command), + ("help:csv", Help.csv_command), + ("help:org", Help.org_command), + ("source", Help.cource), + ("source:tarball", self.download_tarball), + ("source:zipball", self.download_zipball), + ("org", Help.org), + ("org:events", self.org_events), + ("org:profile", self.org_profile), + ("org:repos", self.org_repos), + ("org:member", self.org_member), + ("repo", Help.repo), + ("repo:path_contents", self.path_contents), + ("repo:profile", self.repo_profile), + ("repo:contributors", self.repo_contributors), + ("repo:stargazers", self.repo_stargazers), + ("repo:forks", self.repo_forks), + ("repo:issues", self.repo_issues), + ("repo:releases", self.repo_releases), + ("user", Help.user), + ("user:repos", self.user_repos), + ("user:gists", self.user_gists), + ("user:orgs", self.user_orgs), + ("user:profile", self.user_profile), + ("user:events", self.user_events), + ("user:followers", self.user_followers), + ("user:follows", self.user_follows), + ("user:following", self.user_following), + ("user:subscriptions", self.user_subscriptions), + ("search", Help.search), + ("search:users", self.user_search), + ("search:repos", self.repo_search), + ("search:topics", self.topic_search), + ("search:issues", self.issue_search), + ("search:commits", self.commits_search), + ("logs", Help.logs), + ("logs:view", self.view_logs), + ("logs:read", self.read_log), + ("logs:delete", self.delete_log), + ("logs:clear", self.clear_logs), + ("csv", Help.csv), + ("csv:view", self.view_csv), + ("csv:read", self.read_csv), + ("csv:delete", self.delete_csv), + ("csv:clear", self.clear_csv)] # Path attribute self.path_attrs = ['size', 'type', 'path', 'sha', 'html_url'] @@ -286,12 +285,12 @@ class Octosuite: """ - pathFinder() + path_finder() This method is responsible for creating/checking the availability of the (.logs, output, downloads) folders, enabling logging to automatically log network/user activity to a file, and logging the start of a session. """ - def pathFinder(self): + def path_finder(self): """ Check 3 directories (.logs, output, downloads) on startup If they exist, ignore, otherwise, create them @@ -313,12 +312,10 @@ class Octosuite: logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG) # Log the start of a session - logging.info(logRoller.sessionOpened.format(platform.node(), getpass.getuser())) + logging.info(LogRoller.session_opened.format(platform.node(), getpass.getuser())) - """ - Check for updates - """ + # Check for updates def check_updates(self): response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json() if response['tag_name'] == version_tag: @@ -327,16 +324,16 @@ class Octosuite: """ pass else: - xprint(f"{SignVar.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") + xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") """ - onStart() + on_start() This is the main method, responsible for mapping commands, calling other methods, and catching exceptions """ - def onStart(self): - self.pathFinder() - self.clearScreen() + def on_start(self): + self.path_finder() + self.clear_screen() self.configure_logging() self.check_updates() xprint(ascii_banner()) @@ -362,60 +359,60 @@ class Octosuite: # Fetching organization info - def orgProfile(self): + def org_profile(self): xprint(f"{white}>> @{green}Organization {white}(username){reset} ", end="") organization = input() response = requests.get(f"{self.endpoint}/orgs/{organization}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}") + xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") elif response.status_code == 200: org_profile_tree = Tree("\n" + response.json()['name']) for attr in self.org_attrs: org_profile_tree.add(f"{self.org_attr_dict[attr]}: {response.json()[attr]}") xprint(org_profile_tree) - csvLogger.logOrgProfile(response) + CsvLogger.log_org_profile(response) else: xprint(response.json()) # Fetching user information - def userProfile(self): + def user_profile(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() response = requests.get(f"{self.endpoint}/users/{username}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: user_profile_tree = Tree("\n" + response.json()['name']) for attr in self.profile_attrs: user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}") xprint(user_profile_tree) - csvLogger.logUserProfile(response) + CsvLogger.log_user_profile(response) else: xprint(response.json()) # Fetching repository information - def repoProfile(self): + def repo_profile(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username) ", end="") username = input() response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.status_code == 200: repo_profile_tree = Tree("\n" + response.json()['full_name']) for attr in self.repo_attrs: repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}") xprint(repo_profile_tree) - csvLogger.logRepoProfile(response) + CsvLogger.log_repo_profile(response) else: xprint(response.json()) # Get path contents - def pathContents(self): + def path_contents(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username) ", end="") @@ -424,101 +421,101 @@ class Octosuite: path_name = input() response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.infoNotFound.format(repo_name, username, path_name)}") + xprint(f"{MessagePrefix.negative} {LogRoller.info_not_found.format(repo_name, username, path_name)}") elif response.status_code == 200: for content_count, content in enumerate(response.json(), start=1): path_contents_tree = Tree("\n" + content['name']) for attr in self.path_attrs: path_contents_tree.add(f"{self.path_attr_dict[attr]}: {content[attr]}") xprint(path_contents_tree) - csvLogger.logRepoPathContents(content, repo_name) - xprint(SignVar.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.") + CsvLogger.log_repo_path_contents(content, repo_name) + xprint(MessagePrefix.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.") else: xprint(response.json()) # repo contributors - def repoContributors(self): + def repo_contributors(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username) ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("contributors"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("contributors"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.status_code == 200: for contributor in response.json(): contributor_tree = Tree("\n" + contributor['login']) for attr in self.user_attrs: contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}") xprint(contributor_tree) - csvLogger.logRepoContributors(contributor, repo_name) + CsvLogger.log_repo_contributors(contributor, repo_name) else: xprint(response.json()) # repo stargazers - def repoStargazers(self): + def repo_stargazers(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository stargazers"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository stargazers"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == {}: - xprint(f"{SignVar.negative} Repository does not have any stargazers -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have any stargazers -> ({repo_name})") elif response.status_code == 200: for stargazer in response.json(): stargazer_tree = Tree("\n" + stargazer['login']) for attr in self.user_attrs: stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}") xprint(stargazer_tree) - csvLogger.logRepoStargazers(stargazer, repo_name) + CsvLogger.log_repo_stargazers(stargazer, repo_name) else: xprint(response.json()) # repo forks - def repoForks(self): + def repo_forks(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository forks"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository forks"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == {}: - xprint(f"{SignVar.negative} Repository does not have forks -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have forks -> ({repo_name})") elif response.status_code == 200: for count, fork in enumerate(response.json()): fork_tree = Tree("\n" + fork['full_name']) for attr in self.repo_attrs: fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}") xprint(fork_tree) - csvLogger.logRepoForks(fork, count) + CsvLogger.log_repo_forks(fork, count) else: xprint(response.json()) # Repo issues - def repoIssues(self): + def repo_issues(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository issues"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository issues"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == []: - xprint(f"{SignVar.negative} Repository does not have open issues -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have open issues -> ({repo_name})") elif response.status_code == 200: for issue in response.json(): issues_tree = Tree("\n" + issue['title']) @@ -526,24 +523,24 @@ class Octosuite: issues_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issues_tree) xprint(issue['body']) - csvLogger.logRepoIssues(issue, repo_name) + CsvLogger.log_repo_issues(issue, repo_name) else: xprint(response.json()) # Repo releases - def repoReleases(self): + def repo_releases(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repository releases"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository releases"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.repoOrUserNotFound.format(repo_name, username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") elif response.json() == []: - xprint(f"{SignVar.negative} Repository does not have releases -> ({repo_name})") + xprint(f"{MessagePrefix.negative} Repository does not have releases -> ({repo_name})") elif response.status_code == 200: for release in response.json(): releases_tree = Tree("\n" + release['name']) @@ -551,40 +548,40 @@ class Octosuite: releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}") xprint(releases_tree) xprint(release['body']) - csvLogger.logRepoReleases(release, repo_name) + CsvLogger.log_repo_releases(release, repo_name) else: xprint(response.json()) # Fetching organization repositories - def orgRepos(self): + def org_repos(self): xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") organization = input() - xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}") + xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") elif response.status_code == 200: for repository in response.json(): repos_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - csvLogger.logOrgRepos(repository, organization) + CsvLogger.log_org_repos(repository, organization) else: xprint(response.json()) # organization events - def orgEvents(self): + def org_events(self): xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") organization = input() - xprint(SignVar.prompt, logRoller.limitInput.format("organization repositories"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.orgNotFound.format(organization)}") + xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") elif response.status_code == 200: for event in response.json(): events_tree = Tree("\n" + event['id']) @@ -592,97 +589,97 @@ class Octosuite: events_tree.add(f"Created at: {event['created_at']}") xprint(events_tree) xprint(event['payload']) - csvLogger.logOrgEvents(event, organization) + CsvLogger.log_org_events(event, organization) else: xprint(response.json()) # organization member - def orgMember(self): + def org_member(self): xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") organization = input() xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}") if response.status_code == 204: - xprint(f"{SignVar.positive} User ({username}) is a public member of the organization -> ({organization})") + xprint(f"{MessagePrefix.positive} User ({username}) is a public member of the organization -> ({organization})") else: - xprint(f"{SignVar.negative} {response.json()['message']}") + xprint(f"{MessagePrefix.negative} {response.json()['message']}") # Fetching user repositories - def userRepos(self): + def user_repos(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repositories"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for repository in response.json(): repos_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - csvLogger.logUserRepos(repository, username) + CsvLogger.log_user_repos(repository, username) else: xprint(response.json()) # Fetching user's gists - def userGists(self): + def user_gists(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format('gists'), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format('gists'), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User does not have gists.") + xprint(f"{MessagePrefix.negative} User does not have gists.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for gist in response.json(): gists_tree = Tree("\n" + gist['id']) for attr in self.gists_attrs: gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}") xprint(gists_tree) - csvLogger.logUserGists(gist) + CsvLogger.log_user_gists(gist) else: xprint(response.json()) # Fetching a list of organizations that a user owns or belongs to - def userOrgs(self): + def user_orgs(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("user organizations"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user organizations"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User ({username}) does not (belong to/own) any organizations.") + xprint(f"{MessagePrefix.negative} User ({username}) does not (belong to/own) any organizations.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for organization in response.json(): org_tree = Tree("\n" + organization['login']) for attr in self.user_orgs_attrs: org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}") xprint(org_tree) - csvLogger.logUserOrgs(organization, username) + CsvLogger.log_user_orgs(organization, username) else: xprint(response.json()) # Fetching a users events - def userEvents(self): + def user_events(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input() - xprint(SignVar.prompt, logRoller.limitInput.format("events"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("events"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}") if response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for event in response.json(): events_tree = Tree("\n" + event['id']) @@ -692,95 +689,95 @@ class Octosuite: events_tree.add(f"Created at: {event['created_at']}") xprint(events_tree) xprint(event['payload']) - csvLogger.logUserEvents(event) + CsvLogger.log_user_events(event) else: xprint(response.json()) # Fetching a target user's subscriptions - def userSubscriptions(self): + def user_subscriptions(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input().lower() - xprint(SignVar.prompt, logRoller.limitInput.format("user subscriptions"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user subscriptions"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User does not have any subscriptions.") + xprint(f"{MessagePrefix.negative} User does not have any subscriptions.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for repository in response.json(): subscriptions_tree =Tree("\n" + repository['full_name']) for attr in self.repo_attrs: subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(subscriptions_tree) - csvLogger.logUserSubscriptions(repository, username) + CsvLogger.log_user_subscriptions(repository, username) else: xprint(response.json()) # Fetching a list of users the target follows - def userFollowing(self): + def user_following(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input().lower() - xprint(SignVar.prompt, logRoller.limitInput.format("user' following"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user' following"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User ({username})does not follow anyone.") + xprint(f"{MessagePrefix.negative} User ({username})does not follow anyone.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for user in response.json(): following_tree = Tree("\n" + user['login']) for attr in self.user_attrs: following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(following_tree) - csvLogger.logUserFollowing(user, username) + CsvLogger.log_user_following(user, username) else: xprint(response.json()) # Fetching user's followers - def userFollowers(self): + def user_followers(self): xprint(f"{white}>> @{green}Username{reset} ", end="") username = input().lower() - xprint(SignVar.prompt, logRoller.limitInput.format("user followers"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user followers"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}") if response.json() == []: - xprint(f"{SignVar.negative} User ({username})does not have followers.") + xprint(f"{MessagePrefix.negative} User ({username})does not have followers.") elif response.status_code == 404: - xprint(f"{SignVar.negative} {logRoller.userNotFound.format(username)}") + xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") elif response.status_code == 200: for follower in response.json(): followers_tree = Tree("\n" + follower['login']) for attr in self.user_attrs: followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}") xprint(followers_tree) - csvLogger.logUserFollowers(follower, username) + CsvLogger.log_user_followers(follower, username) else: xprint(response.json()) # Checking whether user[A] follows user[B] - def userFollows(self): + def user_follows(self): xprint(f"{white}>> @{green}user{white}(A) (username){reset} ", end="") user_a = input() xprint(f"{white}>> @{green}user{white}(B) (username){reset} ", end="") user_b = input() response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}") if response.status_code == 204: - xprint(f"{SignVar.positive} @{user_a} FOLLOWS @{user_b}") + xprint(f"{MessagePrefix.positive} @{user_a} FOLLOWS @{user_b}") else: - xprint(f"{SignVar.negative} @{user_a} DOES NOT FOLLOW @{user_b}") + xprint(f"{MessagePrefix.negative} @{user_a} DOES NOT FOLLOW @{user_b}") # User search - def userSearch(self): + def users_search(self): xprint(f"{white}>> @{green}Query{white} (eg. john){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("user search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json() for user in response['items']: @@ -788,14 +785,14 @@ class Octosuite: for attr in self.user_attrs: user_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(user_search_tree) - csvLogger.logUserSearch(user, query) + CsvLogger.log_users_search(user, query) # Repository search - def repoSearch(self): + def repos_search(self): xprint(f"{white}>> %{green}Query{white} (eg. git){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("repositor[y][ies] search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositor[y][ies] search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json() for repository in response['items']: @@ -803,14 +800,14 @@ class Octosuite: for attr in self.repo_attrs: repo_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repo_search_tree) - csvLogger.logRepoSearch(repository, query) + CsvLogger.log_repos_search(repository, query) # Topics search - def topicSearch(self): + def topics_search(self): xprint(f"{white}>> #{green}Query{white} (eg. osint){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("topic(s) search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("topic(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json() for topic in response['items']: @@ -818,14 +815,14 @@ class Octosuite: for attr in self.topic_attrs: topic_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}") xprint(topic_search_tree) - csvLogger.logTopicSearch(topic, query) + CsvLogger.log_topics_search(topic, query) # Issue search - def issueSearch(self): + def issues_search(self): xprint(f"{white}>> !{green}Query{white} (eg. error){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("issue(s) search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("issue(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json() for issue in response['items']: @@ -834,14 +831,14 @@ class Octosuite: issue_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issue_search_tree) xprint(issue['body']) - csvLogger.logIssueSearch(issue, query) + CsvLogger.log_issues_search(issue, query) # Commits search - def commitsSearch(self): + def commits_search(self): xprint(f"{white}>> :{green}Query{white} (eg. filename:index.php){reset} ", end="") query = input() - xprint(SignVar.prompt, logRoller.limitInput.format("commit(s) search"), end="") + xprint(MessagePrefix.prompt, LogRoller.limit_output.format("commit(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json() for commit in response['items']: @@ -854,12 +851,12 @@ class Octosuite: commits_search_tree.add(f"URL: {commit['html_url']}") xprint(commits_search_tree) xprint(commit['commit']['message']) - csvLogger.logCommitsSearch(commit, query) + CsvLogger.log_commits_search(commit, query) # View csv files - def viewCsv(self): - logging.info(logRoller.viewingCsv) + def view_csv(self): + logging.info(LogRoller.viewing_csv) csv_files = os.listdir("output") csv_table = Table(show_header=True, header_style=header_title) csv_table.add_column("CSV", style="dim") @@ -870,39 +867,38 @@ class Octosuite: # Read a specified csv file - def readCsv(self): + def read_csv(self): xprint(f"{white}>> {green}.csv {reset}(filename) ", end="") csv_file = input() with open(os.path.join("output", csv_file + ".csv"), "r") as file: - logging.info(logRoller.readingCsv.format(csv_file)) + logging.info(LogRoller.reading_csv.format(csv_file)) text = Text(file.read()) xprint(text) - # xprint("\n" + file.read()) # Delete a specified csv file - def deleteCsv(self): + def delete_csv(self): xprint(f"{white}>> {green}.csv {reset}filename{reset} ", end="") csv_file = input() os.remove(os.path.join("output", csv_file)) - logging.info(logRoller.deletedCsv.format(csv_file)) - xprint(f"{SignVar.positive} {logRoller.deletedCsv.format(csv_file)}") + logging.info(LogRoller.deleted_csv.format(csv_file)) + xprint(f"{MessagePrefix.positive} {LogRoller.deleted_csv.format(csv_file)}") # Clear csv - def clearCsv(self): - xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (y/n) ", end="") + def clear_csv(self): + xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="") prompt = input().lower() - if prompt == "y": + if prompt == "yes": shutil.rmtree("output", ignore_errors=True) - xprint(f"{SignVar.info} CSV files cleared successfully!") + xprint(f"{MessagePrefix.info} CSV files cleared successfully!") else: pass # View octosuite log files - def viewLogs(self): - logging.info(logRoller.viewingLogs) + def view_logs(self): + logging.info(LogRoller.viewing_logs) logs = os.listdir(".logs") logs_table = Table(show_header=True, header_style=header_title) logs_table.add_column("Log", style="dim") @@ -913,60 +909,60 @@ class Octosuite: # Read a specified log file - def readLog(self): + def read_log(self): xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="") log_file = input() with open(os.path.join(".logs", log_file + ".log"), "r") as log: - logging.info(logRoller.readingLog.format(log_file)) + logging.info(LogRoller.reading_log.format(log_file)) xprint("\n" + log.read()) # Delete a specified log file - def deleteLog(self): + def delete_log(self): xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="") log_file = input() os.remove(os.path.join(".logs", log_file)) - logging.info(logRoller.deletedLog.format(log_file)) - xprint(f"{SignVar.positive} {logRoller.deletedLog.format(log_file)}") + logging.info(LogRoller.deleted_log.format(log_file)) + xprint(f"{MessagePrefix.positive} {LogRoller.deleted_log.format(log_file)}") # Clear logs - def clearLogs(self): - xprint(f"{SignVar.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (y/n) ", end="") + def clear_logs(self): + xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="") prompt = input().lower() - if prompt == "y": + if prompt == "yes": shutil.rmtree(".logs", ignore_errors=True) - xprint(f"{SignVar.info} Logs cleared successfully!") - xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}") + xprint(f"{MessagePrefix.info} Logs cleared successfully!") + xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}") exit() else: pass # Downloading release tarball - def downloadTarball(self): - logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar")) - xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.tar")) + def download_tarball(self): + logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar")) + xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar")) data = requests.get(f"{self.endpoint}/repos/bellingcat/octosuite/tarball/{version_tag}") with open(os.path.join("downloads", f"octosuite.v{version_tag}.tar"), "wb") as file: file.write(data.content) file.close() - logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar")) - xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.tar")) + logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar")) + xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar")) # Downloading release zipball - def downloadZipball(self): - logging.info(logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip")) - xprint(SignVar.info, logRoller.fileDownloading.format(f"octosuite.v{version_tag}.zip")) + def download_zipball(self): + logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip")) + xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip")) data = requests.get(f"{self.endpoint}/repos/rly0nheart/octosuite/zipball/{version_tag}") with open(os.path.join("downloads", f"octosuite.v{version_tag}.zip"), "wb") as file: file.write(data.content) file.close() - logging.info(logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip")) - xprint(SignVar.positive, logRoller.fileDownloaded.format(f"octosuite.v{version_tag}.zip")) + logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip")) + xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip")) # Author info @@ -979,45 +975,41 @@ class Octosuite: # About program def about(self): - about_text = Text(f""" - OCTOSUITE © 2022 Richard Mwewa + about_text = f""" + OCTOSUITE © 2023 Richard Mwewa -An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations. -With over 20+ features, Octosuite only runs on 2 external dependencies, and returns the gathered intelligence in a highly readable format. +An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations, with over 20+ features. Whats new in v{version_tag}? -[fixed] Minor fixes -[improved] Removed width from tables, so that they can auto adjust -[added] Added the 'log:clear' command, which will be used to clear all logs -[added] Added the 'csv:clear' command, which will be used to clear all csv files +[{green}fixed{reset}] Minor fixes +[{green}improved{reset}] Removed width from tables, so that they can auto adjust +[{green}added{reset}] Added the 'log:clear' command, which will be used to clear all logs +[{green}added{reset}] Added the 'csv:clear' command, which will be used to clear all csv files Read the wiki: https://github.com/bellingcat/octosuite/wiki GitHub REST API documentation: https://docs.github.com/rest -""") +""" xprint(about_text) # Close session - def exitSession(self): - xprint(f"{SignVar.prompt} This will close the current session, continue? (Y/n) ", end="") + def exit_session(self): + xprint(f"{MessagePrefix.prompt} This will close the current session, continue? (yes/no) ", end="") prompt = input().lower() - if prompt == 'y': - logging.info(logRoller.sessionClosed.format(datetime.now())) - xprint(f"{SignVar.info} {logRoller.sessionClosed.format(datetime.now())}") + if prompt == "yes": + logging.info(LogRoller.session_closed.format(datetime.now())) + xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}") exit() else: pass # Clear screen - def clearScreen(self): + def clear_screen(self): """ using 'cls' on Windows machines to clear the screen, otherwise, use 'clear' """ - if sys.platform.lower().startswith("win"): - os.system('cls') - else: - os.system('clear') + os.system('cls' if os.name == 'nt' else 'clear') From a108b4784e76eddda2bc3d8f059f7a5e398f5ec8 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 02:05:44 +0200 Subject: [PATCH 09/14] Update main.py --- octosuite/main.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/octosuite/main.py b/octosuite/main.py index 770c804..64a2af6 100644 --- a/octosuite/main.py +++ b/octosuite/main.py @@ -1,19 +1,19 @@ import logging from rich import print as xprint -from octosuite.sign_vars import SignVar -from octosuite.octosuite import Octosuite -from octosuite.log_roller import logRoller +from octosuite.octosuite import * +from octosuite.log_roller import LogRoller +from octosuite.message_prefixes import MessagePrefix -def main(): +def octosuite(): try: run = Octosuite() - run.onStart() + run.on_start() except KeyboardInterrupt: - logging.warning(logRoller.Ctrl.format("Ctrl+C")) - xprint(f"{SignVar.warning} {logRoller.Ctrl.format('Ctrl+C')}") + logging.warning(LogRoller.ctrl_c) + xprint(f"{MessagePrefix.warning} {LogRoller.ctrl_c}") except Exception as e: - logging.error(logRoller.Error.format(e)) - xprint(f"{SignVar.error} {logRoller.Error.format(e)}") + logging.error(LogRoller.error.format(e)) + xprint(f"{MessagePrefix.error} {LogRoller.error.format(e)}") From c2641b3134e7acdfffbfaec4cfa4bc5e6e24feb8 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 02:07:40 +0200 Subject: [PATCH 10/14] Update main.py --- octosuite/main.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/octosuite/main.py b/octosuite/main.py index 64a2af6..ac47dda 100644 --- a/octosuite/main.py +++ b/octosuite/main.py @@ -1,8 +1,4 @@ -import logging -from rich import print as xprint from octosuite.octosuite import * -from octosuite.log_roller import LogRoller -from octosuite.message_prefixes import MessagePrefix def octosuite(): From 497c855648e7fd17bb6dd136ead645bf781a571a Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 02:08:40 +0200 Subject: [PATCH 11/14] Update main.py --- octosuite/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/octosuite/main.py b/octosuite/main.py index ac47dda..915c60c 100644 --- a/octosuite/main.py +++ b/octosuite/main.py @@ -1,3 +1,4 @@ +# import everything from the octosuite.py file from octosuite.octosuite import * From 083d3e9f715b2032c406426d14de658b2f32cacd Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 02:16:44 +0200 Subject: [PATCH 12/14] Update octosuite.py --- octosuite/octosuite.py | 138 +++++++++++++++++++++-------------------- 1 file changed, 70 insertions(+), 68 deletions(-) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index a15afda..2aa0760 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -7,6 +7,7 @@ import logging import getpass import requests import platform +from rich.text import Text from rich.tree import Tree from rich.table import Table from datetime import datetime @@ -26,59 +27,59 @@ class Octosuite: # A list of tuples mapping commands to their methods self.command_map = [("exit", self.exit_session), - ("clear", self.clear_screen), - ("about", self.about), - ("author", self.author), - ("help", Help.help_command), - ("help:source", Help.source_command), - ("help:search", Help.search_command), - ("help:user", Help.user_command), - ("help:repo", Help.repo_command), - ("help:logs", Help.logs_command), - ("help:csv", Help.csv_command), - ("help:org", Help.org_command), - ("source", Help.cource), - ("source:tarball", self.download_tarball), - ("source:zipball", self.download_zipball), - ("org", Help.org), - ("org:events", self.org_events), - ("org:profile", self.org_profile), - ("org:repos", self.org_repos), - ("org:member", self.org_member), - ("repo", Help.repo), - ("repo:path_contents", self.path_contents), - ("repo:profile", self.repo_profile), - ("repo:contributors", self.repo_contributors), - ("repo:stargazers", self.repo_stargazers), - ("repo:forks", self.repo_forks), - ("repo:issues", self.repo_issues), - ("repo:releases", self.repo_releases), - ("user", Help.user), - ("user:repos", self.user_repos), - ("user:gists", self.user_gists), - ("user:orgs", self.user_orgs), - ("user:profile", self.user_profile), - ("user:events", self.user_events), - ("user:followers", self.user_followers), - ("user:follows", self.user_follows), - ("user:following", self.user_following), - ("user:subscriptions", self.user_subscriptions), - ("search", Help.search), - ("search:users", self.user_search), - ("search:repos", self.repo_search), - ("search:topics", self.topic_search), - ("search:issues", self.issue_search), - ("search:commits", self.commits_search), - ("logs", Help.logs), - ("logs:view", self.view_logs), - ("logs:read", self.read_log), - ("logs:delete", self.delete_log), - ("logs:clear", self.clear_logs), - ("csv", Help.csv), - ("csv:view", self.view_csv), - ("csv:read", self.read_csv), - ("csv:delete", self.delete_csv), - ("csv:clear", self.clear_csv)] + ("clear", self.clear_screen), + ("about", self.about), + ("author", self.author), + ("help", Help.help_command), + ("help:source", Help.source_command), + ("help:search", Help.search_command), + ("help:user", Help.user_command), + ("help:repo", Help.repo_command), + ("help:logs", Help.logs_command), + ("help:csv", Help.csv_command), + ("help:org", Help.org_command), + ("source", Help.cource), + ("source:tarball", self.download_tarball), + ("source:zipball", self.download_zipball), + ("org", Help.org), + ("org:events", self.org_events), + ("org:profile", self.org_profile), + ("org:repos", self.org_repos), + ("org:member", self.org_member), + ("repo", Help.repo), + ("repo:path_contents", self.path_contents), + ("repo:profile", self.repo_profile), + ("repo:contributors", self.repo_contributors), + ("repo:stargazers", self.repo_stargazers), + ("repo:forks", self.repo_forks), + ("repo:issues", self.repo_issues), + ("repo:releases", self.repo_releases), + ("user", Help.user), + ("user:repos", self.user_repos), + ("user:gists", self.user_gists), + ("user:orgs", self.user_orgs), + ("user:profile", self.user_profile), + ("user:events", self.user_events), + ("user:followers", self.user_followers), + ("user:follows", self.user_follows), + ("user:following", self.user_following), + ("user:subscriptions", self.user_subscriptions), + ("search", Help.search), + ("search:users", self.users_search), + ("search:repos", self.repos_search), + ("search:topics", self.topics_search), + ("search:issues", self.issues_search), + ("search:commits", self.commits_search), + ("logs", Help.logs), + ("logs:view", self.view_logs), + ("logs:read", self.read_log), + ("logs:delete", self.delete_log), + ("logs:clear", self.clear_logs), + ("csv", Help.csv), + ("csv:view", self.view_csv), + ("csv:read", self.read_csv), + ("csv:delete", self.delete_csv), + ("csv:clear", self.clear_csv)] # Path attribute self.path_attrs = ['size', 'type', 'path', 'sha', 'html_url'] @@ -324,7 +325,7 @@ class Octosuite: """ pass else: - xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") + xprint(f"{MessagePrefix.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") """ @@ -484,7 +485,7 @@ class Octosuite: def repo_forks(self): xprint(f"{white}>> %{green}Repository{reset} ", end="") repo_name = input() - xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") + xprint(f"{white}@{green}Owner{white} (username):{reset} ", end="") username = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository forks"), end="") limit = int(input()) @@ -781,10 +782,10 @@ class Octosuite: limit = int(input()) response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json() for user in response['items']: - user_search_tree = Tree("\n" + user['login']) + users_search_tree = Tree("\n" + user['login']) for attr in self.user_attrs: - user_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") - xprint(user_search_tree) + users_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") + xprint(users_search_tree) CsvLogger.log_users_search(user, query) @@ -796,10 +797,10 @@ class Octosuite: limit = int(input()) response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json() for repository in response['items']: - repo_search_tree = Tree("\n" + repository['full_name']) + repos_search_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: - repo_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") - xprint(repo_search_tree) + repos_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") + xprint(repos_search_tree) CsvLogger.log_repos_search(repository, query) @@ -811,10 +812,10 @@ class Octosuite: limit = int(input()) response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json() for topic in response['items']: - topic_search_tree = Tree("\n" + topic['name']) + topics_search_tree = Tree("\n" + topic['name']) for attr in self.topic_attrs: - topic_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}") - xprint(topic_search_tree) + topics_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}") + xprint(topics_search_tree) CsvLogger.log_topics_search(topic, query) @@ -826,10 +827,10 @@ class Octosuite: limit = int(input()) response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json() for issue in response['items']: - issue_search_tree = Tree("\n" + issue['title']) + issues_search_tree = Tree("\n" + issue['title']) for attr in self.repo_issues_attrs: - issue_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") - xprint(issue_search_tree) + issues_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") + xprint(issues_search_tree) xprint(issue['body']) CsvLogger.log_issues_search(issue, query) @@ -978,7 +979,8 @@ class Octosuite: about_text = f""" OCTOSUITE © 2023 Richard Mwewa -An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations, with over 20+ features. +An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations. +With over 20+ features, Octosuite only runs on 2 external dependencies, and returns the gathered intelligence in a highly readable format. Whats new in v{version_tag}? From bb6c5a09e9b936dd736ff2ddd8875ef06f8a92d0 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 02:26:24 +0200 Subject: [PATCH 13/14] Update octosuite.py --- octosuite/octosuite.py | 52 +++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index 2aa0760..a5d3fe6 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -325,7 +325,7 @@ class Octosuite: """ pass else: - xprint(f"{MessagePrefix.info} A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") + xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") """ @@ -337,7 +337,7 @@ class Octosuite: self.clear_screen() self.configure_logging() self.check_updates() - xprint(ascii_banner()) + xprint(ascii_banner()[1], ascii_banner()[0]) """ Main loop keeps octosuite running, this will break if Octosuite detects a KeyboardInterrupt (Ctrl+C) @@ -361,7 +361,7 @@ class Octosuite: # Fetching organization info def org_profile(self): - xprint(f"{white}>> @{green}Organization {white}(username){reset} ", end="") + xprint(f"{white}@{green}Organization {white}(username):{reset} ", end="") organization = input() response = requests.get(f"{self.endpoint}/orgs/{organization}") if response.status_code == 404: @@ -378,7 +378,7 @@ class Octosuite: # Fetching user information def user_profile(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input() response = requests.get(f"{self.endpoint}/users/{username}") if response.status_code == 404: @@ -556,7 +556,7 @@ class Octosuite: # Fetching organization repositories def org_repos(self): - xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") + xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") organization = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") limit = int(input()) @@ -576,7 +576,7 @@ class Octosuite: # organization events def org_events(self): - xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") + xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") organization = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") limit = int(input()) @@ -597,9 +597,9 @@ class Octosuite: # organization member def org_member(self): - xprint(f"{white}>> @{green}Organization{white} (username){reset} ", end="") + xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") organization = input() - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input() response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}") if response.status_code == 204: @@ -610,7 +610,7 @@ class Octosuite: # Fetching user repositories def user_repos(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositories"), end="") limit = int(input()) @@ -630,7 +630,7 @@ class Octosuite: # Fetching user's gists def user_gists(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format('gists'), end="") limit = int(input()) @@ -652,7 +652,7 @@ class Octosuite: # Fetching a list of organizations that a user owns or belongs to def user_orgs(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user organizations"), end="") limit = int(input()) @@ -674,7 +674,7 @@ class Octosuite: # Fetching a users events def user_events(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("events"), end="") limit = int(input()) @@ -697,7 +697,7 @@ class Octosuite: # Fetching a target user's subscriptions def user_subscriptions(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input().lower() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user subscriptions"), end="") limit = int(input()) @@ -719,7 +719,7 @@ class Octosuite: # Fetching a list of users the target follows def user_following(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input().lower() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user' following"), end="") limit = int(input()) @@ -741,7 +741,7 @@ class Octosuite: # Fetching user's followers def user_followers(self): - xprint(f"{white}>> @{green}Username{reset} ", end="") + xprint(f"{white}@{green}username:{reset} ", end="") username = input().lower() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user followers"), end="") limit = int(input()) @@ -763,9 +763,9 @@ class Octosuite: # Checking whether user[A] follows user[B] def user_follows(self): - xprint(f"{white}>> @{green}user{white}(A) (username){reset} ", end="") + xprint(f"{white}@{green}user{white}(A) (username):{reset} ", end="") user_a = input() - xprint(f"{white}>> @{green}user{white}(B) (username){reset} ", end="") + xprint(f"{white}@{green}user{white}(B) (username):{reset} ", end="") user_b = input() response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}") if response.status_code == 204: @@ -776,7 +776,7 @@ class Octosuite: # User search def users_search(self): - xprint(f"{white}>> @{green}Query{white} (eg. john){reset} ", end="") + xprint(f"{white}@{green}query{white} (eg. john):{reset} ", end="") query = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user search"), end="") limit = int(input()) @@ -791,7 +791,7 @@ class Octosuite: # Repository search def repos_search(self): - xprint(f"{white}>> %{green}Query{white} (eg. git){reset} ", end="") + xprint(f"{white}%{green}query{white} (eg. git):{reset} ", end="") query = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositor[y][ies] search"), end="") limit = int(input()) @@ -806,7 +806,7 @@ class Octosuite: # Topics search def topics_search(self): - xprint(f"{white}>> #{green}Query{white} (eg. osint){reset} ", end="") + xprint(f"{white}#{green}query{white} (eg. osint):{reset} ", end="") query = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("topic(s) search"), end="") limit = int(input()) @@ -821,7 +821,7 @@ class Octosuite: # Issue search def issues_search(self): - xprint(f"{white}>> !{green}Query{white} (eg. error){reset} ", end="") + xprint(f"{white}!{green}Query{white} (eg. error):{reset} ", end="") query = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("issue(s) search"), end="") limit = int(input()) @@ -837,7 +837,7 @@ class Octosuite: # Commits search def commits_search(self): - xprint(f"{white}>> :{green}Query{white} (eg. filename:index.php){reset} ", end="") + xprint(f"{white}:{green}Query{white} (eg. filename:index.php):{reset} ", end="") query = input() xprint(MessagePrefix.prompt, LogRoller.limit_output.format("commit(s) search"), end="") limit = int(input()) @@ -869,7 +869,7 @@ class Octosuite: # Read a specified csv file def read_csv(self): - xprint(f"{white}>> {green}.csv {reset}(filename) ", end="") + xprint(f"{green}csv {white}(filename):{reset} ", end="") csv_file = input() with open(os.path.join("output", csv_file + ".csv"), "r") as file: logging.info(LogRoller.reading_csv.format(csv_file)) @@ -879,7 +879,7 @@ class Octosuite: # Delete a specified csv file def delete_csv(self): - xprint(f"{white}>> {green}.csv {reset}filename{reset} ", end="") + xprint(f"{green}csv {white}(filename):{reset} ", end="") csv_file = input() os.remove(os.path.join("output", csv_file)) logging.info(LogRoller.deleted_csv.format(csv_file)) @@ -911,7 +911,7 @@ class Octosuite: # Read a specified log file def read_log(self): - xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="") + xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") log_file = input() with open(os.path.join(".logs", log_file + ".log"), "r") as log: logging.info(LogRoller.reading_log.format(log_file)) @@ -920,7 +920,7 @@ class Octosuite: # Delete a specified log file def delete_log(self): - xprint(f"{white}>> {green}.log date{reset} (eg. 2022-04-27 10:09:36AM) ", end="") + xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") log_file = input() os.remove(os.path.join(".logs", log_file)) logging.info(LogRoller.deleted_log.format(log_file)) From 2537bcbab84d37529481655a683121fb70b472fa Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 25 Nov 2022 02:29:04 +0200 Subject: [PATCH 14/14] Update message_prefixes.py --- octosuite/message_prefixes.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/octosuite/message_prefixes.py b/octosuite/message_prefixes.py index f296bb8..f65d0e4 100644 --- a/octosuite/message_prefixes.py +++ b/octosuite/message_prefixes.py @@ -6,9 +6,9 @@ a notification in OctoSuite might be all about. This might not be very important but I think it's better to know the severity of the notifications you get in a program. """ class MessagePrefix: - prompt = f"{white}[{green} ? {white}]{reset}" - warning = f"{white}[{red} ! {white}]{reset}" - error = f"{white}[{red} x {white}]{reset}" - positive = f"{white}[{green} + {white}]{reset}" - negative = f"{white}[{red} - {white}]{reset}" - info = f"{white}[{green} * {white}]{reset}" + prompt = f"{white}[{green}?{white}]{reset}" + warning = f"{white}[{red}!{white}]{reset}" + error = f"{white}[{red}x{white}]{reset}" + positive = f"{white}[{green}+{white}]{reset}" + negative = f"{white}[{red}-{white}]{reset}" + info = f"{white}[{green}*{white}]{reset}"