diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index 4a1b0a9..f738c74 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -7,15 +7,14 @@ import logging import getpass import requests import platform +import subprocess from rich.text import Text from rich.tree import Tree from rich.table import Table from datetime import datetime -# from pyreadline3 import Readline from rich import print as xprint -from octosuite.config import * from octosuite.banner import version_tag, banner -from octosuite.config import red, white, green, header_title, reset +from octosuite.config import red, white, green, header_title, reset, create_parser, args from octosuite.message_prefixes import ERROR, WARNING, PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol # seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :) from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, \ @@ -27,18 +26,39 @@ from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_pr log_repo_contributors, log_repo_stargazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, \ log_org_profile, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, \ log_user_following, log_user_followers, log_repos_search, log_users_search, log_topics_search, log_issues_search, \ - log_commits_search # log_org_events - -# readline = Readline() - -""" -path_finder() -This method is responsible for creating/checking the availability of the (.logs, output, downloads) folders, -enabling logging to automatically log network/user activity to a file, - and logging the start of a session. -""" + log_commits_search + +def set_readline(): + if os.name == "nt": + try: + from pyreadline3 import Readline + except ImportError: + subprocess.run(['pip3', 'install', 'pyreadline3']) + readline = Readline() + else: + import readline + + def completer(text, state): + options = [i for i in commands if i.startswith(text)] + if state < len(options): + return options[state] + else: + return None + + readline.parse_and_bind("tab: complete") + readline.set_completer(completer) + + return readline + + +set_readline() + + +# path_finder() +# This function is responsible for creating/checking the availability of the (.logs, output, downloads) folders, +# enabling logging to automatically log network/user activity to a file, and logging the start of a session. def path_finder(): """ Check 3 directories (.logs, output, downloads) on startup @@ -49,15 +69,8 @@ def path_finder(): os.makedirs(directory, exist_ok=True) -""" -Configure logging and check for updates -""" - - +# Configure logging to log user activities def configure_logging(): - """ - Configure logging to log activities to a file, which will be named by the date and time a session was opened. - """ now = datetime.now() now_formatted = now.strftime("%Y-%m-%d %H-%M-%S%p") logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s", @@ -66,18 +79,19 @@ def configure_logging(): logging.info(session_opened.format(platform.node(), getpass.getuser())) +# Check if the remote tag_name from the latest release matches the one in the program +# if it does, it means the program is up to date. +# If it doesn't match, notify the user about a new release def check_updates(): response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json() if response['tag_name'] == version_tag: - """ - Ignore if the program is up to date - """ pass else: xprint( f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n") +# Delete a specified csv file def delete_csv(): xprint(f"{green}csv {white}(filename):{reset} ", end="") csv_file = input() @@ -86,76 +100,21 @@ def delete_csv(): xprint(f"{POSITIVE} {deleted_csv.format(csv_file)}") +# Clear csv files def clear_csv(): xprint(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="") prompt = input().lower() if prompt == "yes": - shutil.rmtree("output", ignore_errors=True) - xprint(f"{INFO} CSV files cleared successfully!") + files = os.listdir('output') + for file in files: + if os.path.isfile(file): + os.remove(file) + xprint(f"{INFO} Cleared {len(files)} .csv files!") else: pass -def view_logs(): - logging.info(viewing_logs) - logs = os.listdir(".logs") - logs_table = Table(show_header=True, header_style=header_title) - logs_table.add_column("Log", style="dim") - logs_table.add_column("Size (bytes)") - for log in logs: - logs_table.add_row(str(log), str(os.path.getsize(".logs/" + log))) - xprint(logs_table) - - -def read_log(): - xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") - log_file = input() - with open(os.path.join(".logs", log_file + ".log"), "r") as log: - logging.info(reading_log.format(log_file)) - xprint("\n" + log.read()) - - -def delete_log(): - xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") - log_file = input() - os.remove(os.path.join(".logs", log_file)) - logging.info(deleted_log.format(log_file)) - xprint(f"{POSITIVE} {deleted_log.format(log_file)}") - - -def clear_logs(): - xprint( - f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", - end="") - prompt = input().lower() - if prompt == "yes": - shutil.rmtree(".logs", ignore_errors=True) - xprint(f"{INFO} Logs cleared successfully!") - xprint(f"{INFO} {session_closed.format(datetime.now())}") - exit() - else: - pass - - -def exit_session(): - xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="") - prompt = input().lower() - if prompt == "yes": - logging.info(session_closed.format(datetime.now())) - xprint(f"{INFO} {session_closed.format(datetime.now())}") - exit() - else: - pass - - -def clear_screen(): - """ - using 'cls' on Windows machines to clear the screen, - otherwise, use 'clear' - """ - os.system('cls' if os.name == 'nt' else 'clear') - - +# View csv files def view_csv(): logging.info(viewing_csv) csv_files = os.listdir("output") @@ -167,6 +126,7 @@ def view_csv(): xprint(csv_table) +# Read csv def read_csv(): xprint(f"{green}csv {white}(filename):{reset} ", end="") csv_file = input() @@ -176,6 +136,71 @@ def read_csv(): xprint(text) +# View logs +def view_logs(): + logging.info(viewing_logs) + logs = os.listdir(".logs") + logs_table = Table(show_header=True, header_style=header_title) + logs_table.add_column("Log", style="dim") + logs_table.add_column("Size (bytes)") + for log in logs: + logs_table.add_row(str(log), str(os.path.getsize(".logs/" + log))) + xprint(logs_table) + + +# Read log +def read_log(): + xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") + log_file = input() + with open(os.path.join(".logs", log_file + ".log"), "r") as log: + logging.info(reading_log.format(log_file)) + xprint("\n" + log.read()) + + +# Delete log +def delete_log(): + xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") + log_file = input() + os.remove(os.path.join(".logs", log_file)) + logging.info(deleted_log.format(log_file)) + xprint(f"{POSITIVE} {deleted_log.format(log_file)}") + + +# Clear logs +def clear_logs(): + xprint( + f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="") + prompt = input().lower() + if prompt == "yes": + files = os.listdir('.logs') + for file in files: + if os.path.isfile(file): + os.remove(file) + xprint(f"{INFO} Cleared {len(files)} .log files!") + exit() + else: + pass + + +# Exit session +def exit_session(): + xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="") + prompt = input().lower() + if prompt == "yes": + logging.info(session_closed.format(datetime.now())) + xprint(f"{INFO} {session_closed.format(datetime.now())}") + exit() + else: + pass + + +# Clear screen +def clear_screen(): + # Using 'cls' on Windows machines to clear the screen, + # otherwise, use 'clear' + os.system('cls' if os.name == 'nt' else 'clear') + + def about(): about_text = f""" OCTOSUITE © 2023 Richard Mwewa