mirror of
https://github.com/bellingcat/octosuite.git
synced 2026-06-12 21:38:34 +03:00
Update octosuite.py
This commit is contained in:
@@ -7,15 +7,14 @@ import logging
|
||||
import getpass
|
||||
import requests
|
||||
import platform
|
||||
import subprocess
|
||||
from rich.text import Text
|
||||
from rich.tree import Tree
|
||||
from rich.table import Table
|
||||
from datetime import datetime
|
||||
# from pyreadline3 import Readline
|
||||
from rich import print as xprint
|
||||
from octosuite.config import *
|
||||
from octosuite.banner import version_tag, banner
|
||||
from octosuite.config import red, white, green, header_title, reset
|
||||
from octosuite.config import red, white, green, header_title, reset, create_parser, args
|
||||
from octosuite.message_prefixes import ERROR, WARNING, PROMPT, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol
|
||||
# seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :)
|
||||
from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, \
|
||||
@@ -27,18 +26,39 @@ from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_pr
|
||||
log_repo_contributors, log_repo_stargazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, \
|
||||
log_org_profile, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, \
|
||||
log_user_following, log_user_followers, log_repos_search, log_users_search, log_topics_search, log_issues_search, \
|
||||
log_commits_search # log_org_events
|
||||
|
||||
# readline = Readline()
|
||||
|
||||
"""
|
||||
path_finder()
|
||||
This method is responsible for creating/checking the availability of the (.logs, output, downloads) folders,
|
||||
enabling logging to automatically log network/user activity to a file,
|
||||
and logging the start of a session.
|
||||
"""
|
||||
log_commits_search
|
||||
|
||||
|
||||
|
||||
def set_readline():
|
||||
if os.name == "nt":
|
||||
try:
|
||||
from pyreadline3 import Readline
|
||||
except ImportError:
|
||||
subprocess.run(['pip3', 'install', 'pyreadline3'])
|
||||
readline = Readline()
|
||||
else:
|
||||
import readline
|
||||
|
||||
def completer(text, state):
|
||||
options = [i for i in commands if i.startswith(text)]
|
||||
if state < len(options):
|
||||
return options[state]
|
||||
else:
|
||||
return None
|
||||
|
||||
readline.parse_and_bind("tab: complete")
|
||||
readline.set_completer(completer)
|
||||
|
||||
return readline
|
||||
|
||||
|
||||
set_readline()
|
||||
|
||||
|
||||
# path_finder()
|
||||
# This function is responsible for creating/checking the availability of the (.logs, output, downloads) folders,
|
||||
# enabling logging to automatically log network/user activity to a file, and logging the start of a session.
|
||||
def path_finder():
|
||||
"""
|
||||
Check 3 directories (.logs, output, downloads) on startup
|
||||
@@ -49,15 +69,8 @@ def path_finder():
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
|
||||
|
||||
"""
|
||||
Configure logging and check for updates
|
||||
"""
|
||||
|
||||
|
||||
# Configure logging to log user activities
|
||||
def configure_logging():
|
||||
"""
|
||||
Configure logging to log activities to a file, which will be named by the date and time a session was opened.
|
||||
"""
|
||||
now = datetime.now()
|
||||
now_formatted = now.strftime("%Y-%m-%d %H-%M-%S%p")
|
||||
logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s",
|
||||
@@ -66,18 +79,19 @@ def configure_logging():
|
||||
logging.info(session_opened.format(platform.node(), getpass.getuser()))
|
||||
|
||||
|
||||
# Check if the remote tag_name from the latest release matches the one in the program
|
||||
# if it does, it means the program is up to date.
|
||||
# If it doesn't match, notify the user about a new release
|
||||
def check_updates():
|
||||
response = requests.get("https://api.github.com/repos/bellingcat/octosuite/releases/latest").json()
|
||||
if response['tag_name'] == version_tag:
|
||||
"""
|
||||
Ignore if the program is up to date
|
||||
"""
|
||||
pass
|
||||
else:
|
||||
xprint(
|
||||
f"[{green}UPDATE{reset}] A new release of Octosuite is available ({response['tag_name']}). Run 'pip install --upgrade octosuite' to get the updates.\n")
|
||||
|
||||
|
||||
# Delete a specified csv file
|
||||
def delete_csv():
|
||||
xprint(f"{green}csv {white}(filename):{reset} ", end="")
|
||||
csv_file = input()
|
||||
@@ -86,76 +100,21 @@ def delete_csv():
|
||||
xprint(f"{POSITIVE} {deleted_csv.format(csv_file)}")
|
||||
|
||||
|
||||
# Clear csv files
|
||||
def clear_csv():
|
||||
xprint(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="")
|
||||
prompt = input().lower()
|
||||
if prompt == "yes":
|
||||
shutil.rmtree("output", ignore_errors=True)
|
||||
xprint(f"{INFO} CSV files cleared successfully!")
|
||||
files = os.listdir('output')
|
||||
for file in files:
|
||||
if os.path.isfile(file):
|
||||
os.remove(file)
|
||||
xprint(f"{INFO} Cleared {len(files)} .csv files!")
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
def view_logs():
|
||||
logging.info(viewing_logs)
|
||||
logs = os.listdir(".logs")
|
||||
logs_table = Table(show_header=True, header_style=header_title)
|
||||
logs_table.add_column("Log", style="dim")
|
||||
logs_table.add_column("Size (bytes)")
|
||||
for log in logs:
|
||||
logs_table.add_row(str(log), str(os.path.getsize(".logs/" + log)))
|
||||
xprint(logs_table)
|
||||
|
||||
|
||||
def read_log():
|
||||
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
|
||||
log_file = input()
|
||||
with open(os.path.join(".logs", log_file + ".log"), "r") as log:
|
||||
logging.info(reading_log.format(log_file))
|
||||
xprint("\n" + log.read())
|
||||
|
||||
|
||||
def delete_log():
|
||||
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
|
||||
log_file = input()
|
||||
os.remove(os.path.join(".logs", log_file))
|
||||
logging.info(deleted_log.format(log_file))
|
||||
xprint(f"{POSITIVE} {deleted_log.format(log_file)}")
|
||||
|
||||
|
||||
def clear_logs():
|
||||
xprint(
|
||||
f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ",
|
||||
end="")
|
||||
prompt = input().lower()
|
||||
if prompt == "yes":
|
||||
shutil.rmtree(".logs", ignore_errors=True)
|
||||
xprint(f"{INFO} Logs cleared successfully!")
|
||||
xprint(f"{INFO} {session_closed.format(datetime.now())}")
|
||||
exit()
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
def exit_session():
|
||||
xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="")
|
||||
prompt = input().lower()
|
||||
if prompt == "yes":
|
||||
logging.info(session_closed.format(datetime.now()))
|
||||
xprint(f"{INFO} {session_closed.format(datetime.now())}")
|
||||
exit()
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
def clear_screen():
|
||||
"""
|
||||
using 'cls' on Windows machines to clear the screen,
|
||||
otherwise, use 'clear'
|
||||
"""
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
|
||||
# View csv files
|
||||
def view_csv():
|
||||
logging.info(viewing_csv)
|
||||
csv_files = os.listdir("output")
|
||||
@@ -167,6 +126,7 @@ def view_csv():
|
||||
xprint(csv_table)
|
||||
|
||||
|
||||
# Read csv
|
||||
def read_csv():
|
||||
xprint(f"{green}csv {white}(filename):{reset} ", end="")
|
||||
csv_file = input()
|
||||
@@ -176,6 +136,71 @@ def read_csv():
|
||||
xprint(text)
|
||||
|
||||
|
||||
# View logs
|
||||
def view_logs():
|
||||
logging.info(viewing_logs)
|
||||
logs = os.listdir(".logs")
|
||||
logs_table = Table(show_header=True, header_style=header_title)
|
||||
logs_table.add_column("Log", style="dim")
|
||||
logs_table.add_column("Size (bytes)")
|
||||
for log in logs:
|
||||
logs_table.add_row(str(log), str(os.path.getsize(".logs/" + log)))
|
||||
xprint(logs_table)
|
||||
|
||||
|
||||
# Read log
|
||||
def read_log():
|
||||
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
|
||||
log_file = input()
|
||||
with open(os.path.join(".logs", log_file + ".log"), "r") as log:
|
||||
logging.info(reading_log.format(log_file))
|
||||
xprint("\n" + log.read())
|
||||
|
||||
|
||||
# Delete log
|
||||
def delete_log():
|
||||
xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="")
|
||||
log_file = input()
|
||||
os.remove(os.path.join(".logs", log_file))
|
||||
logging.info(deleted_log.format(log_file))
|
||||
xprint(f"{POSITIVE} {deleted_log.format(log_file)}")
|
||||
|
||||
|
||||
# Clear logs
|
||||
def clear_logs():
|
||||
xprint(
|
||||
f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="")
|
||||
prompt = input().lower()
|
||||
if prompt == "yes":
|
||||
files = os.listdir('.logs')
|
||||
for file in files:
|
||||
if os.path.isfile(file):
|
||||
os.remove(file)
|
||||
xprint(f"{INFO} Cleared {len(files)} .log files!")
|
||||
exit()
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
# Exit session
|
||||
def exit_session():
|
||||
xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="")
|
||||
prompt = input().lower()
|
||||
if prompt == "yes":
|
||||
logging.info(session_closed.format(datetime.now()))
|
||||
xprint(f"{INFO} {session_closed.format(datetime.now())}")
|
||||
exit()
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
# Clear screen
|
||||
def clear_screen():
|
||||
# Using 'cls' on Windows machines to clear the screen,
|
||||
# otherwise, use 'clear'
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
|
||||
def about():
|
||||
about_text = f"""
|
||||
OCTOSUITE © 2023 Richard Mwewa
|
||||
|
||||
Reference in New Issue
Block a user