From 1f86ad4034543ab8c487af52a2d230d8006e61c1 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Thu, 1 Dec 2022 19:36:59 +0200 Subject: [PATCH 1/9] Update octosuite.py --- octosuite/octosuite.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index d01d37d..e445f31 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -23,6 +23,7 @@ from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_pr readline = Readline() + class Octosuite: def __init__(self): # API endpoint @@ -92,8 +93,7 @@ class Octosuite: 'path': 'Path', 'sha': 'SHA', 'html_url': 'URL'} - - + # Organization attributes self.org_attrs = ['avatar_url', 'login', 'id', 'node_id', 'email', 'description', 'blog', 'location', 'followers', 'following', 'twitter_username', 'public_gists', 'public_repos', 'type', 'is_verified', From 4c8d3626b3707ed9f5826c5a9571f47baced5315 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Thu, 1 Dec 2022 20:12:52 +0200 Subject: [PATCH 2/9] Update octosuite.py --- octosuite/octosuite.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index e445f31..f8b836f 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -15,7 +15,7 @@ from pyreadline3 import Readline from rich import print as xprint from octosuite.banners import version_tag, ascii_banner from octosuite.colors import red, white, green, header_title, reset -from octosuite.message_prefixes import PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol +from octosuite.message_prefixes import ERROR, WARNING, PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol # seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :) from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, logs_command, csv_command, org_command, source, org, repo, user, search, logs, csv from octosuite.log_roller import ctrl_c, error, session_opened, session_closed, viewing_logs, viewing_csv, deleted_log, reading_log, reading_csv, deleted_csv, file_downloading, file_downloaded, info_not_found, user_not_found, org_not_found, repo_or_user_not_found, limit_output From b3ae15a0629a2e4bd85e1acaa1752133bd6c1c05 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Fri, 2 Dec 2022 23:15:26 +0200 Subject: [PATCH 3/9] Update Dockerfile --- Dockerfile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index e6fcb16..d6c17aa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,9 +6,7 @@ WORKDIR /app COPY . . -RUN pip install --upgrade pip -RUN pip install build -RUN python -m build +RUN pip install --upgrade pip && pip install build && python -m build RUN pip install dist/*.whl ENTRYPOINT ["octosuite"] From dd7fcc455dbed1951be09b09525b0efc40191096 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Mon, 5 Dec 2022 21:03:15 +0200 Subject: [PATCH 4/9] Update Octosuite --- octosuite/main.py | 25 ++++++++++++++++++++++++- octosuite/octosuite.py | 31 ------------------------------- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/octosuite/main.py b/octosuite/main.py index 45a0361..fe6fea9 100644 --- a/octosuite/main.py +++ b/octosuite/main.py @@ -5,7 +5,30 @@ from octosuite.octosuite import * # I drifted away from the 'pythonic way' here def octosuite(): try: run = Octosuite() - run.on_start() + run.path_finder() + run.clear_screen() + run.configure_logging() + run.check_updates() + xprint(ascii_banner()[1], ascii_banner()[0]) + + """ + Main loop keeps octosuite running, this will break if Octosuite detects a KeyboardInterrupt (Ctrl+C) + or if the 'exit' command is entered. + """ + while True: + xprint(f"{white}┌──({red}{getpass.getuser()}{white}@{red}octosuite{white})\n├──[~{green}{os.getcwd()}{white}]\n└╼ {reset}",end="") + command_input = input().lower() + print("\n") + """ + Iterate over the command_map and check if the user input matches any command in it [command_map], + if there's a match, we return its method. If no match is found, we ignore it. + """ + for command, method in run.command_map: + if command_input == command: + method() + print("\n") + else: + pass except KeyboardInterrupt: logging.warning(ctrl_c) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index f8b836f..8b84d2c 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -329,37 +329,6 @@ class Octosuite: pass else: xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") - - - """ - on_start() - This is the main method, responsible for mapping commands, calling other methods, and catching exceptions - """ - def on_start(self): - self.path_finder() - self.clear_screen() - self.configure_logging() - self.check_updates() - xprint(ascii_banner()[1], ascii_banner()[0]) - - """ - Main loop keeps octosuite running, this will break if Octosuite detects a KeyboardInterrupt (Ctrl+C) - or if the 'exit' command is entered. - """ - while True: - xprint(f"{white}┌──({red}{getpass.getuser()}{white}@{red}octosuite{white})\n├──[~{green}{os.getcwd()}{white}]\n└╼ {reset}",end="") - command_input = input().lower() - print("\n") - """ - Iterate over the command_map and check if the user input matches any command in it [command_map], - if there's a match, we return its method. If no match is found, we ignore it. - """ - for command, method in self.command_map: - if command_input == command: - method() - print("\n") - else: - pass # Fetching organization info From 3f1191758b29d66af8db5f9535d1fa9d081725cc Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Sun, 25 Dec 2022 02:31:24 +0200 Subject: [PATCH 5/9] Update octosuite.py --- octosuite/octosuite.py | 1220 ++++++++++++++++++++++------------------ 1 file changed, 668 insertions(+), 552 deletions(-) diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index 8b84d2c..d248ccc 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -1,38 +1,228 @@ #!usr/bin/python -import os +import os import sys import shutil import logging import getpass import requests import platform -from rich.text import Text -from rich.tree import Tree -from rich.table import Table +import subprocess from datetime import datetime -from pyreadline3 import Readline -from rich import print as xprint -from octosuite.banners import version_tag, ascii_banner -from octosuite.colors import red, white, green, header_title, reset -from octosuite.message_prefixes import ERROR, WARNING, PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol +from octosuite.banner import version_tag, banner +from octosuite.config import Tree, Text, Table, Prompt, xprint, create_parser, args, red, white, green, yellow, header_title, reset +from octosuite.message_prefixes import ERROR, WARNING, PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol # seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :) -from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, logs_command, csv_command, org_command, source, org, repo, user, search, logs, csv -from octosuite.log_roller import ctrl_c, error, session_opened, session_closed, viewing_logs, viewing_csv, deleted_log, reading_log, reading_csv, deleted_csv, file_downloading, file_downloaded, info_not_found, user_not_found, org_not_found, repo_or_user_not_found, limit_output -from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_profile, log_repo_path_contents, log_repo_contributors, log_repo_stargazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, log_org_profile, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, log_user_following, log_user_followers, log_repos_search, log_users_search, log_topics_search, log_issues_search, log_commits_search # log_org_events +from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, \ + logs_command, csv_command, org_command, source, org, repo, user, search, logs, csv +from octosuite.log_roller import ctrl_c, error, session_opened, session_closed, viewing_logs, viewing_csv, \ + deleted, reading, file_downloading, file_downloaded, info_not_found, \ + user_not_found, org_not_found, repo_or_user_not_found, limit_output, prompt_log_csv, logging_skipped +from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_profile, log_repo_path_contents, \ + log_repo_contributors, log_repo_stargazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, \ + log_org_profile, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, \ + log_user_following, log_user_followers, log_repos_search, log_users_search, log_topics_search, log_issues_search, \ + log_commits_search -readline = Readline() + +if os.name == "nt": + try: + from pyreadline3 import Readline + except ImportError: + subprocess.run(['pip3', 'install', 'pyreadline3']) + readline = Readline() +else: + import readline + + def completer(text, state): + options = [i for i in commands if i.startswith(text)] + if state < len(options): + return options[state] + else: + return None + + readline.parse_and_bind("tab: complete") + readline.set_completer(completer) + + +# path_finder() +# This function is responsible for creating/checking the availability of the (.logs, output, downloads) folders, +# enabling logging to automatically log network/user activity to a file, and logging the start of a session. +def path_finder(): + """ + Check 3 directories (.logs, output, downloads) on startup + If they exist, ignore, otherwise, create them + """ + directory_list = ['.logs', 'output', 'downloads'] + for directory in directory_list: + os.makedirs(directory, exist_ok=True) + + +# Configure logging to log user activities +def configure_logging(): + now = datetime.now() + now_formatted = now.strftime("%Y-%m-%d %H-%M-%S%p") + logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG) + # Log the start of a session + logging.info(session_opened.format(platform.node(), getpass.getuser())) + + +# Check if the remote tag_name from the latest release matches the one in the program +# if it does, it means the program is up-to-date. +# If it doesn't match, notify the user about a new release +def check_updates(): + response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json() + if response['tag_name'] == version_tag: + pass + else: + xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") + + +def list_dir_and_files(): + os.system('dir' if os.name == 'nt' else 'ls') + + +# Delete a specified csv file +def delete_csv(): + if args.csv_file: + csv_file = args.csv_file + else: + csv_file = Prompt.ask(f"{green}csv {white}(filename){reset}") + os.remove(os.path.join("output", csv_file)) + logging.info(deleted.format(csv_file)) + xprint(f"{POSITIVE} {deleted.format(csv_file)}") + + +# Clear csv files +def clear_csv(): + prompt = Prompt.ask(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue?", choices=['yes', 'no']) + if prompt == "yes": + shutil.rmtree('output', ignore_errors=True) + xprint(f"{INFO} csv files cleared successfully!") + else: + pass + + +# View csv files +def view_csv(): + logging.info(viewing_csv) + csv_files = os.listdir("output") + csv_table = Table(show_header=True, header_style=header_title) + csv_table.add_column("CSV", style="dim") + csv_table.add_column("Size (bytes)") + for csv_file in csv_files: + csv_table.add_row(str(csv_file), str(os.path.getsize("output/" + csv_file))) + xprint(csv_table) + + +# Read csv +def read_csv(): + if args.csv_file: + csv_file = args.csv_file + else: + csv_file = Prompt.ask(f"{green}csv {white}(filename){reset}") + with open(os.path.join("output", csv_file), "r") as file: + logging.info(reading.format(csv_file)) + text = Text(file.read()) + xprint(text) + + +# View logs +def view_logs(): + logging.info(viewing_logs) + logs = os.listdir(".logs") + logs_table = Table(show_header=True, header_style=header_title) + logs_table.add_column("Log", style="dim") + logs_table.add_column("Size (bytes)") + for log in logs: + logs_table.add_row(str(log), str(os.path.getsize(".logs/" + log))) + xprint(logs_table) + + +# Read log +def read_log(): + if args.log_file: + log_file = args.log_file + else: + log_file = Prompt.ask(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM){reset}") + with open(os.path.join(".logs", log_file + ".log"), "r") as log: + logging.info(reading.format(log_file)) + xprint("\n" + log.read()) + + +# Delete log +def delete_log(): + if args.log_file: + log_file = args.log_file + else: + log_file = Prompt.ask(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM){reset}") + os.remove(os.path.join(".logs", log_file)) + logging.info(deleted.format(log_file)) + xprint(f"{POSITIVE} {deleted.format(log_file)}") + + +# Clear logs +def clear_logs(): + prompt = Prompt.ask(f"{PROMPT} This will clear all {len(os.listdir('output'))} log files, continue?", choices=['yes', 'no']) + if prompt == "yes": + shutil.rmtree('.logs', ignore_errors=True) + xprint(f"{INFO} .log files cleared successfully!") + exit() + else: + pass + + +# Exit session +def exit_session(): + exit_prompt = Prompt.ask(f"{PROMPT} This will close the current session, continue?", choices=['yes', 'no']) + if exit_prompt == "yes": + logging.info(session_closed.format(datetime.now())) + xprint(f"{INFO} {session_closed.format(datetime.now())}") + exit() + else: + pass + + +# Clear screen +def clear_screen(): + # Using 'cls' on Windows machines to clear the screen, + # otherwise, use 'clear' + os.system('cls' if os.name == 'nt' else 'clear') + + +def about(): + about_text = f""" + OCTOSUITE © 2023 Richard Mwewa + +An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations. + + +Whats new in v{version_tag}? +[{green}IMPROVED{reset}] Users will now be able to view previous commands with the ↑ ↓ arrow keys (for Windows systems) +[{green}IMPROVED{reset}] Removed width from tables, so that they can auto adjust +[{green}ADDED{reset}] Added the 'ls' command, which will be used to list all files and directories of the specified directory (beta) +[{green}ADDED{reset}] Added the 'cd' command, which will be used to move to a specified directory (beta) +[{green}ADDED{reset}] Added the 'log:clear' command, which will be used to clear all logs +[{green}ADDED{reset}] Added the 'csv:clear' command, which will be used to clear all csv files +[{green}GUI{reset}] The GUI will now come as a standalone executable + +Read the wiki: https://github.com/bellingcat/octosuite/wiki +GitHub REST API documentation: https://docs.github.com/rest +""" + xprint(about_text) class Octosuite: def __init__(self): # API endpoint self.endpoint = 'https://api.github.com' - + # A list of tuples mapping commands to their methods - self.command_map = [("exit", self.exit_session), - ("clear", self.clear_screen), - ("about", self.about), + self.command_map = [('ls', list_dir_and_files), + ("exit", exit_session), + ("clear", clear_screen), + ("about", about), ("author", self.author), ("help", help_command), ("help:source", source_command), @@ -75,266 +265,263 @@ class Octosuite: ("search:issues", self.issues_search), ("search:commits", self.commits_search), ("logs", logs), - ("logs:view", self.view_logs), - ("logs:read", self.read_log), - ("logs:delete", self.delete_log), - ("logs:clear", self.clear_logs), + ("logs:view", view_logs), + ("logs:read", read_log), + ("logs:delete", delete_log), + ("logs:clear", clear_logs), ("csv", csv), - ("csv:view", self.view_csv), - ("csv:read", self.read_csv), - ("csv:delete", self.delete_csv), - ("csv:clear", self.clear_csv)] + ("csv:view", view_csv), + ("csv:read", read_csv), + ("csv:delete", delete_csv), + ("csv:clear", clear_csv)] + + # Arguments map will be used to run Octosuite with argparse + self.argument_map = [("user_profile", self.user_profile), + ("user_repos", self.user_repos), + ("user_gists", self.user_gists), + ("user_orgs", self.user_orgs), + ("user_events", self.user_events), + ("user_subscriptions", self.user_subscriptions), + ("user_following", self.user_following), + ("user_followers", self.user_followers), + ("user_follows", self.user_follows), + ("users_search", self.users_search), + ("issues_search", self.issues_search), + ("commits_search", self.commits_search), + ("topics_search", self.topics_search), + ("repos_search", self.repos_search), + ("org_profile", self.org_profile), + ("org_repos", self.org_repos), + ("org_events", self.org_events), + ("org_member", self.org_member), + ("repo_profile", self.repo_profile), + ("repo_contributors", self.repo_contributors), + ("repo_stargazers", self.repo_stargazers), + ("repo_forks", self.repo_forks), + ("repo_issues", self.repo_issues), + ("repo_releases", self.repo_releases), + ("repo_path_contents", self.path_contents), + ("view_logs", view_logs), + ("read_log", read_log), + ("delete_log", delete_log), + ("clear_logs", clear_logs), + ("view_csv", view_csv), + ("read_csv", read_csv), + ("delete_csv", delete_csv), + ("clear_csv", clear_csv), + ("about", about), + ("author", self.author)] # Path attribute self.path_attrs = ['size', 'type', 'path', 'sha', 'html_url'] # Path attribute dictionary self.path_attr_dict = {'size': 'Size (bytes)', - 'type': 'Type', - 'path': 'Path', - 'sha': 'SHA', - 'html_url': 'URL'} + 'type': 'Type', + 'path': 'Path', + 'sha': 'SHA', + 'html_url': 'URL'} # Organization attributes - self.org_attrs = ['avatar_url', 'login', 'id', 'node_id', 'email', 'description', 'blog', 'location', 'followers', - 'following', 'twitter_username', 'public_gists', 'public_repos', 'type', 'is_verified', - 'has_organization_projects', 'has_repository_projects', 'created_at', 'updated_at'] + self.org_attrs = ['avatar_url', 'login', 'id', 'node_id', 'email', 'description', 'blog', 'location', + 'followers', + 'following', 'twitter_username', 'public_gists', 'public_repos', 'type', 'is_verified', + 'has_organization_projects', 'has_repository_projects', 'created_at', 'updated_at'] # Organization attribute dictionary self.org_attr_dict = {'avatar_url': 'Profile Photo', - 'login': 'Username', - 'id': 'ID', - 'node_id': 'Node ID', - 'email': 'Email', - 'description': 'About', - 'location': 'Location', - 'blog': 'Blog', - 'followers': 'Followers', - 'following': 'Following', - 'twitter_username': 'Twitter handle', - 'public_gists': 'Gists', - 'public_repos': 'Repositories', - 'type': 'Account type', - 'is_verified': 'Is verified?', - 'has_organization_projects': 'Has organization projects?', - 'has_repository_projects': 'Has repository projects?', - 'created_at': 'Created at', - 'updated_at': 'Updated at'} - - + 'login': 'Username', + 'id': 'ID', + 'node_id': 'Node ID', + 'email': 'Email', + 'description': 'About', + 'location': 'Location', + 'blog': 'Blog', + 'followers': 'Followers', + 'following': 'Following', + 'twitter_username': 'Twitter handle', + 'public_gists': 'Gists', + 'public_repos': 'Repositories', + 'type': 'Account type', + 'is_verified': 'Is verified?', + 'has_organization_projects': 'Has organization projects?', + 'has_repository_projects': 'Has repository projects?', + 'created_at': 'Created at', + 'updated_at': 'Updated at'} + # Repository attributes - self.repo_attrs = ['id', 'description', 'forks', 'stargazers_count', 'watchers', 'license', 'default_branch', 'visibility', - 'language', 'open_issues', 'topics', 'homepage', 'clone_url', 'ssh_url', 'fork', 'allow_forking', - 'private', 'archived', 'has_downloads', 'has_issues', 'has_pages', 'has_projects', 'has_wiki', - 'pushed_at', 'created_at', 'updated_at'] + self.repo_attrs = ['id', 'description', 'forks', 'stargazers_count', 'watchers', 'license', 'default_branch', + 'visibility', + 'language', 'open_issues', 'topics', 'homepage', 'clone_url', 'ssh_url', 'fork', + 'allow_forking', + 'private', 'archived', 'has_downloads', 'has_issues', 'has_pages', 'has_projects', + 'has_wiki', + 'pushed_at', 'created_at', 'updated_at'] # Repository attribute dictionary self.repo_attr_dict = {'id': 'ID', - 'description': 'About', - 'forks': 'Forks', - 'stargazers_count': 'Stars', - 'watchers': 'Watchers', - 'license': 'License', - 'default_branch': 'Branch', - 'visibility': 'Visibility', - 'language': 'Language(s)', - 'open_issues': 'Open issues', - 'topics': 'Topics', - 'homepage': 'Homepage', - 'clone_url': 'Clone URL', - 'ssh_url': 'SSH URL', - 'fork': 'Is fork?', - 'allow_forking': 'Is forkable?', - 'private': 'Is private?', - 'archived': 'Is archived?', - 'is_template': 'Is template?', - 'has_wiki': 'Has wiki?', - 'has_pages': 'Has pages?', - 'has_projects': 'Has projects?', - 'has_issues': 'Has issues?', - 'has_downloads': 'Has downloads?', - 'pushed_at': 'Pushed at', - 'created_at': 'Created at', - 'updated_at': 'Updated at'} - - + 'description': 'About', + 'forks': 'Forks', + 'stargazers_count': 'Stars', + 'watchers': 'Watchers', + 'license': 'License', + 'default_branch': 'Branch', + 'visibility': 'Visibility', + 'language': 'Language(s)', + 'open_issues': 'Open issues', + 'topics': 'Topics', + 'homepage': 'Homepage', + 'clone_url': 'Clone URL', + 'ssh_url': 'SSH URL', + 'fork': 'Is fork?', + 'allow_forking': 'Is forkable?', + 'private': 'Is private?', + 'archived': 'Is archived?', + 'is_template': 'Is template?', + 'has_wiki': 'Has wiki?', + 'has_pages': 'Has pages?', + 'has_projects': 'Has projects?', + 'has_issues': 'Has issues?', + 'has_downloads': 'Has downloads?', + 'pushed_at': 'Pushed at', + 'created_at': 'Created at', + 'updated_at': 'Updated at'} + # Repo releases attributes - self.repo_releases_attrs = ['id', 'node_id', 'tag_name', 'target_commitish', 'assets', 'draft', 'prerelease', 'created_at', - 'published_at'] + self.repo_releases_attrs = ['id', 'node_id', 'tag_name', 'target_commitish', 'assets', 'draft', 'prerelease', + 'created_at', + 'published_at'] # Repo releases attribute dictionary self.repo_releases_attr_dict = {'id': 'ID', - 'node_id': 'Node ID', - 'tag_name': 'Tag', - 'target_commitish': 'Branch', - 'assets': 'Assets', - 'draft': 'Is draft?', - 'prerelease': 'Is prerelease?', - 'created_at': 'Created at', - 'published_at': 'Published at'} - - + 'node_id': 'Node ID', + 'tag_name': 'Tag', + 'target_commitish': 'Branch', + 'assets': 'Assets', + 'draft': 'Is draft?', + 'prerelease': 'Is prerelease?', + 'created_at': 'Created at', + 'published_at': 'Published at'} + # Profile attributes - self.profile_attrs = ['avatar_url', 'login', 'id', 'node_id', 'bio', 'blog', 'location', 'followers', 'following', - 'twitter_username', 'public_gists', 'public_repos', 'company', 'hireable', 'site_admin', 'created_at', - 'updated_at'] + self.profile_attrs = ['avatar_url', 'login', 'id', 'node_id', 'bio', 'blog', 'location', 'followers', + 'following', + 'twitter_username', 'public_gists', 'public_repos', 'company', 'hireable', 'site_admin', + 'created_at', + 'updated_at'] # Profile attribute dictionary self.profile_attr_dict = {'avatar_url': 'Profile Photo', - 'login': 'Username', - 'id': 'ID', - 'node_id': 'Node ID', - 'bio': 'Bio', - 'blog': 'Blog', - 'location': 'Location', - 'followers': 'Followers', - 'following': 'Following', - 'twitter_username': 'Twitter Handle', - 'public_gists': 'Gists (public)', - 'public_repos': 'Repositories (public)', - 'company': 'Organization', - 'hireable': 'Is hireable?', - 'site_admin': 'Is site admin?', - 'created_at': 'Joined at', - 'updated_at': 'Updated at'} - - + 'login': 'Username', + 'id': 'ID', + 'node_id': 'Node ID', + 'bio': 'Bio', + 'blog': 'Blog', + 'location': 'Location', + 'followers': 'Followers', + 'following': 'Following', + 'twitter_username': 'Twitter Handle', + 'public_gists': 'Gists (public)', + 'public_repos': 'Repositories (public)', + 'company': 'Organization', + 'hireable': 'Is hireable?', + 'site_admin': 'Is site admin?', + 'created_at': 'Joined at', + 'updated_at': 'Updated at'} + # User attributes self.user_attrs = ['avatar_url', 'id', 'node_id', 'gravatar_id', 'site_admin', 'type', 'html_url'] # User attribute dictionary self.user_attr_dict = {'avatar_url': 'Profile Photo', - 'id': 'ID', - 'node_id': 'Node ID', - 'gravatar_id': 'Gravatar ID', - 'site_admin': 'Is site admin?', - 'type': 'Account type', - 'html_url': 'URL'} - - - # Topic atrributes + 'id': 'ID', + 'node_id': 'Node ID', + 'gravatar_id': 'Gravatar ID', + 'site_admin': 'Is site admin?', + 'type': 'Account type', + 'html_url': 'URL'} + + # Topic attributes self.topic_attrs = ['score', 'curated', 'featured', 'display_name', 'created_by', 'created_at', 'updated_at'] # Topic attribute dictionary self.topic_attr_dict = {'score': 'Score', - 'curated': 'Curated', - 'featured': 'Featured', - 'display_name': 'Display name', - 'created_by': 'Created by', - 'created_at': 'Created at', - 'updated_at': 'Updated at'} - - + 'curated': 'Curated', + 'featured': 'Featured', + 'display_name': 'Display name', + 'created_by': 'Created by', + 'created_at': 'Created at', + 'updated_at': 'Updated at'} + # Gists attribute - self.gists_attrs = ['node_id', 'description', 'comments', 'files', 'git_push_url', 'public', 'truncated', 'updated_at'] + self.gists_attrs = ['node_id', 'description', 'comments', 'files', 'git_push_url', 'public', 'truncated', + 'updated_at'] # Gists attribute dictionary self.gists_attr_dict = {'node_id': 'Node ID', - 'description': 'About', - 'comments': 'Comments', - 'files': 'Files', - 'git_push_url': 'Git Push URL', - 'public': 'Is public?', - 'truncated': 'Is truncated?', - 'updated_at': 'Updated at'} - - + 'description': 'About', + 'comments': 'Comments', + 'files': 'Files', + 'git_push_url': 'Git Push URL', + 'public': 'Is public?', + 'truncated': 'Is truncated?', + 'updated_at': 'Updated at'} + # Issue attributes - self.issue_attrs = ['id', 'node_id', 'score', 'state', 'number', 'comments', 'milestone', 'assignee', 'assignees', 'labels', - 'locked', 'draft', 'closed_at'] + self.issue_attrs = ['id', 'node_id', 'score', 'state', 'number', 'comments', 'milestone', 'assignee', + 'assignees', 'labels', + 'locked', 'draft', 'closed_at'] # Issue attribute dict self.issue_attr_dict = {'id': 'ID', - 'node_id': 'Node ID', - 'score': 'Score', - 'state': 'State', - 'closed_at': 'Closed at', - 'number': 'Number', - 'comments': 'Comments', - 'milestone': 'Milestone', - 'assignee': 'Assignee', - 'assignees': 'Assignees', - 'labels': 'Labels', - 'draft': 'Is draft?', - 'locked': 'Is locked?', - 'created_at': 'Created at'} - + 'node_id': 'Node ID', + 'score': 'Score', + 'state': 'State', + 'closed_at': 'Closed at', + 'number': 'Number', + 'comments': 'Comments', + 'milestone': 'Milestone', + 'assignee': 'Assignee', + 'assignees': 'Assignees', + 'labels': 'Labels', + 'draft': 'Is draft?', + 'locked': 'Is locked?', + 'created_at': 'Created at'} + # Repo issues attributes self.repo_issues_attrs = ['id', 'node_id', 'state', 'reactions', 'number', 'comments', 'milestone', 'assignee', - 'active_lock_reason', 'author_association', 'assignees', 'labels', 'locked', 'closed_at', - 'created_at', 'updated_at'] + 'active_lock_reason', 'author_association', 'assignees', 'labels', 'locked', + 'closed_at', + 'created_at', 'updated_at'] # Issue attribute dict self.repo_issues_attr_dict = {'id': 'ID', - 'node_id': 'Node ID', - 'number': 'Number', - 'state': 'State', - 'reactions': 'Reactions', - 'comments': 'Comments', - 'milestone': 'Milestone', - 'assignee': 'Assignee', - 'assignees': 'Assignees', - 'author_association': 'Author association', - 'labels': 'Labels', - 'locked': 'Is locked?', - 'active_lock_reason': 'Lock reason', - 'closed_at': 'Closed at', - 'created_at': 'Created at', - 'updated_at': 'Updated at'} - - + 'node_id': 'Node ID', + 'number': 'Number', + 'state': 'State', + 'reactions': 'Reactions', + 'comments': 'Comments', + 'milestone': 'Milestone', + 'assignee': 'Assignee', + 'assignees': 'Assignees', + 'author_association': 'Author association', + 'labels': 'Labels', + 'locked': 'Is locked?', + 'active_lock_reason': 'Lock reason', + 'closed_at': 'Closed at', + 'created_at': 'Created at', + 'updated_at': 'Updated at'} + # User organizations attributes self.user_orgs_attrs = ['avatar_url', 'id', 'node_id', 'url', 'description'] self.user_orgs_attr_dict = {'avatar_url': 'Profile Photo', - 'id': 'ID', - 'node_id': 'Node ID', - 'url': 'URL', - 'description': 'About'} - + 'id': 'ID', + 'node_id': 'Node ID', + 'url': 'URL', + 'description': 'About'} + # Author dictionary self.author_dict = {'Alias': 'rly0nheart', - 'Country': ':zambia: Zambia, Africa', - 'About.me': 'https://about.me/rly0nheart', - 'Buy Me A Coffee': 'https://buymeacoffee.com/189381184'} - - - """ - path_finder() - This method is responsible for creating/checking the availability of the (.logs, output, downloads) folders, - enabling logging to automatically log network/user activity to a file, - and logging the start of a session. - """ - def path_finder(self): - """ - Check 3 directories (.logs, output, downloads) on startup - If they exist, ignore, otherwise, create them - """ - directory_list = ['.logs', 'output', 'downloads'] - for directory in directory_list: - os.makedirs(directory, exist_ok=True) - - - """ - Configure logging and check for updates - """ - def configure_logging(self): - """ - Configure logging to log activities to a file, which will be named by the date and time a session was opened. - """ - now = datetime.now() - now_formatted = now.strftime("%Y-%m-%d %H-%M-%S%p") - logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG) - # Log the start of a session - logging.info(session_opened.format(platform.node(), getpass.getuser())) - - - # Check for updates - def check_updates(self): - response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json() - if response['tag_name'] == version_tag: - """ - Ignore if the program is up to date - """ - pass - else: - xprint(f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") - - + 'Country': ':zambia: Zambia, Africa', + 'About.me': 'https://about.me/rly0nheart', + 'Buy Me A Coffee': 'https://buymeacoffee.com/189381184'} + # Fetching organization info def org_profile(self): - xprint(f"{white}@{green}Organization {white}(username):{reset} ", end="") - organization = input() + if args.organization: + organization = args.organization + else: + organization = Prompt.ask(f"{white}@{green}Organization{reset}") response = requests.get(f"{self.endpoint}/orgs/{organization}") if response.status_code == 404: xprint(f"{NEGATIVE} {org_not_found.format(organization)}") @@ -346,12 +533,13 @@ class Octosuite: log_org_profile(response) else: xprint(response.json()) - - + # Fetching user information def user_profile(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input() + if args.username: + username = args.username + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") response = requests.get(f"{self.endpoint}/users/{username}") if response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -360,17 +548,21 @@ class Octosuite: for attr in self.profile_attrs: user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}") xprint(user_profile_tree) - log_user_profile(response) + + # Logging output to a csv file + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_profile(response) else: xprint(response.json()) - - + # Fetching repository information def repo_profile(self): - xprint(f"{white}>> %{green}Repository{reset} ", end="") - repo_name = input() - xprint(f"{white}>> @{green}Owner{white} (username) ", end="") - username = input() + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + else: + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -379,19 +571,22 @@ class Octosuite: for attr in self.repo_attrs: repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}") xprint(repo_profile_tree) - log_repo_profile(response) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_profile(response) else: xprint(response.json()) - - + # Get path contents def path_contents(self): - xprint(f"{white}>> %{green}Repository{reset} ", end="") - repo_name = input() - xprint(f"{white}>> @{green}Owner{white} (username) ", end="") - username = input() - xprint(f"{white}>> ~{green}/path/name{reset} ", end="") - path_name = input() + if args.repository and args.username and args.path_name: + repo_name = args.repository + username = args.username + path_name = args.path_name + else: + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + path_name = Prompt.ask("~/path/name ") response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}") if response.status_code == 404: xprint(f"{NEGATIVE} {info_not_found.format(repo_name, username, path_name)}") @@ -402,19 +597,20 @@ class Octosuite: path_contents_tree.add(f"{self.path_attr_dict[attr]}: {content[attr]}") xprint(path_contents_tree) log_repo_path_contents(content, repo_name) - xprint(INFO, f"Found {content_count} file(s) in {repo_name}/{path_name}.") + xprint(INFO, f"Found {content_count} file(s) in {repo_name}/{path_name}.") else: xprint(response.json()) - - + # repo contributors def repo_contributors(self): - xprint(f"{white}>> %{green}Repository{reset} ", end="") - repo_name = input() - xprint(f"{white}>> @{green}Owner{white} (username) ", end="") - username = input() - xprint(PROMPT, limit_output.format("contributors"), end="") - limit = int(input()) + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit + else: + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("contributors")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -424,19 +620,22 @@ class Octosuite: for attr in self.user_attrs: contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}") xprint(contributor_tree) - log_repo_contributors(contributor, repo_name) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_contributors(contributor, repo_name) else: xprint(response.json()) - - + # repo stargazers def repo_stargazers(self): - xprint(f"{white}>> %{green}Repository{reset} ", end="") - repo_name = input() - xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format("repository stargazers"), end="") - limit = int(input()) + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit + else: + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("stargazers")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -448,19 +647,22 @@ class Octosuite: for attr in self.user_attrs: stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}") xprint(stargazer_tree) - log_repo_stargazers(stargazer, repo_name) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_stargazers(stargazer, repo_name) else: xprint(response.json()) - - + # repo forks def repo_forks(self): - xprint(f"{white}>> %{green}Repository{reset} ", end="") - repo_name = input() - xprint(f"{white}@{green}Owner{white} (username):{reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format("repository forks"), end="") - limit = int(input()) + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit + else: + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("forks")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") @@ -472,22 +674,26 @@ class Octosuite: for attr in self.repo_attrs: fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}") xprint(fork_tree) - log_repo_forks(fork, count) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_forks(fork, count) else: xprint(response.json()) - + # Repo issues def repo_issues(self): - xprint(f"{white}>> %{green}Repository{reset} ", end="") - repo_name = input() - xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format("repository issues"), end="") - limit = int(input()) + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit + else: + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("issues")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") - elif response.json() == []: + elif not response.json(): xprint(f"{NEGATIVE} Repository does not have open issues -> ({repo_name})") elif response.status_code == 200: for issue in response.json(): @@ -499,20 +705,21 @@ class Octosuite: log_repo_issues(issue, repo_name) else: xprint(response.json()) - - + # Repo releases def repo_releases(self): - xprint(f"{white}>> %{green}Repository{reset} ", end="") - repo_name = input() - xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format("repository releases"), end="") - limit = int(input()) + if args.repository and args.username and args.limit: + repo_name = args.repository + username = args.username + limit = args.limit + else: + repo_name = Prompt.ask(f"{white}%{green}Repository{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("repository releases")) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") - elif response.json() == []: + elif not response.json(): xprint(f"{NEGATIVE} Repository does not have releases -> ({repo_name})") elif response.status_code == 200: for release in response.json(): @@ -521,17 +728,20 @@ class Octosuite: releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}") xprint(releases_tree) xprint(release['body']) - log_repo_releases(release, repo_name) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repo_releases(release, repo_name) else: xprint(response.json()) - - + # Fetching organization repositories def org_repos(self): - xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") - organization = input() - xprint(PROMPT, limit_output.format("organization repositories"), end="") - limit = int(input()) + if args.organization and args.limit: + organization = args.organization + limit = args.limit + else: + organization = Prompt.ask(f"{white}@{green}Organization{reset}") + limit = Prompt.ask(limit_output.format("organization repositories")) response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {org_not_found.format(organization)}") @@ -541,17 +751,20 @@ class Octosuite: for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - log_org_repos(repository, organization) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_org_repos(repository, organization) else: xprint(response.json()) - - + # organization events def org_events(self): - xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") - organization = input() - xprint(PROMPT, limit_output.format("organization repositories"), end="") - limit = int(input()) + if args.organization and args.limit: + organization = args.organization + limit = args.limit + else: + organization = Prompt.ask(f"{white}@{green}Organization{reset}") + limit = Prompt.ask(limit_output.format("organization events")) response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {org_not_found.format(organization)}") @@ -565,27 +778,29 @@ class Octosuite: # log_org_events(event, organization) else: xprint(response.json()) - - + # organization member def org_member(self): - xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") - organization = input() - xprint(f"{white}@{green}username:{reset} ", end="") - username = input() + if args.organization and args.username: + organization = args.organization + username = args.username + else: + organization = Prompt.ask(f"{white}@{green}Organization{reset}") + username = Prompt.ask(f"{white}@{green}Username{reset}") response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}") if response.status_code == 204: xprint(f"{POSITIVE} User ({username}) is a public member of the organization -> ({organization})") else: xprint(f"{NEGATIVE} {response.json()['message']}") - - + # Fetching user repositories def user_repos(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format("repositories"), end="") - limit = int(input()) + if args.username and args.limit: + username = args.username + limit = args.limit + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("repositories")) response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -595,19 +810,22 @@ class Octosuite: for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - log_user_repos(repository, username) - else: - xprint(response.json()) - - + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_repos(repository, username) + else: + xprint(response.json()) + # Fetching user's gists def user_gists(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format('gists'), end="") - limit = int(input()) + if args.username and args.limit: + username = args.username + limit = args.limit + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format('gists')) response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}") - if response.json() == []: + if not response.json(): xprint(f"{NEGATIVE} User does not have gists.") elif response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -617,19 +835,22 @@ class Octosuite: for attr in self.gists_attrs: gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}") xprint(gists_tree) - log_user_gists(gist) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_gists(gist) else: xprint(response.json()) - - + # Fetching a list of organizations that a user owns or belongs to def user_orgs(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format("user organizations"), end="") - limit = int(input()) + if args.username and args.limit: + username = args.username + limit = args.limit + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user organizations")) response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}") - if response.json() == []: + if not response.json(): xprint(f"{NEGATIVE} User ({username}) does not (belong to/own) any organizations.") elif response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -639,17 +860,20 @@ class Octosuite: for attr in self.user_orgs_attrs: org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}") xprint(org_tree) - log_user_orgs(organization, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_orgs(organization, username) else: xprint(response.json()) - - + # Fetching a users events def user_events(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input() - xprint(PROMPT, limit_output.format("events"), end="") - limit = int(input()) + if args.username and args.limit: + username = args.username + limit = args.limit + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("events")) response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}") if response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -665,38 +889,42 @@ class Octosuite: log_user_events(event) else: xprint(response.json()) - - + # Fetching a target user's subscriptions def user_subscriptions(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input().lower() - xprint(PROMPT, limit_output.format("user subscriptions"), end="") - limit = int(input()) + if args.username and args.limit: + username = args.username + limit = args.limit + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user subscriptions")) response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}") - if response.json() == []: + if not response.json(): xprint(f"{NEGATIVE} User does not have any subscriptions.") elif response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for repository in response.json(): - subscriptions_tree =Tree("\n" + repository['full_name']) + subscriptions_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(subscriptions_tree) - log_user_subscriptions(repository, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_subscriptions(repository, username) else: xprint(response.json()) - - + # Fetching a list of users the target follows def user_following(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input().lower() - xprint(PROMPT, limit_output.format("user' following"), end="") - limit = int(input()) + if args.username and args.limit: + username = args.username + limit = args.limit + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user' following")) response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}") - if response.json() == []: + if not response.json(): xprint(f"{NEGATIVE} User ({username})does not follow anyone.") elif response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -706,19 +934,22 @@ class Octosuite: for attr in self.user_attrs: following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(following_tree) - log_user_following(user, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_following(user, username) else: xprint(response.json()) - - + # Fetching user's followers def user_followers(self): - xprint(f"{white}@{green}username:{reset} ", end="") - username = input().lower() - xprint(PROMPT, limit_output.format("user followers"), end="") - limit = int(input()) + if args.username and args.limit: + username = args.username + limit = args.limit + else: + username = Prompt.ask(f"{white}@{green}Username{reset}") + limit = Prompt.ask(limit_output.format("user followers")) response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}") - if response.json() == []: + if not response.json(): xprint(f"{NEGATIVE} User ({username})does not have followers.") elif response.status_code == 404: xprint(f"{NEGATIVE} {user_not_found.format(username)}") @@ -728,75 +959,88 @@ class Octosuite: for attr in self.user_attrs: followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}") xprint(followers_tree) - log_user_followers(follower, username) + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_user_followers(follower, username) else: xprint(response.json()) - - + # Checking whether user[A] follows user[B] def user_follows(self): - xprint(f"{white}@{green}user{white}(A) (username):{reset} ", end="") - user_a = input() - xprint(f"{white}@{green}user{white}(B) (username):{reset} ", end="") - user_b = input() + if args.username and args.username_b: + user_a = args.username + user_b = args.username_b + else: + user_a = Prompt.ask(f"{white}@{green}User_A{reset}") + user_b = Prompt.ask(f"{white}@{green}User_B{reset}") response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}") if response.status_code == 204: xprint(f"{POSITIVE} @{user_a} FOLLOWS @{user_b}") else: xprint(f"{NEGATIVE} @{user_a} DOES NOT FOLLOW @{user_b}") - - + # User search def users_search(self): - xprint(f"{white}@{green}query{white} (eg. john):{reset} ", end="") - query = input() - xprint(PROMPT, limit_output.format("user search"), end="") - limit = int(input()) + if args.query and args.limit and args.limit: + query = args.query + limit = args.limit + else: + query = Prompt.ask(f"{white}@{green}Username{reset} (search)") + limit = Prompt.ask(limit_output.format("user search")) response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json() for user in response['items']: users_search_tree = Tree("\n" + user['login']) for attr in self.user_attrs: users_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(users_search_tree) - log_users_search(user, query) - + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_users_search(user, query) + # Repository search def repos_search(self): - xprint(f"{white}%{green}query{white} (eg. git):{reset} ", end="") - query = input() - xprint(PROMPT, limit_output.format("repositor[y][ies] search"), end="") - limit = int(input()) + if args.query and args.limit: + query = args.query + limit = args.limit + else: + query = Prompt.ask(f"{white}%{green}Repository{reset} (search)") + limit = Prompt.ask(limit_output.format("repositor[y][ies] search")) response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json() for repository in response['items']: repos_search_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_search_tree) - log_repos_search(repository, query) - + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_repos_search(repository, query) + # Topics search def topics_search(self): - xprint(f"{white}#{green}query{white} (eg. osint):{reset} ", end="") - query = input() - xprint(PROMPT, limit_output.format("topic(s) search"), end="") - limit = int(input()) + if args.query and args.limit: + query = args.query + limit = args.limit + else: + query = Prompt.ask(f"{white}:{green}Topics{reset} (search)") + limit = Prompt.ask(limit_output.format("topic(s) search")) response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json() for topic in response['items']: topics_search_tree = Tree("\n" + topic['name']) for attr in self.topic_attrs: topics_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}") xprint(topics_search_tree) - log_topics_search(topic, query) - + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_topics_search(topic, query) + # Issue search def issues_search(self): - xprint(f"{white}!{green}Query{white} (eg. error):{reset} ", end="") - query = input() - xprint(PROMPT, limit_output.format("issue(s) search"), end="") - limit = int(input()) + if args.query and args.limit: + query = args.query + limit = args.limit + else: + query = Prompt.ask(f"{white}!{green}Issues{reset} (search)") + limit = Prompt.ask(limit_output.format("issue(s) search")) response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json() for issue in response['items']: issues_search_tree = Tree("\n" + issue['title']) @@ -804,15 +1048,18 @@ class Octosuite: issues_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issues_search_tree) xprint(issue['body']) - log_issues_search(issue, query) - - + + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_issues_search(issue, query) + # Commits search def commits_search(self): - xprint(f"{white}:{green}Query{white} (eg. filename:index.php):{reset} ", end="") - query = input() - xprint(PROMPT, limit_output.format("commit(s) search"), end="") - limit = int(input()) + if args.query and args.limit: + query = args.query + limit = args.limit + else: + query = Prompt.ask(f"{white};{green}Commits{reset} (search)") + limit = Prompt.ask(limit_output.format("commit(s) search")) response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json() for commit in response['items']: commits_search_tree = Tree("\n" + commit['commit']['tree']['sha']) @@ -824,94 +1071,10 @@ class Octosuite: commits_search_tree.add(f"URL: {commit['html_url']}") xprint(commits_search_tree) xprint(commit['commit']['message']) - log_commits_search(commit, query) - - # View csv files - def view_csv(self): - logging.info(viewing_csv) - csv_files = os.listdir("output") - csv_table = Table(show_header=True, header_style=header_title) - csv_table.add_column("CSV", style="dim") - csv_table.add_column("Size (bytes)") - for csv_file in csv_files: - csv_table.add_row(str(csv_file), str(os.path.getsize("output/" + csv_file))) - xprint(csv_table) - - - # Read a specified csv file - def read_csv(self): - xprint(f"{green}csv {white}(filename):{reset} ", end="") - csv_file = input() - with open(os.path.join("output", csv_file + ".csv"), "r") as file: - logging.info(reading_csv.format(csv_file)) - text = Text(file.read()) - xprint(text) - - - # Delete a specified csv file - def delete_csv(self): - xprint(f"{green}csv {white}(filename):{reset} ", end="") - csv_file = input() - os.remove(os.path.join("output", csv_file)) - logging.info(deleted_csv.format(csv_file)) - xprint(f"{POSITIVE} {deleted_csv.format(csv_file)}") - - - # Clear csv - def clear_csv(self): - xprint(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="") - prompt = input().lower() - if prompt == "yes": - shutil.rmtree("output", ignore_errors=True) - xprint(f"{INFO} CSV files cleared successfully!") - else: - pass - - - # View octosuite log files - def view_logs(self): - logging.info(viewing_logs) - logs = os.listdir(".logs") - logs_table = Table(show_header=True, header_style=header_title) - logs_table.add_column("Log", style="dim") - logs_table.add_column("Size (bytes)") - for log in logs: - logs_table.add_row(str(log), str(os.path.getsize(".logs/" + log))) - xprint(logs_table) - - - # Read a specified log file - def read_log(self): - xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") - log_file = input() - with open(os.path.join(".logs", log_file + ".log"), "r") as log: - logging.info(reading_log.format(log_file)) - xprint("\n" + log.read()) - - - # Delete a specified log file - def delete_log(self): - xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") - log_file = input() - os.remove(os.path.join(".logs", log_file)) - logging.info(deleted_log.format(log_file)) - xprint(f"{POSITIVE} {deleted_log.format(log_file)}") - - - # Clear logs - def clear_logs(self): - xprint(f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="") - prompt = input().lower() - if prompt == "yes": - shutil.rmtree(".logs", ignore_errors=True) - xprint(f"{INFO} Logs cleared successfully!") - xprint(f"{INFO} {session_closed.format(datetime.now())}") - exit() - else: - pass - - + if args.log_csv or Prompt.ask(f"{PROMPT} {prompt_log_csv}", choices=['yes', 'no']) == "yes": + log_commits_search(commit, query) + # Downloading release tarball def download_tarball(self): logging.info(file_downloading.format(f"octosuite.v{version_tag}.tar")) @@ -920,12 +1083,11 @@ class Octosuite: with open(os.path.join("downloads", f"octosuite.v{version_tag}.tar"), "wb") as file: file.write(data.content) file.close() - + logging.info(file_downloaded.format(f"octosuite.v{version_tag}.tar")) xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.tar")) - - - # Downloading release zipball + + # Downloading release zip ball def download_zipball(self): logging.info(file_downloading.format(f"octosuite.v{version_tag}.zip")) xprint(INFO, file_downloading.format(f"octosuite.v{version_tag}.zip")) @@ -933,59 +1095,13 @@ class Octosuite: with open(os.path.join("downloads", f"octosuite.v{version_tag}.zip"), "wb") as file: file.write(data.content) file.close() - + logging.info(file_downloaded.format(f"octosuite.v{version_tag}.zip")) xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.zip")) - - + # Author info def author(self): author_tree = Tree(f"{white}Richard Mwewa (Ritchie){reset}") - for author_key, author_value in self.author_dict.items(self): + for author_key, author_value in self.author_dict.items(): author_tree.add(f"{white}{author_key}:{reset} {author_value}") xprint(author_tree) - - - # About program - def about(self): - about_text = f""" - OCTOSUITE © 2023 Richard Mwewa - -An advanced and lightning fast framework for gathering open-source intelligence on GitHub users and organizations. -With over 20+ features, Octosuite only runs on 2 external dependencies, and returns the gathered intelligence in a highly readable format. - - -Whats new in v{version_tag}? -[{green}IMPROVED{reset}] Users will now be able to view previous commands with the ↑ ↓ arrow keys -[{green}IMPROVED{reset}] Removed width from tables, so that they can auto adjust -[{green}ADDED{reset}] Added the 'log:clear' command, which will be used to clear all logs -[{green}ADDED{reset}] Added the 'csv:clear' command, which will be used to clear all csv files -[{green}GUI{reset}] The GUI will now come as a standalone executable -[{green}GUI{reset}] Added an option to check for updates - -Read the wiki: https://github.com/bellingcat/octosuite/wiki -GitHub REST API documentation: https://docs.github.com/rest -""" - xprint(about_text) - - - # Close session - def exit_session(self): - xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="") - prompt = input().lower() - if prompt == "yes": - logging.info(session_closed.format(datetime.now())) - xprint(f"{INFO} {session_closed.format(datetime.now())}") - exit() - else: - pass - - - # Clear screen - def clear_screen(self): - """ - using 'cls' on Windows machines to clear the screen, - otherwise, use 'clear' - """ - os.system('cls' if os.name == 'nt' else 'clear') - From e87560939fedac05ac16f1162c5769e6fd2fa169 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Sun, 25 Dec 2022 02:32:28 +0200 Subject: [PATCH 6/9] Update main.py --- octosuite/main.py | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/octosuite/main.py b/octosuite/main.py index fe6fea9..b09bd73 100644 --- a/octosuite/main.py +++ b/octosuite/main.py @@ -1,34 +1,46 @@ # import everything from the octosuite.py file -from octosuite.octosuite import * # I drifted away from the 'pythonic way' here +from octosuite.octosuite import * # I drifted away from the 'pythonic way' here def octosuite(): try: run = Octosuite() - run.path_finder() - run.clear_screen() - run.configure_logging() - run.check_updates() - xprint(ascii_banner()[1], ascii_banner()[0]) - + path_finder() + clear_screen() + configure_logging() + check_updates() + if args: + """ + Iterate over the argument_map and check if the passed command line argument matches any argument in it [argument_map], + if there's a match, we return its method. If no match is found, we do nothing (which will return the usage). + """ + for argument, method in run.argument_map: + if args.method == argument: + method() + print("\n") + """ Main loop keeps octosuite running, this will break if Octosuite detects a KeyboardInterrupt (Ctrl+C) or if the 'exit' command is entered. """ + xprint(banner()[0], banner()[1]) while True: - xprint(f"{white}┌──({red}{getpass.getuser()}{white}@{red}octosuite{white})\n├──[~{green}{os.getcwd()}{white}]\n└╼ {reset}",end="") - command_input = input().lower() - print("\n") + command_input = Prompt.ask(f"{white}┌──({red}{getpass.getuser()}{white}@{red}octosuite{white})\n├──[~{green}{os.getcwd()}{white}]\n└╼ {reset}") """ Iterate over the command_map and check if the user input matches any command in it [command_map], if there's a match, we return its method. If no match is found, we ignore it. """ - for command, method in run.command_map: - if command_input == command: - method() - print("\n") - else: - pass + if command_input[:2] == 'cd': + os.chdir(command_input[3:]) + elif command_input[:2] == 'ls': + os.system(f'dir {command_input[3:]}' if os.name == 'nt' else f'ls {command_input[3:]}') + else: + for command, method in run.command_map: + if command_input == command: + method() + print("\n") + else: + pass except KeyboardInterrupt: logging.warning(ctrl_c) From ead76074f54a0ce7844bce2a8bb2a06095d07af8 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Sun, 25 Dec 2022 02:33:22 +0200 Subject: [PATCH 7/9] Update message_prefixes.py --- octosuite/message_prefixes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/octosuite/message_prefixes.py b/octosuite/message_prefixes.py index 98e98aa..9b71391 100644 --- a/octosuite/message_prefixes.py +++ b/octosuite/message_prefixes.py @@ -1,4 +1,4 @@ -from octosuite.colors import red, white, green, reset +from octosuite.config import red, white, green, yellow, reset """ message prefixes that show what @@ -6,8 +6,8 @@ a notification in OctoSuite might be all about. This might not be very important but I think it's better to know the severity of the notifications you get in a program. """ PROMPT = f"{white}[{green}PROMPT{white}]{reset}" -WARNING = f"{white}[{red}WARNING{white}]{reset}" +WARNING = f"{white}[{yellow}WARNING{white}]{reset}" ERROR = f"{white}[{red}ERROR{white}]{reset}" POSITIVE = f"{white}[{green}POSITIVE{white}]{reset}" NEGATIVE = f"{white}[{red}NEGATIVE{white}]{reset}" -INFO = f"{white}[{green}INFO{white}]{reset}" \ No newline at end of file +INFO = f"{white}[{green}INFO{white}]{reset}" From 9eec2e323da62b06b1f3234bb7a4935477a00c8b Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Sun, 25 Dec 2022 02:33:43 +0200 Subject: [PATCH 8/9] Update log_roller.py --- octosuite/log_roller.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/octosuite/log_roller.py b/octosuite/log_roller.py index 749be36..c143844 100644 --- a/octosuite/log_roller.py +++ b/octosuite/log_roller.py @@ -8,17 +8,15 @@ session_opened = "Opened new session on {}:{}" session_closed = "Session closed at {}." viewing_logs = "Viewing logs" viewing_csv = "Viewing CSV file(s)..." -deleted_log = "Deleted log: {}" -reading_log = "Reading log: {}" -reading_csv = 'Reading csv: {}' -deleted_csv = 'Deleted csv: {}' +deleted = "Deleted: {}" +reading = "Reading: {}" file_downloading = "Downloading: {}" file_downloaded = "Downloaded: downloads/{}" info_not_found = "Information not found: {}, {}, {}" user_not_found = "User not found: @{}" org_not_found = "Organization not found: @{}" repo_or_user_not_found = "Repository or User not found: {}, @{}" -prompt_log_csv = "Do you wish to log this output to a CSV file? (yes/no) " +prompt_log_csv = "Do you wish to log this output to a CSV file?" logged_to_csv = "Output logged: {}" logging_skipped = "Logging skipped: {}" -limit_output = "Limit '{}' output to how many? (1-100) " +limit_output = "Limit '{}' output to how many? (1-100)" From d907167713da2073e5c071eb6311aaad9d5a1c5a Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Sun, 25 Dec 2022 02:46:18 +0200 Subject: [PATCH 9/9] Update csv_loggers.py --- octosuite/csv_loggers.py | 419 +++++++++++++-------------------------- 1 file changed, 139 insertions(+), 280 deletions(-) diff --git a/octosuite/csv_loggers.py b/octosuite/csv_loggers.py index b46901f..ef39394 100644 --- a/octosuite/csv_loggers.py +++ b/octosuite/csv_loggers.py @@ -21,22 +21,16 @@ def log_org_profile(response): response.json()['is_verified'], response.json()['has_organization_projects'], response.json()['has_repository_projects'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(org_profile_fields) - write_csv.writerow(org_profile_row) + + with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_profile_fields) + write_csv.writerow(org_profile_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # Creating a .csv file of a user' profile def log_user_profile(response): user_profile_fields = ['Profile photo', 'Name', 'Username', 'ID', 'Node ID', 'Bio', 'Blog', 'Location', 'Followers', @@ -49,21 +43,15 @@ def log_user_profile(response): response.json()['public_gists'], response.json()['public_repos'], response.json()['company'], response.json()['hireable'], response.json()['site_admin'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{response.json()['login']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_profile_fields) - write_csv.writerow(user_profile_row) + + with open(os.path.join("output", f"{response.json()['login']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_profile_fields) + write_csv.writerow(user_profile_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - # create .csv for repository profile def log_repo_profile(response): @@ -81,66 +69,47 @@ def log_repo_profile(response): response.json()['has_pages'], response.json()['has_projects'], response.json()['has_issues'], response.json()['has_downloads'], response.json()['pushed_at'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_profile_fields) - write_csv.writerow(repo_profile_row) - + + with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_profile_fields) + write_csv.writerow(repo_profile_row) + logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - + # create .csv for repository path contents def log_repo_path_contents(content, repo_name): path_content_fields = ['Filename', 'Size (bytes)', 'Type', 'Path', 'SHA', 'URL'] path_content_row = [content['name'], content['size'], content['type'], content['path'], content['sha'], content['html_url']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{content['name']}_content_from_{repo_name}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(path_content_fields) - write_csv.writerow(path_content_row) + + with open(os.path.join("output", f"{content['name']}_content_from_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(path_content_fields) + write_csv.writerow(path_content_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # create .csv for repository stargazer def log_repo_stargazers(stargazer, repo_name): user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_follower_row = [stargazer['avatar_url'], stargazer['login'], stargazer['id'], stargazer['node_id'], stargazer['gravatar_id'], stargazer['type'], stargazer['site_admin'], stargazer['html_url']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{stargazer['login']}_stargazer_of_{repo_name}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_follower_fields) - write_csv.writerow(user_follower_row) + + with open(os.path.join("output", f"{stargazer['login']}_stargazer_of_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_follower_fields) + write_csv.writerow(user_follower_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - -# create .csv for repository forks + # create .csv for repository forks def log_repo_forks(fork, count): repo_fork_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', @@ -152,22 +121,16 @@ def log_repo_forks(fork, count): fork['fork'], fork['allow_forking'], fork['private'], fork['archived'], fork['is_template'], fork['has_wiki'], fork['has_pages'], fork['has_projects'], fork['has_issues'], fork['has_downloads'], fork['pushed_at'], fork['created_at'], fork['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{fork['name']}_fork_{count}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_fork_fields) - write_csv.writerow(repo_fork_row) + + with open(os.path.join("output", f"{fork['name']}_fork_{count}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_fork_fields) + write_csv.writerow(repo_fork_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # create .csv for repository issues def log_repo_issues(issue, repo_name): repo_issue_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', 'Assignee', @@ -177,22 +140,16 @@ def log_repo_issues(issue, repo_name): issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], issue['assignees'], issue['author_association'], issue['labels'], issue['locked'], issue['active_lock_reason'], issue['closed_at'], issue['created_at'], issue['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repo_name}_issue_{issue['id']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_issue_fields) - write_csv.writerow(repo_issue_row) + + with open(os.path.join("output", f"{repo_name}_issue_{issue['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_issue_fields) + write_csv.writerow(repo_issue_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # create .csv for repository releases def log_repo_releases(release, repo_name): repo_release_fields = ['Name', 'ID', 'Node ID', 'Tag', 'Branch', 'Assets', 'Is draft?', 'Is prerelease?', @@ -200,22 +157,16 @@ def log_repo_releases(release, repo_name): repo_release_row = [release['name'], release['id'], release['node_id'], release['tag_name'], release['target_commitish'], release['assets'], release['draft'], release['prerelease'], release['created_at'], release['published_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repo_name}_release_{release['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_release_fields) - write_csv.writerow(repo_release_row) + + with open(os.path.join("output", f"{repo_name}_release_{release['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_release_fields) + write_csv.writerow(repo_release_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # Create .csv file for repository contributors def log_repo_contributors(contributor, repo_name): repo_contributor_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', @@ -223,42 +174,30 @@ def log_repo_contributors(contributor, repo_name): repo_contributor_row = [contributor['avatar_url'], contributor['login'], contributor['id'], contributor['node_id'], contributor['gravatar_id'], contributor['type'], contributor['site_admin'], contributor['html_url']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{contributor['login']}_contributor_of_{repo_name}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_contributor_fields) - write_csv.writerow(repo_contributor_row) + + with open(os.path.join("output", f"{contributor['login']}_contributor_of_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_contributor_fields) + write_csv.writerow(repo_contributor_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - print(f"{INFO} {logging_skipped}\n") - - + # Create .csv for organization' events def log_repo_events(event, organization): org_event_fields = ['ID', 'Type', 'Created at', 'Payload'] org_event_row = [event['id'], event['type'], event['created_at'], event['payload']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{organization}_event_{event['id']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(org_event_fields) - write_csv.writerow(org_event_row) + + with open(os.path.join("output", f"{organization}_event_{event['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_event_fields) + write_csv.writerow(org_event_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # Create .csv for organization' repositories def log_org_repos(repository, organization): org_repo_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', @@ -273,22 +212,16 @@ def log_org_repos(repository, organization): repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repository['name']}_repository_of_{organization}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(org_repo_fields) - write_csv.writerow(org_repo_row) + + with open(os.path.join("output", f"{repository['name']}_repository_of_{organization}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_repo_fields) + write_csv.writerow(org_repo_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # .csv for user' repositories def log_user_repos(repository, username): user_repo_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', @@ -303,109 +236,79 @@ def log_user_repos(repository, username): repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repository['name']}_{username}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_repo_fields) - write_csv.writerow(user_repo_row) + + with open(os.path.join("output", f"{repository['name']}_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_repo_fields) + write_csv.writerow(user_repo_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # .csv for user events def log_user_events(event): user_event_fields = ['Actor', 'Type', 'Repository', 'Created at', 'Payload'] user_event_row = [event['actor']['login'], event['type'], event['repo']['name'], event['created_at'], event['payload']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{event['actor']['login']}_event_{event['id']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_event_fields) - write_csv.writerow(user_event_row) + + with open(os.path.join("output", f"{event['actor']['login']}_event_{event['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_event_fields) + write_csv.writerow(user_event_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # .csv for user gists def log_user_gists(gist): user_gist_fields = ['ID', 'Node ID', 'About', 'Comments', 'Files', 'Git Push URL', 'Is public?', 'Is truncated?', 'Updated at'] user_gist_row = [gist['id'], gist['node_id'], gist['description'], gist['comments'], gist['files'], gist['git_push_url'], gist['public'], gist['truncated'], gist['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{gist['id']}_gists_{gist['owner']['login']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_gist_fields) - write_csv.writerow(user_gist_row) + + with open(os.path.join("output", f"{gist['id']}_gists_{gist['owner']['login']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_gist_fields) + write_csv.writerow(user_gist_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # .csv for user followers def log_user_followers(follower, username): user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_follower_row = [follower['avatar_url'], follower['login'], follower['id'], follower['node_id'], follower['gravatar_id'], follower['type'], follower['site_admin'], follower['html_url']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(f"output/{follower['login']}_follower_of_{username}.csv", 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_follower_fields) - write_csv.writerow(user_follower_row) + + with open(f"output/{follower['login']}_follower_of_{username}.csv", 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_follower_fields) + write_csv.writerow(user_follower_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # .csv for user following def log_user_following(user, username): user_following_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_following_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], user['type'], user['site_admin'], user['html_url']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{user['login']}_followed_by_{username}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_following_fields) - write_csv.writerow(user_following_row) + + with open(os.path.join("output", f"{user['login']}_followed_by_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_following_fields) + write_csv.writerow(user_following_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # .csv for user' subscriptions def log_user_subscriptions(repository, username): user_subscription_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', @@ -422,65 +325,47 @@ def log_user_subscriptions(repository, username): repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{username}_subscriptions_{repository['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_subscription_fields) - write_csv.writerow(user_subscription_row) + + with open(os.path.join("output", f"{username}_subscriptions_{repository['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_subscription_fields) + write_csv.writerow(user_subscription_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # .csv for user organizations def log_user_orgs(organization, username): user_org_fields = ['Profile photo', 'Name', 'ID', 'Node ID', 'URL', 'About'] user_org_row = [organization['avatar_url'], organization['login'], organization['id'], organization['node_id'], organization['url'], organization['description']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{organization['login']}_{username}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_org_fields) - write_csv.writerow(user_org_row) + + with open(os.path.join("output", f"{organization['login']}_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_org_fields) + write_csv.writerow(user_org_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # Create .csv for user search def log_users_search(user, query): user_search_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] user_search_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], user['type'], user['site_admin'], user['html_url']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{user['login']}_user_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_search_fields) - write_csv.writerow(user_search_row) + + with open(os.path.join("output", f"{user['login']}_user_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_search_fields) + write_csv.writerow(user_search_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # Create .csv for repository search def log_repos_search(repository, query): repo_search_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', @@ -496,46 +381,32 @@ def log_repos_search(repository, query): repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repository['name']}_repository_search_result_for_{query}.csv"), - 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_search_fields) - write_csv.writerow(repo_search_row) + + with open(os.path.join("output", f"{repository['name']}_repository_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_search_fields) + write_csv.writerow(repo_search_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - # Create .csv for topic search - - + +# Create .csv for topic search def log_topics_search(topic, query): topic_search_fields = ['Name', 'Score', 'Curated', 'Featured', 'Display name', 'Created by', 'Created at', 'Updated at'] topic_search_row = [topic['name'], topic['score'], topic['curated'], topic['featured'], topic['display_name'], topic['created_by'], topic['created_at'], topic['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{topic['name']}_topic_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(topic_search_fields) - write_csv.writerow(topic_search_row) + + with open(os.path.join("output", f"{topic['name']}_topic_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(topic_search_fields) + write_csv.writerow(topic_search_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # Create .csv for issues search def log_issues_search(issue, query): issue_search_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', @@ -545,40 +416,28 @@ def log_issues_search(issue, query): issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], issue['assignees'], issue['author_association'], issue['labels'], issue['locked'], issue['active_lock_reason'], issue['closed_at'], issue['created_at'], issue['updated_at']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{issue['id']}_issue_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(issue_search_fields) - write_csv.writerow(issue_search_row) + + with open(os.path.join("output", f"{issue['id']}_issue_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(issue_search_fields) + write_csv.writerow(issue_search_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") - - + # Create .csv for commits search def log_commits_search(commit, query): commit_search_fields = ['SHA', 'Author', 'Username', 'Email', 'Committer', 'Repository', 'URL', 'Description'] commit_search_row = [commit['commit']['tree']['sha'], commit['commit']['author']['name'], commit['author']['login'], commit['commit']['author']['email'], commit['commit']['committer']['name'], commit['repository']['full_name'], commit['html_url'], commit['commit']['message']] - xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); - prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{commit['commit']['tree']['sha']}_commit_search_result_for_{query}.csv"), - 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(commit_search_fields) - write_csv.writerow(commit_search_row) + + with open(os.path.join("output", f"{commit['commit']['tree']['sha']}_commit_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(commit_search_fields) + write_csv.writerow(commit_search_row) logging.info(logged_to_csv.format(file.name)) xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - - else: - logging.info(logging_skipped.format(prompt)) - xprint(f"{INFO} {logging_skipped.format(prompt)}") +