diff --git a/dist/octosuite-3.0.0-py3-none-any.whl b/dist/octosuite-3.0.0-py3-none-any.whl deleted file mode 100644 index d1776b2..0000000 Binary files a/dist/octosuite-3.0.0-py3-none-any.whl and /dev/null differ diff --git a/dist/octosuite-3.0.0.tar.gz b/dist/octosuite-3.0.0.tar.gz deleted file mode 100644 index 05ddb25..0000000 Binary files a/dist/octosuite-3.0.0.tar.gz and /dev/null differ diff --git a/octosuite.egg-info/PKG-INFO b/octosuite.egg-info/PKG-INFO deleted file mode 100644 index ce70b7f..0000000 --- a/octosuite.egg-info/PKG-INFO +++ /dev/null @@ -1,83 +0,0 @@ -Metadata-Version: 2.1 -Name: octosuite -Version: 3.0.0 -Summary: Advanced Github OSINT Framework -Home-page: https://github.com/bellingcat/octosuite -Author: Richard Mwewa -Author-email: rly0nheart@duck.com -License: GNU General Public License v3 (GPLv3) -Classifier: Development Status :: 5 - Production/Stable -Classifier: Intended Audience :: Information Technology -Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) -Classifier: Operating System :: OS Independent -Classifier: Natural Language :: English -Classifier: Programming Language :: Python :: 3 -Description-Content-Type: text/markdown -License-File: LICENSE - -![logo](https://user-images.githubusercontent.com/74001397/175805580-fffc96d4-e0ef-48bb-a55c-80b2da3e714d.png) - -A framework fro gathering osint on GitHub users, repositories and organizations - -[![Upload Python Package](https://github.com/bellingcat/octosuite/actions/workflows/python-publish.yml/badge.svg)](https://github.com/bellingcat/octosuite/actions/workflows/python-publish.yml) -[![CodeQL](https://github.com/bellingcat/octosuite/actions/workflows/codeql.yml/badge.svg)](https://github.com/bellingcat/octosuite/actions/workflows/codeql.yml) -![GitHub](https://img.shields.io/github/license/bellingcat/octosuite?style=flat) -![PyPI](https://img.shields.io/pypi/v/octosuite?style=flat&logo=pypi) -![PyPI - Downloads](https://img.shields.io/pypi/dw/octosuite?style=flat&logo=pypi) -![PyPI - Status](https://img.shields.io/pypi/status/octosuite?style=flat&logo=pypi) -![GitHub repo size](https://img.shields.io/github/repo-size/bellingcat/octosuite?style=flat&logo=github) - - -![octosuite_gui_exe (2)](https://user-images.githubusercontent.com/74001397/186889610-4530ee26-d3c6-46fc-8c92-8709f89617fd.png "Octosuite' about window") - -![octosuite_gui_exe (4)](https://user-images.githubusercontent.com/74001397/186889897-c1c17fac-fddc-4967-9084-39cfe2d1307f.png "Octosuite user profile window") - - -# Wiki -[Refer to the Wiki](https://github.com/bellingcat/octosuite/wiki) for installation instructions, in addition to all other documentation. - -# Features -- [x] Fetches an organization's profile information -- [x] Fetches an oganization's events -- [x] Returns an organization's repositories -- [x] Returns an organization's public members -- [x] Fetches a repository's information -- [x] Returns a repository's contributors -- [x] Returns a repository's languages -- [x] Fetches a repository's stargazers -- [x] Fetches a repository's forks -- [x] Fetches a repository's releases -- [x] Returns a list of files in a specified path of a repository -- [x] Fetches a user's profile information -- [x] Returns a user's gists -- [x] Returns organizations that a user owns/belongs to -- [x] Fetches a user's events -- [x] Fetches a list of users followed by the target -- [x] Fetches a user's followers -- [x] Checks if user A follows user B -- [x] Checks if user is a public member of an organizations -- [x] Returns a user's subscriptions -- [x] Gets a user's subscriptions -- [x] Gets a user's events -- [x] Searches users -- [x] Searches repositories -- [x] Searches topics -- [x] Searches issues -- [x] Searches commits -- [x] Automatically logs network activity (.logs folder) -- [x] User can view, read and delete logs -- [x] ...And more - -## Note -> Octosuite automatically logs network and user activity of each session, the logs are saved by date and time in the .logs folder - - -# License -![license](https://user-images.githubusercontent.com/74001397/137917929-2f2cdb0c-4d1d-4e4b-9f0d-e01589e027b5.png) - -# Donations -If you like Octosuite and would like to show support, you can Buy A Coffee for the developer using the button below - -Buy Me A Coffee - -Your support will be much appreciated😊 diff --git a/octosuite.egg-info/SOURCES.txt b/octosuite.egg-info/SOURCES.txt deleted file mode 100644 index d1d90bf..0000000 --- a/octosuite.egg-info/SOURCES.txt +++ /dev/null @@ -1,18 +0,0 @@ -LICENSE -README.md -setup.py -octosuite/__init__.py -octosuite/banners.py -octosuite/colors.py -octosuite/csv_loggers.py -octosuite/helper.py -octosuite/log_roller.py -octosuite/main.py -octosuite/message_prefixes.py -octosuite/octosuite.py -octosuite.egg-info/PKG-INFO -octosuite.egg-info/SOURCES.txt -octosuite.egg-info/dependency_links.txt -octosuite.egg-info/entry_points.txt -octosuite.egg-info/requires.txt -octosuite.egg-info/top_level.txt \ No newline at end of file diff --git a/octosuite.egg-info/dependency_links.txt b/octosuite.egg-info/dependency_links.txt deleted file mode 100644 index 8b13789..0000000 --- a/octosuite.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/octosuite.egg-info/entry_points.txt b/octosuite.egg-info/entry_points.txt deleted file mode 100644 index ef7d5ff..0000000 --- a/octosuite.egg-info/entry_points.txt +++ /dev/null @@ -1,2 +0,0 @@ -[console_scripts] -octosuite = octosuite.main:octosuite diff --git a/octosuite.egg-info/requires.txt b/octosuite.egg-info/requires.txt deleted file mode 100644 index d4d1a3e..0000000 --- a/octosuite.egg-info/requires.txt +++ /dev/null @@ -1,3 +0,0 @@ -requests -rich -psutil diff --git a/octosuite.egg-info/top_level.txt b/octosuite.egg-info/top_level.txt deleted file mode 100644 index 194678a..0000000 --- a/octosuite.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -octosuite diff --git a/octosuite/colors.py b/octosuite/colors.py index 1ddb230..c6188f9 100644 --- a/octosuite/colors.py +++ b/octosuite/colors.py @@ -29,7 +29,7 @@ xprint(system_tree) print("\n") while True: try: - color_chooser = input(f"[?] Welcome, would you like to enable colors for this session? (yes/no) ").lower() + color_chooser = input(f"[PROMPT] Welcome, would you like to enable colors for this session? (yes/no) ").lower() if color_chooser == "yes": header_title = "bold white" red = "[red]" @@ -44,8 +44,8 @@ while True: header_title = red = white = green = red_bold = white_bold = green_bold = reset = "" break else: - print(f"\n[!] Your response '{color_chooser}' is invalid (expected yes or no)") + print(f"\n[INVALID] Your response '{color_chooser}' is invalid (expected yes or no)") except KeyboardInterrupt: - exit(f"[!] Process interrupted with Ctrl+C.") + exit(f"[WARNING] Process interrupted with Ctrl+C.") diff --git a/octosuite/csv_loggers.py b/octosuite/csv_loggers.py index d7b2d3b..93db373 100644 --- a/octosuite/csv_loggers.py +++ b/octosuite/csv_loggers.py @@ -2,445 +2,583 @@ import os import csv import logging from rich import print as xprint -from octosuite.log_roller import LogRoller -from octosuite.message_prefixes import MessagePrefix +from octosuite.log_roller import prompt_log_csv, logged_to_csv, logging_skipped +from octosuite.message_prefixes import PROMPT, WARNING, POSITIVE, NEGATIVE, INFO -# CsvLogger -# This class holds the methods for creating .csv files of each functionality in main -class CsvLogger: - # .csv for organization' profile - def log_org_profile(response): - org_profile_fields = ['Profile photo', 'Name', 'Username', 'ID', 'Node ID', 'Email', 'About', 'Location', 'Blog', 'Followers', 'Following', 'Twitter handle', 'Gists', 'Repositories', 'Account type', 'Is verified?', 'Has organization projects?', 'Has repository projects?', 'Created at', 'Updated at'] - org_profile_row = [response.json()['avatar_url'], response.json()['name'], response.json()['login'], response.json()['id'], response.json()['node_id'], response.json()['email'], response.json()['description'], response.json()['location'], response.json()['blog'], response.json()['followers'], response.json()['following'], response.json()['twitter_username'], response.json()['public_gists'], response.json()['public_repos'], response.json()['type'], response.json()['is_verified'], response.json()['has_organization_projects'], response.json()['has_repository_projects'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(org_profile_fields) - write_csv.writerow(org_profile_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") +# csv_loggers.py +# This class holds the functions for creating .csv files of each functionality in main +def log_org_profile(response): + org_profile_fields = ['Profile photo', 'Name', 'Username', 'ID', 'Node ID', 'Email', 'About', 'Location', 'Blog', + 'Followers', 'Following', 'Twitter handle', 'Gists', 'Repositories', 'Account type', + 'Is verified?', 'Has organization projects?', 'Has repository projects?', 'Created at', + 'Updated at'] + org_profile_row = [response.json()['avatar_url'], response.json()['name'], response.json()['login'], + response.json()['id'], response.json()['node_id'], response.json()['email'], + response.json()['description'], response.json()['location'], response.json()['blog'], + response.json()['followers'], response.json()['following'], response.json()['twitter_username'], + response.json()['public_gists'], response.json()['public_repos'], response.json()['type'], + response.json()['is_verified'], response.json()['has_organization_projects'], + response.json()['has_repository_projects'], response.json()['created_at'], + response.json()['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_profile_fields) + write_csv.writerow(org_profile_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") - # Creating a .csv file of a user' profile - def log_user_profile(response): - user_profile_fields = ['Profile photo', 'Name', 'Username', 'ID', 'Node ID', 'Bio', 'Blog', 'Location', 'Followers', 'Following', 'Twitter handle', 'Gists', 'Repositories', 'Organization', 'Is hireable?', 'Is site admin?', 'Joined at', 'Updated at'] - user_profile_row = [response.json()['avatar_url'], response.json()['name'], response.json()['login'], response.json()['id'], response.json()['node_id'], response.json()['bio'], response.json()['blog'], response.json()['location'], response.json()['followers'], response.json()['following'], response.json()['twitter_username'], response.json()['public_gists'], response.json()['public_repos'], response.json()['company'], response.json()['hireable'], response.json()['site_admin'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{response.json()['login']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_profile_fields) - write_csv.writerow(user_profile_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") +# Creating a .csv file of a user' profile +def log_user_profile(response): + user_profile_fields = ['Profile photo', 'Name', 'Username', 'ID', 'Node ID', 'Bio', 'Blog', 'Location', 'Followers', + 'Following', 'Twitter handle', 'Gists', 'Repositories', 'Organization', 'Is hireable?', + 'Is site admin?', 'Joined at', 'Updated at'] + user_profile_row = [response.json()['avatar_url'], response.json()['name'], response.json()['login'], + response.json()['id'], response.json()['node_id'], response.json()['bio'], + response.json()['blog'], response.json()['location'], response.json()['followers'], + response.json()['following'], response.json()['twitter_username'], + response.json()['public_gists'], response.json()['public_repos'], response.json()['company'], + response.json()['hireable'], response.json()['site_admin'], response.json()['created_at'], + response.json()['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{response.json()['login']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_profile_fields) + write_csv.writerow(user_profile_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") - # create .csv for repository profile - def log_repo_profile(response): - repo_profile_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] - repo_profile_row = [response.json()['name'], response.json()['id'], response.json()['description'], response.json()['forks'], response.json()['stargazers_count'], response.json()['watchers'], response.json()['license'], response.json()['default_branch'], response.json()['visibility'], response.json()['language'], response.json()['open_issues'], response.json()['topics'], response.json()['homepage'], response.json()['clone_url'], response.json()['ssh_url'], response.json()['fork'], response.json()['allow_forking'], response.json()['private'], response.json()['archived'], response.json()['is_template'], response.json()['has_wiki'], response.json()['has_pages'], response.json()['has_projects'], response.json()['has_issues'], response.json()['has_downloads'], response.json()['pushed_at'], response.json()['created_at'], response.json()['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_profile_fields) - write_csv.writerow(repo_profile_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - +# create .csv for repository profile +def log_repo_profile(response): + repo_profile_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', + 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', + 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', + 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] + repo_profile_row = [response.json()['name'], response.json()['id'], response.json()['description'], + response.json()['forks'], response.json()['stargazers_count'], response.json()['watchers'], + response.json()['license'], response.json()['default_branch'], response.json()['visibility'], + response.json()['language'], response.json()['open_issues'], response.json()['topics'], + response.json()['homepage'], response.json()['clone_url'], response.json()['ssh_url'], + response.json()['fork'], response.json()['allow_forking'], response.json()['private'], + response.json()['archived'], response.json()['is_template'], response.json()['has_wiki'], + response.json()['has_pages'], response.json()['has_projects'], response.json()['has_issues'], + response.json()['has_downloads'], response.json()['pushed_at'], response.json()['created_at'], + response.json()['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{response.json()['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_profile_fields) + write_csv.writerow(repo_profile_row) - # create .csv for repository path contents - def log_repo_path_contents(content, repo_name): - path_content_fields = ['Filename', 'Size (bytes)', 'Type', 'Path', 'SHA', 'URL'] - path_content_row = [content['name'], content['size'], content['type'], content['path'], content['sha'], content['html_url']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{content['name']}_content_from_{repo_name}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(path_content_fields) - write_csv.writerow(path_content_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - - # create .csv for repository stargazer - def log_repo_stargazers(stargazer, repo_name): - user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] - user_follower_row = [stargazer['avatar_url'], stargazer['login'], stargazer['id'], stargazer['node_id'], stargazer['gravatar_id'], stargazer['type'], stargazer['site_admin'], stargazer['html_url']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{stargazer['login']}_stargazer_of_{repo_name}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_follower_fields) - write_csv.writerow(user_follower_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") - # create .csv for repository forks - def log_repo_forks(fork, count): - repo_fork_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] - repo_fork_row = [fork['full_name'], fork['id'], fork['description'], fork['forks'], fork['stargazers_count'], fork['watchers'], fork['license'], fork['default_branch'], fork['visibility'], fork['language'], fork['open_issues'], fork['topics'], fork['homepage'], fork['clone_url'], fork['ssh_url'], fork['fork'], fork['allow_forking'], fork['private'], fork['archived'], fork['is_template'], fork['has_wiki'], fork['has_pages'], fork['has_projects'], fork['has_issues'], fork['has_downloads'], fork['pushed_at'], fork['created_at'], fork['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{fork['name']}_fork_{count}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_fork_fields) - write_csv.writerow(repo_fork_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - # create .csv for repository issues - def log_repo_issues(issue, repo_name): - repo_issue_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', 'Assignee', 'Assignees', 'Author association', 'Labels', 'Is locked?', 'Lock reason', 'Closed at', 'Created at', 'Updated at'] - repo_issue_row = [issue['title'], issue['id'], issue['node_id'], issue['number'], issue['state'], issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], issue['assignees'], issue['author_association'], issue['labels'], issue['locked'], issue['active_lock_reason'], issue['closed_at'], issue['created_at'], issue['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repo_name}_issue_{issue['id']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_issue_fields) - write_csv.writerow(repo_issue_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") +# create .csv for repository path contents +def log_repo_path_contents(content, repo_name): + path_content_fields = ['Filename', 'Size (bytes)', 'Type', 'Path', 'SHA', 'URL'] + path_content_row = [content['name'], content['size'], content['type'], content['path'], content['sha'], + content['html_url']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{content['name']}_content_from_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(path_content_fields) + write_csv.writerow(path_content_row) - - # create .csv for repository releases - def log_repo_releases(release, repo_name): - repo_release_fields = ['Name', 'ID', 'Node ID', 'Tag', 'Branch', 'Assets', 'Is draft?', 'Is prerelease?', 'Created at', 'Published at'] - repo_release_row = [release['name'], release['id'], release['node_id'], release['tag_name'], release['target_commitish'], release['assets'], release['draft'], release['prerelease'], release['created_at'], release['published_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repo_name}_release_{release['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_release_fields) - write_csv.writerow(repo_release_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # Create .csv file for repository contributors - def log_repo_contributors(contributor, repo_name): - repo_contributor_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] - repo_contributor_row = [contributor['avatar_url'], contributor['login'], contributor['id'], contributor['node_id'], contributor['gravatar_id'], contributor['type'], contributor['site_admin'], contributor['html_url']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{contributor['login']}_contributor_of_{repo_name}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_contributor_fields) - write_csv.writerow(repo_contributor_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - print(f"{MessagePrefix.info} {LogRoller.logging_skipped}\n") - - - # Create .csv for organization' events - def log_repo_events(event, organization): - org_event_fields = ['ID', 'Type', 'Created at', 'Payload'] - org_event_row = [event['id'], event['type'], event['created_at'], event['payload']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{organization}_event_{event['id']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(org_event_fields) - write_csv.writerow(org_event_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # Create .csv for organization' repositories - def log_org_repos(repository, organization): - org_repo_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] - org_repo_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repository['name']}_repository_of_{organization}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(org_repo_fields) - write_csv.writerow(org_repo_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # .csv for user' repositories - def log_user_repos(repository, username): - user_repo_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] - user_repo_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repository['name']}_{username}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_repo_fields) - write_csv.writerow(user_repo_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # .csv for user events - def log_user_events(event): - user_event_fields = ['Actor', 'Type', 'Repository', 'Created at', 'Payload'] - user_event_row = [event['actor']['login'], event['type'], event['repo']['name'], event['created_at'], event['payload']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{event['actor']['login']}_event_{event['id']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_event_fields) - write_csv.writerow(user_event_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # .csv for user gists - def log_user_gists(gist): - user_gist_fields = ['ID', 'Node ID', 'About', 'Comments', 'Files', 'Git Push URL', 'Is public?', 'Is truncated?', 'Updated at'] - user_gist_row = [gist['id'], gist['node_id'], gist['description'], gist['comments'], gist['files'], gist['git_push_url'], gist['public'], gist['truncated'], gist['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{gist['id']}_gists_{gist['owner']['login']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_gist_fields) - write_csv.writerow(user_gist_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # .csv for user followers - def log_user_followers(follower, username): - user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] - user_follower_row = [follower['avatar_url'], follower['login'], follower['id'], follower['node_id'], follower['gravatar_id'], follower['type'], follower['site_admin'], follower['html_url']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(f"output/{follower['login']}_follower_of_{username}.csv", 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_follower_fields) - write_csv.writerow(user_follower_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # .csv for user following - def log_user_following(user, username): - user_following_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] - user_following_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], user['type'], user['site_admin'], user['html_url']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{user['login']}_followed_by_{username}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_following_fields) - write_csv.writerow(user_following_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # .csv for user' subscriptions - def log_user_subscriptions(repository, username): - user_subscription_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] - user_subscription_row = [repository['name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{username}_subscriptions_{repository['name']}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_subscription_fields) - write_csv.writerow(user_subscription_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # .csv for user organizations - def log_user_orgs(organization, username): - user_org_fields = ['Profile photo', 'Name', 'ID', 'Node ID', 'URL', 'About'] - user_org_row = [organization['avatar_url'], organization['login'], organization['id'], organization['node_id'], organization['url'], organization['description']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{organization['login']}_{username}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_org_fields) - write_csv.writerow(user_org_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") - # Create .csv for user search - def log_users_search(user, query): - user_search_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', 'URL'] - user_search_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], user['type'], user['site_admin'], user['html_url']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{user['login']}_user_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(user_search_fields) - write_csv.writerow(user_search_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # Create .csv for repository search - def log_repos_search(repository, query): - repo_search_fields = [ 'Name','ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] - repo_search_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], repository['stargazers_count'], repository['watchers'], repository['license'], repository['default_branch'], repository['visibility'], repository['language'], repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], repository['has_projects'], repository['has_issues'], repository['has_downloads'], repository['pushed_at'], repository['created_at'], repository['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{repository['name']}_repository_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(repo_search_fields) - write_csv.writerow(repo_search_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # Create .csv for topic search - def log_topics_search(topic, query): - topic_search_fields = ['Name', 'Score', 'Curated', 'Featured', 'Display name', 'Created by', 'Created at', 'Updated at'] - topic_search_row = [topic['name'], topic['score'], topic['curated'], topic['featured'], topic['display_name'], topic['created_by'], topic['created_at'], topic['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{topic['name']}_topic_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(topic_search_fields) - write_csv.writerow(topic_search_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # Create .csv for issues search - def log_issues_search(issue, query): - issue_search_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', 'Assignee', 'Assignees', 'Author association', 'Labels', 'Is locked?', 'Lock reason', 'Closed at', 'Created at', 'Updated at'] - issue_search_row = [issue['title'], issue['id'], issue['node_id'], issue['number'], issue['state'], issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], issue['assignees'], issue['author_association'], issue['labels'], issue['locked'], issue['active_lock_reason'], issue['closed_at'], issue['created_at'], issue['updated_at']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{issue['id']}_issue_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(issue_search_fields) - write_csv.writerow(issue_search_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") - - - # Create .csv for commits search - def log_commits_search(commit, query): - commit_search_fields = ['SHA', 'Author', 'Username', 'Email', 'Committer', 'Repository', 'URL', 'Description'] - commit_search_row = [commit['commit']['tree']['sha'], commit['commit']['author']['name'], commit['author']['login'], commit['commit']['author']['email'], commit['commit']['committer']['name'], commit['repository']['full_name'], commit['html_url'], commit['commit']['message']] - xprint(f"\n{MessagePrefix.prompt} {LogRoller.prompt_log_csv}", end="");prompt = input().lower() - if prompt == "yes": - with open(os.path.join("output", f"{commit['commit']['tree']['sha']}_commit_search_result_for_{query}.csv"), 'w') as file: - write_csv = csv.writer(file) - write_csv.writerow(commit_search_fields) - write_csv.writerow(commit_search_row) - - logging.info(LogRoller.logged_to_csv.format(file.name)) - xprint(f"{MessagePrefix.positive} {LogRoller.logged_to_csv.format(file.name)}") - - else: - logging.info(LogRoller.logging_skipped.format(prompt)) - xprint(f"{MessagePrefix.info} {LogRoller.logging_skipped.format(prompt)}") + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# create .csv for repository stargazer +def log_repo_stargazers(stargazer, repo_name): + user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', + 'Is site admin?', 'URL'] + user_follower_row = [stargazer['avatar_url'], stargazer['login'], stargazer['id'], stargazer['node_id'], + stargazer['gravatar_id'], stargazer['type'], stargazer['site_admin'], stargazer['html_url']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{stargazer['login']}_stargazer_of_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_follower_fields) + write_csv.writerow(user_follower_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# create .csv for repository forks +def log_repo_forks(fork, count): + repo_fork_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', + 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', + 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', + 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] + repo_fork_row = [fork['full_name'], fork['id'], fork['description'], fork['forks'], fork['stargazers_count'], + fork['watchers'], fork['license'], fork['default_branch'], fork['visibility'], fork['language'], + fork['open_issues'], fork['topics'], fork['homepage'], fork['clone_url'], fork['ssh_url'], + fork['fork'], fork['allow_forking'], fork['private'], fork['archived'], fork['is_template'], + fork['has_wiki'], fork['has_pages'], fork['has_projects'], fork['has_issues'], + fork['has_downloads'], fork['pushed_at'], fork['created_at'], fork['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{fork['name']}_fork_{count}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_fork_fields) + write_csv.writerow(repo_fork_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# create .csv for repository issues +def log_repo_issues(issue, repo_name): + repo_issue_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', 'Assignee', + 'Assignees', 'Author association', 'Labels', 'Is locked?', 'Lock reason', 'Closed at', + 'Created at', 'Updated at'] + repo_issue_row = [issue['title'], issue['id'], issue['node_id'], issue['number'], issue['state'], + issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], issue['assignees'], + issue['author_association'], issue['labels'], issue['locked'], issue['active_lock_reason'], + issue['closed_at'], issue['created_at'], issue['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repo_name}_issue_{issue['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_issue_fields) + write_csv.writerow(repo_issue_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# create .csv for repository releases +def log_repo_releases(release, repo_name): + repo_release_fields = ['Name', 'ID', 'Node ID', 'Tag', 'Branch', 'Assets', 'Is draft?', 'Is prerelease?', + 'Created at', 'Published at'] + repo_release_row = [release['name'], release['id'], release['node_id'], release['tag_name'], + release['target_commitish'], release['assets'], release['draft'], release['prerelease'], + release['created_at'], release['published_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repo_name}_release_{release['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_release_fields) + write_csv.writerow(repo_release_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# Create .csv file for repository contributors +def log_repo_contributors(contributor, repo_name): + repo_contributor_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', + 'Is site admin?', 'URL'] + repo_contributor_row = [contributor['avatar_url'], contributor['login'], contributor['id'], contributor['node_id'], + contributor['gravatar_id'], contributor['type'], contributor['site_admin'], + contributor['html_url']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{contributor['login']}_contributor_of_{repo_name}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_contributor_fields) + write_csv.writerow(repo_contributor_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + print(f"{INFO} {logging_skipped}\n") + + +# Create .csv for organization' events +def log_repo_events(event, organization): + org_event_fields = ['ID', 'Type', 'Created at', 'Payload'] + org_event_row = [event['id'], event['type'], event['created_at'], event['payload']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{organization}_event_{event['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_event_fields) + write_csv.writerow(org_event_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# Create .csv for organization' repositories +def log_org_repos(repository, organization): + org_repo_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', + 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', + 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', + 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] + org_repo_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], + repository['stargazers_count'], repository['watchers'], repository['license'], + repository['default_branch'], repository['visibility'], repository['language'], + repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], + repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], + repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], + repository['has_projects'], repository['has_issues'], repository['has_downloads'], + repository['pushed_at'], repository['created_at'], repository['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repository['name']}_repository_of_{organization}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(org_repo_fields) + write_csv.writerow(org_repo_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# .csv for user' repositories +def log_user_repos(repository, username): + user_repo_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', + 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', + 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', + 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] + user_repo_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], + repository['stargazers_count'], repository['watchers'], repository['license'], + repository['default_branch'], repository['visibility'], repository['language'], + repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], + repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], + repository['archived'], repository['is_template'], repository['has_wiki'], repository['has_pages'], + repository['has_projects'], repository['has_issues'], repository['has_downloads'], + repository['pushed_at'], repository['created_at'], repository['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repository['name']}_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_repo_fields) + write_csv.writerow(user_repo_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# .csv for user events +def log_user_events(event): + user_event_fields = ['Actor', 'Type', 'Repository', 'Created at', 'Payload'] + user_event_row = [event['actor']['login'], event['type'], event['repo']['name'], event['created_at'], + event['payload']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{event['actor']['login']}_event_{event['id']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_event_fields) + write_csv.writerow(user_event_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# .csv for user gists +def log_user_gists(gist): + user_gist_fields = ['ID', 'Node ID', 'About', 'Comments', 'Files', 'Git Push URL', 'Is public?', 'Is truncated?', + 'Updated at'] + user_gist_row = [gist['id'], gist['node_id'], gist['description'], gist['comments'], gist['files'], + gist['git_push_url'], gist['public'], gist['truncated'], gist['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{gist['id']}_gists_{gist['owner']['login']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_gist_fields) + write_csv.writerow(user_gist_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# .csv for user followers +def log_user_followers(follower, username): + user_follower_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', + 'Is site admin?', 'URL'] + user_follower_row = [follower['avatar_url'], follower['login'], follower['id'], follower['node_id'], + follower['gravatar_id'], follower['type'], follower['site_admin'], follower['html_url']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(f"output/{follower['login']}_follower_of_{username}.csv", 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_follower_fields) + write_csv.writerow(user_follower_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# .csv for user following +def log_user_following(user, username): + user_following_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', + 'Is site admin?', 'URL'] + user_following_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], + user['type'], user['site_admin'], user['html_url']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{user['login']}_followed_by_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_following_fields) + write_csv.writerow(user_following_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# .csv for user' subscriptions +def log_user_subscriptions(repository, username): + user_subscription_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', + 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', + 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', + 'Has pages?', 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', + 'Created at', 'Updated at'] + user_subscription_row = [repository['name'], repository['id'], repository['description'], repository['forks'], + repository['stargazers_count'], repository['watchers'], repository['license'], + repository['default_branch'], repository['visibility'], repository['language'], + repository['open_issues'], repository['topics'], repository['homepage'], + repository['clone_url'], repository['ssh_url'], repository['fork'], + repository['allow_forking'], repository['private'], repository['archived'], + repository['is_template'], repository['has_wiki'], repository['has_pages'], + repository['has_projects'], repository['has_issues'], repository['has_downloads'], + repository['pushed_at'], repository['created_at'], repository['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{username}_subscriptions_{repository['name']}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_subscription_fields) + write_csv.writerow(user_subscription_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# .csv for user organizations +def log_user_orgs(organization, username): + user_org_fields = ['Profile photo', 'Name', 'ID', 'Node ID', 'URL', 'About'] + user_org_row = [organization['avatar_url'], organization['login'], organization['id'], organization['node_id'], + organization['url'], organization['description']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{organization['login']}_{username}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_org_fields) + write_csv.writerow(user_org_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# Create .csv for user search +def log_users_search(user, query): + user_search_fields = ['Profile photo', 'Username', 'ID', 'Node ID', 'Gravatar ID', 'Account type', 'Is site admin?', + 'URL'] + user_search_row = [user['avatar_url'], user['login'], user['id'], user['node_id'], user['gravatar_id'], + user['type'], user['site_admin'], user['html_url']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{user['login']}_user_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(user_search_fields) + write_csv.writerow(user_search_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# Create .csv for repository search +def log_repos_search(repository, query): + repo_search_fields = ['Name', 'ID', 'About', 'Forks', 'Stars', 'Watchers', 'License', 'Branch', 'Visibility', + 'Language(s)', 'Open issues', 'Topics', 'Homepage', 'Clone URL', 'SSH URL', 'Is fork?', + 'Is forkable?', 'Is private?', 'Is archived?', 'Is template?', 'Has wiki?', 'Has pages?', + 'Has projects?', 'Has issues?', 'Has downloads?', 'Pushed at', 'Created at', 'Updated at'] + repo_search_row = [repository['full_name'], repository['id'], repository['description'], repository['forks'], + repository['stargazers_count'], repository['watchers'], repository['license'], + repository['default_branch'], repository['visibility'], repository['language'], + repository['open_issues'], repository['topics'], repository['homepage'], repository['clone_url'], + repository['ssh_url'], repository['fork'], repository['allow_forking'], repository['private'], + repository['archived'], repository['is_template'], repository['has_wiki'], + repository['has_pages'], repository['has_projects'], repository['has_issues'], + repository['has_downloads'], repository['pushed_at'], repository['created_at'], + repository['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{repository['name']}_repository_search_result_for_{query}.csv"), + 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(repo_search_fields) + write_csv.writerow(repo_search_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + # Create .csv for topic search + + +def log_topics_search(topic, query): + topic_search_fields = ['Name', 'Score', 'Curated', 'Featured', 'Display name', 'Created by', 'Created at', + 'Updated at'] + topic_search_row = [topic['name'], topic['score'], topic['curated'], topic['featured'], topic['display_name'], + topic['created_by'], topic['created_at'], topic['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{topic['name']}_topic_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(topic_search_fields) + write_csv.writerow(topic_search_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# Create .csv for issues search +def log_issues_search(issue, query): + issue_search_fields = ['Title', 'ID', 'Node ID', 'Number', 'State', 'Reactions', 'Comments', 'Milestone', + 'Assignee', 'Assignees', 'Author association', 'Labels', 'Is locked?', 'Lock reason', + 'Closed at', 'Created at', 'Updated at'] + issue_search_row = [issue['title'], issue['id'], issue['node_id'], issue['number'], issue['state'], + issue['reactions'], issue['comments'], issue['milestone'], issue['assignee'], + issue['assignees'], issue['author_association'], issue['labels'], issue['locked'], + issue['active_lock_reason'], issue['closed_at'], issue['created_at'], issue['updated_at']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{issue['id']}_issue_search_result_for_{query}.csv"), 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(issue_search_fields) + write_csv.writerow(issue_search_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") + + +# Create .csv for commits search +def log_commits_search(commit, query): + commit_search_fields = ['SHA', 'Author', 'Username', 'Email', 'Committer', 'Repository', 'URL', 'Description'] + commit_search_row = [commit['commit']['tree']['sha'], commit['commit']['author']['name'], commit['author']['login'], + commit['commit']['author']['email'], commit['commit']['committer']['name'], + commit['repository']['full_name'], commit['html_url'], commit['commit']['message']] + xprint(f"\n{PROMPT} {prompt_log_csv}", end=""); + prompt = input().lower() + if prompt == "yes": + with open(os.path.join("output", f"{commit['commit']['tree']['sha']}_commit_search_result_for_{query}.csv"), + 'w') as file: + write_csv = csv.writer(file) + write_csv.writerow(commit_search_fields) + write_csv.writerow(commit_search_row) + + logging.info(logged_to_csv.format(file.name)) + xprint(f"{POSITIVE} {logged_to_csv.format(file.name)}") + + else: + logging.info(logging_skipped.format(prompt)) + xprint(f"{INFO} {logging_skipped.format(prompt)}") diff --git a/octosuite/helper.py b/octosuite/helper.py index 7c07283..9c0c10e 100644 --- a/octosuite/helper.py +++ b/octosuite/helper.py @@ -2,163 +2,168 @@ from rich.table import Table from rich import print as xprint from octosuite.colors import white, green, white_bold, green_bold, header_title, reset -# Help -# This class holds the help text for available commands. -class Help: - usage_text = 'Use syntax {} to get started with %s{}%s.' % (green_bold, reset) - usage_text_1 = '%sUse {} to view all available subcommands.%s' % (white, reset) - usage_text_2 = "%sThe {} command works with subcommands. %s" % (white, reset) +# helper.py +# This file holds the help text for available commands. +usage_text = 'Use syntax {} to get started with %s{}%s.' % (green_bold, reset) +usage_text_1 = '%sUse {} to view all available subcommands.%s' % (white, reset) +usage_text_2 = "%sThe {} command works with subcommands. %s" % (white, reset) - def org(): - xprint( - Help.usage_text_2.format(f"{green_bold}org{reset}") + Help.usage_text_1.format( - f"{green_bold}help:org{reset}")) - def repo(): - xprint(Help.usage_text_2.format(f"{green_bold}repo{reset}") + Help.usage_text_1.format( - f"{green_bold}help:repo{reset}")) +def org(): + xprint(usage_text_2.format(f"{green_bold}org{reset}") + usage_text_1.format(f"{green_bold}help:org{reset}")) - def user(): - xprint(Help.usage_text_2.format(f"{green_bold}user{reset}") + Help.usage_text_1.format( - f"{green_bold}help:user{reset}")) - def search(): - xprint(Help.usage_text_2.format(f"{green_bold}search{reset}") + Help.usage_text_1.format( - f"{green_bold}help:search{reset}")) +def repo(): + xprint(usage_text_2.format(f"{green_bold}repo{reset}") + usage_text_1.format(f"{green_bold}help:repo{reset}")) - def source(): - xprint(Help.usage_text_2.format(f"{green_bold}source{reset}") + Help.usage_text_1.format( - f"{green_bold}help:source{reset}")) - def logs(): - xprint(Help.usage_text_2.format(f"{green_bold}logs{reset}") + Help.usage_text_1.format( - f"{green_bold}help:logs{reset}")) +def user(): + xprint(usage_text_2.format(f"{green_bold}user{reset}") + usage_text_1.format(f"{green_bold}help:user{reset}")) - def csv(): - xprint( - Help.usage_text_2.format(f"{green_bold}csv{reset}") + Help.usage_text_1.format( - f"{green_bold}help:csv{reset}")) - def source_command(): - source_cmd_table = Table(show_header=True, header_style=header_title) - source_cmd_table.add_column("Command", style="dim") - source_cmd_table.add_column("Description") - source_cmd_table.add_row("zipball", "Download source code Zipball") - source_cmd_table.add_row("tarball", "Download source code Tarball") +def search(): + xprint(usage_text_2.format(f"{green_bold}search{reset}") + usage_text_1.format(f"{green_bold}help:search{reset}")) - syntax = f"{green}source:{reset}" - xprint(f"{Help.usage_text.format(syntax, 'source code downloads')}") - xprint(source_cmd_table) - def search_command(): - search_cmd_table = Table(show_header=True, header_style=header_title) - search_cmd_table.add_column("Command", style="dim") - search_cmd_table.add_column("Description") - search_cmd_table.add_row("users", "Search user(s)") - search_cmd_table.add_row("repos", "Search repositor[y][ies]") - search_cmd_table.add_row("topics", "Search topic(s)") - search_cmd_table.add_row("issues", "Search issue(s)") - search_cmd_table.add_row("commits", "Search commit(s)") +def source(): + xprint(usage_text_2.format(f"{green_bold}source{reset}") + usage_text_1.format(f"{green_bold}help:source{reset}")) - syntax = f"{green}search:{reset}" - xprint(f"{Help.usage_text.format(syntax, 'target discovery')}") - xprint(search_cmd_table) - def user_command(): - user_cmd_table = Table(show_header=True, header_style=header_title) - user_cmd_table.add_column("Command", style="dim") - user_cmd_table.add_column("Description") - user_cmd_table.add_row("profile", "Get a target's profile info") - user_cmd_table.add_row("gists", "Return a users's gists") - user_cmd_table.add_row("org", "Return organizations that a target belongs to/owns") - user_cmd_table.add_row("repos", "Return a target's repositories") - user_cmd_table.add_row("events", "Return a target's events") - user_cmd_table.add_row("follows", "Check if user(A) follows user(B)") - user_cmd_table.add_row("followers", "Return a target's followers") - user_cmd_table.add_row("following", "Return a list of users the target is following") - user_cmd_table.add_row("subscriptions", "Return a target's subscriptions") +def logs(): + xprint(usage_text_2.format(f"{green_bold}logs{reset}") + usage_text_1.format(f"{green_bold}help:logs{reset}")) - syntax = f"{green}user:{reset}" - xprint(f"{Help.usage_text.format(syntax, 'user investigation(s)')}") - xprint(user_cmd_table) - def org_command(): - org_cmd_table = Table(show_header=True, header_style=header_title) - org_cmd_table.add_column("Command", style="dim") - org_cmd_table.add_column("Description") - org_cmd_table.add_row("profile", "Get a target organization' profile info") - org_cmd_table.add_row("repos", "Return a target organization' repositories") - org_cmd_table.add_row("events", "Return a target organization' events") - org_cmd_table.add_row("member", "Check if a specified user is a public member of the target organization") +def csv(): + xprint(usage_text_2.format(f"{green_bold}csv{reset}") + usage_text_1.format(f"{green_bold}help:csv{reset}")) - syntax = f"{green}org:{reset}" - xprint(f"{Help.usage_text.format(syntax, 'organization investigation(s)')}") - xprint(org_cmd_table) - def repo_command(): - repo_cmd_table = Table(show_header=True, header_style=header_title) - repo_cmd_table.add_column("Command", style="dim") - repo_cmd_table.add_column("Description") - repo_cmd_table.add_row("profile", "Get a repository's info") - repo_cmd_table.add_row("issues", "Return a repository's issues") - repo_cmd_table.add_row("forks", "Return a repository's forks") - repo_cmd_table.add_row("releases", "Return a repository's releases") - repo_cmd_table.add_row("stargazers", "Return a repository's stargazers") - repo_cmd_table.add_row("contributors", "Return a repository's contributors") - repo_cmd_table.add_row("path_contents", "List contents in a path of a repository") +def source_command(): + source_cmd_table = Table(show_header=True, header_style=header_title) + source_cmd_table.add_column("Command", style="dim") + source_cmd_table.add_column("Description") + source_cmd_table.add_row("zipball", "Download source code Zipball") + source_cmd_table.add_row("tarball", "Download source code Tarball") - syntax = f"{green}repo:{reset}" - xprint(f"{Help.usage_text.format(syntax, 'repository investigation(s)')}") - xprint(repo_cmd_table) + syntax = f"{green}source:{reset}" + xprint(f"{usage_text.format(syntax, 'source code downloads')}") + xprint(source_cmd_table) - def logs_command(): - logs_cmd_table = Table(show_header=True, header_style=header_title) - logs_cmd_table.add_column("Command", style="dim") - logs_cmd_table.add_column("Description") - logs_cmd_table.add_row("view", "View logs") - logs_cmd_table.add_row("read", "Read log") - logs_cmd_table.add_row("delete", "Delete log") - logs_cmd_table.add_row("clear", "clear logs") - syntax = f"{green}logs:{reset}" - xprint(f"{Help.usage_text.format(syntax, 'log(s) management')}") - xprint(logs_cmd_table) +def search_command(): + search_cmd_table = Table(show_header=True, header_style=header_title) + search_cmd_table.add_column("Command", style="dim") + search_cmd_table.add_column("Description") + search_cmd_table.add_row("users", "Search user(s)") + search_cmd_table.add_row("repos", "Search repositor[y][ies]") + search_cmd_table.add_row("topics", "Search topic(s)") + search_cmd_table.add_row("issues", "Search issue(s)") + search_cmd_table.add_row("commits", "Search commit(s)") - def csv_command(): - csv_cmd_table = Table(show_header=True, header_style=header_title) - csv_cmd_table.add_column("Command", style="dim") - csv_cmd_table.add_column("Description") - csv_cmd_table.add_row("view", "View csv files") - csv_cmd_table.add_row("read", "Read csv") - csv_cmd_table.add_row("delete", "Delete csv") - csv_cmd_table.add_row("clear", "clear csv files") + syntax = f"{green}search:{reset}" + xprint(f"{usage_text.format(syntax, 'target discovery')}") + xprint(search_cmd_table) - syntax = f"{green}csv:{reset}" - xprint(f"{Help.usage_text.format(syntax, 'csv management')}") - xprint(csv_cmd_table) - def help_command(): - core_cmd_table = Table(show_header=True, header_style=header_title) - core_cmd_table.add_column("Command", style="dim", width=12) - core_cmd_table.add_column("Description") - core_cmd_table.add_row("help", "Help menu") - core_cmd_table.add_row("exit", "Close session") - core_cmd_table.add_row("clear", "Clear screen") - core_cmd_table.add_row("about", "Program's info") - core_cmd_table.add_row("author", "Developer's info") +def user_command(): + user_cmd_table = Table(show_header=True, header_style=header_title) + user_cmd_table.add_column("Command", style="dim") + user_cmd_table.add_column("Description") + user_cmd_table.add_row("profile", "Get a target's profile info") + user_cmd_table.add_row("gists", "Return a users's gists") + user_cmd_table.add_row("org", "Return organizations that a target belongs to/owns") + user_cmd_table.add_row("repos", "Return a target's repositories") + user_cmd_table.add_row("events", "Return a target's events") + user_cmd_table.add_row("follows", "Check if user(A) follows user(B)") + user_cmd_table.add_row("followers", "Return a target's followers") + user_cmd_table.add_row("following", "Return a list of users the target is following") + user_cmd_table.add_row("subscriptions", "Return a target's subscriptions") - help_sub_cmd_table = Table(show_header=True, header_style=header_title) - help_sub_cmd_table.add_column("Command", style="dim", width=12) - help_sub_cmd_table.add_column("Description") - help_sub_cmd_table.add_row("csv", "List all csv management commands") - help_sub_cmd_table.add_row("logs", "List all logs management commands") - help_sub_cmd_table.add_row("org", "List all organization investigation commands") - help_sub_cmd_table.add_row("user", "List all users investigation commands") - help_sub_cmd_table.add_row("repo", "List all repository investigation commands") - help_sub_cmd_table.add_row("search", "List all target discovery commands") - help_sub_cmd_table.add_row("source", "List all source code download commands (for developers)") + syntax = f"{green}user:{reset}" + xprint(f"{usage_text.format(syntax, 'user investigation(s)')}") + xprint(user_cmd_table) - syntax = f"{green}help:{reset}" - xprint(core_cmd_table) - xprint(f"\n\n{Help.usage_text.format(syntax, 'octosuite')}") - xprint(help_sub_cmd_table) \ No newline at end of file + +def org_command(): + org_cmd_table = Table(show_header=True, header_style=header_title) + org_cmd_table.add_column("Command", style="dim") + org_cmd_table.add_column("Description") + org_cmd_table.add_row("profile", "Get a target organization' profile info") + org_cmd_table.add_row("repos", "Return a target organization' repositories") + org_cmd_table.add_row("events", "Return a target organization' events") + org_cmd_table.add_row("member", "Check if a specified user is a public member of the target organization") + + syntax = f"{green}org:{reset}" + xprint(f"{usage_text.format(syntax, 'organization investigation(s)')}") + xprint(org_cmd_table) + + +def repo_command(): + repo_cmd_table = Table(show_header=True, header_style=header_title) + repo_cmd_table.add_column("Command", style="dim") + repo_cmd_table.add_column("Description") + repo_cmd_table.add_row("profile", "Get a repository's info") + repo_cmd_table.add_row("issues", "Return a repository's issues") + repo_cmd_table.add_row("forks", "Return a repository's forks") + repo_cmd_table.add_row("releases", "Return a repository's releases") + repo_cmd_table.add_row("stargazers", "Return a repository's stargazers") + repo_cmd_table.add_row("contributors", "Return a repository's contributors") + repo_cmd_table.add_row("path_contents", "List contents in a path of a repository") + + syntax = f"{green}repo:{reset}" + xprint(f"{usage_text.format(syntax, 'repository investigation(s)')}") + xprint(repo_cmd_table) + + +def logs_command(): + logs_cmd_table = Table(show_header=True, header_style=header_title) + logs_cmd_table.add_column("Command", style="dim") + logs_cmd_table.add_column("Description") + logs_cmd_table.add_row("view", "View logs") + logs_cmd_table.add_row("read", "Read log") + logs_cmd_table.add_row("delete", "Delete log") + logs_cmd_table.add_row("clear", "clear logs") + + syntax = f"{green}logs:{reset}" + xprint(f"{usage_text.format(syntax, 'log(s) management')}") + xprint(logs_cmd_table) + + +def csv_command(): + csv_cmd_table = Table(show_header=True, header_style=header_title) + csv_cmd_table.add_column("Command", style="dim") + csv_cmd_table.add_column("Description") + csv_cmd_table.add_row("view", "View csv files") + csv_cmd_table.add_row("read", "Read csv") + csv_cmd_table.add_row("delete", "Delete csv") + csv_cmd_table.add_row("clear", "clear csv files") + + syntax = f"{green}csv:{reset}" + xprint(f"{usage_text.format(syntax, 'csv management')}") + xprint(csv_cmd_table) + + +def help_command(): + core_cmd_table = Table(show_header=True, header_style=header_title) + core_cmd_table.add_column("Command", style="dim", width=12) + core_cmd_table.add_column("Description") + core_cmd_table.add_row("help", "Help menu") + core_cmd_table.add_row("exit", "Close session") + core_cmd_table.add_row("clear", "Clear screen") + core_cmd_table.add_row("about", "Program's info") + core_cmd_table.add_row("author", "Developer's info") + + help_sub_cmd_table = Table(show_header=True, header_style=header_title) + help_sub_cmd_table.add_column("Command", style="dim", width=12) + help_sub_cmd_table.add_column("Description") + help_sub_cmd_table.add_row("csv", "List all csv management commands") + help_sub_cmd_table.add_row("logs", "List all logs management commands") + help_sub_cmd_table.add_row("org", "List all organization investigation commands") + help_sub_cmd_table.add_row("user", "List all users investigation commands") + help_sub_cmd_table.add_row("repo", "List all repository investigation commands") + help_sub_cmd_table.add_row("search", "List all target discovery commands") + help_sub_cmd_table.add_row("source", "List all source code download commands (for developers)") + + syntax = f"{green}help:{reset}" + xprint(core_cmd_table) + xprint(f"\n\n{usage_text.format(syntax, 'octosuite')}") + xprint(help_sub_cmd_table) diff --git a/octosuite/log_roller.py b/octosuite/log_roller.py index c0ba3cc..749be36 100644 --- a/octosuite/log_roller.py +++ b/octosuite/log_roller.py @@ -2,24 +2,23 @@ # cases (they're being used by logging to be written to log files, and being printed out to the screen). -class LogRoller: - ctrl_c = "Session terminated with Ctrl+C." - error = "An error occurred: {}" - session_opened = "Opened new session on {}:{}" - session_closed = "Session closed at {}." - viewing_logs = "Viewing logs" - viewing_csv = "Viewing CSV file(s)..." - deleted_log = "Deleted log: {}" - reading_log = "Reading log: {}" - reading_csv = 'Reading csv: {}' - deleted_csv = 'Deleted csv: {}' - file_downloading = "Downloading: {}" - file_downloaded = "Downloaded: downloads/{}" - info_not_found = "Information not found: {}, {}, {}" - user_not_found = "User not found: @{}" - org_not_found = "Organization not found: @{}" - repo_or_user_not_found = "Repository or User not found: {}, @{}" - prompt_log_csv = "Do you wish to log this output to a CSV file? (yes/no) " - logged_to_csv = "Output logged: {}" - logging_skipped = "Logging skipped: {}" - limit_output = "Limit '{}' output to how many? (1-100) " +ctrl_c = "Session terminated with Ctrl+C." +error = "An error occurred: {}" +session_opened = "Opened new session on {}:{}" +session_closed = "Session closed at {}." +viewing_logs = "Viewing logs" +viewing_csv = "Viewing CSV file(s)..." +deleted_log = "Deleted log: {}" +reading_log = "Reading log: {}" +reading_csv = 'Reading csv: {}' +deleted_csv = 'Deleted csv: {}' +file_downloading = "Downloading: {}" +file_downloaded = "Downloaded: downloads/{}" +info_not_found = "Information not found: {}, {}, {}" +user_not_found = "User not found: @{}" +org_not_found = "Organization not found: @{}" +repo_or_user_not_found = "Repository or User not found: {}, @{}" +prompt_log_csv = "Do you wish to log this output to a CSV file? (yes/no) " +logged_to_csv = "Output logged: {}" +logging_skipped = "Logging skipped: {}" +limit_output = "Limit '{}' output to how many? (1-100) " diff --git a/octosuite/main.py b/octosuite/main.py index 915c60c..df3bbeb 100644 --- a/octosuite/main.py +++ b/octosuite/main.py @@ -1,5 +1,5 @@ # import everything from the octosuite.py file -from octosuite.octosuite import * +from octosuite.octosuite import * # I drifted away from the 'pythonic way' here def octosuite(): @@ -8,9 +8,9 @@ def octosuite(): run.on_start() except KeyboardInterrupt: - logging.warning(LogRoller.ctrl_c) - xprint(f"{MessagePrefix.warning} {LogRoller.ctrl_c}") + logging.warning(ctrl_c) + xprint(f"{WARNING} {ctrl_c}") except Exception as e: - logging.error(LogRoller.error.format(e)) - xprint(f"{MessagePrefix.error} {LogRoller.error.format(e)}") + logging.error(error.format(e)) + xprint(f"{ERROR} {error.format(e)}") diff --git a/octosuite/message_prefixes.py b/octosuite/message_prefixes.py index f65d0e4..98e98aa 100644 --- a/octosuite/message_prefixes.py +++ b/octosuite/message_prefixes.py @@ -1,14 +1,13 @@ from octosuite.colors import red, white, green, reset """ -MessagePrefix *Even here, I couldn't think of a good name.* The Attributes class holds the signs/symbols that show what +message prefixes that show what a notification in OctoSuite might be all about. This might not be very important or necessary in some cases, -but I think it's better to know the severity of the notifications you get in a program. +but I think it's better to know the severity of the notifications you get in a program. """ -class MessagePrefix: - prompt = f"{white}[{green}?{white}]{reset}" - warning = f"{white}[{red}!{white}]{reset}" - error = f"{white}[{red}x{white}]{reset}" - positive = f"{white}[{green}+{white}]{reset}" - negative = f"{white}[{red}-{white}]{reset}" - info = f"{white}[{green}*{white}]{reset}" +PROMPT = f"{white}[{green}PROMPT{white}]{reset}" +WARNING = f"{white}[{red}WARNING{white}]{reset}" +ERROR = f"{white}[{red}ERROR{white}]{reset}" +POSITIVE = f"{white}[{green}POSITIVE{white}]{reset}" +NEGATIVE = f"{white}[{red}NEGATIVE{white}]{reset}" +INFO = f"{white}[{green}INFO{white}]{reset}" \ No newline at end of file diff --git a/octosuite/octosuite.py b/octosuite/octosuite.py index 0063e78..d4aef4d 100644 --- a/octosuite/octosuite.py +++ b/octosuite/octosuite.py @@ -12,12 +12,13 @@ from rich.tree import Tree from rich.table import Table from datetime import datetime from rich import print as xprint -from octosuite.helper import Help -from octosuite.log_roller import LogRoller -from octosuite.csv_loggers import CsvLogger -from octosuite.message_prefixes import MessagePrefix from octosuite.banners import version_tag, ascii_banner +from octosuite.message_prefixes import PROMPT, WARNING, ERROR, POSITIVE, NEGATIVE, INFO # wondering why I name all the variables instead of just using the * wildcard?, because it's the pythonic way lol from octosuite.colors import red, white, green, white_bold, green_bold, header_title, reset +# seriously now, the reason why I am doing this, is so that you know exactly what I am importing from a named module :) +from octosuite.helper import help_command, source_command, search_command, user_command, repo_command, logs_command, csv_command, org_command, source, org, repo, user, search, logs, csv +from octosuite.log_roller import ctrl_c, error, session_opened, session_closed, viewing_logs, viewing_csv, deleted_log, reading_log, reading_csv, deleted_csv, file_downloading, file_downloaded, info_not_found, user_not_found, org_not_found, repo_or_user_not_found, prompt_log_csv, logged_to_csv, logging_skipped, limit_output +from octosuite.csv_loggers import log_org_profile, log_user_profile, log_repo_profile, log_repo_path_contents, log_repo_contributors, log_repo_startgazers, log_repo_forks, log_repo_issues, log_repo_releases, log_org_repos, log_org_events, log_user_repos, log_user_gists, log_user_orgs, log_user_events, log_user_subscriptions, log_user_following, log_user_followers, log_repos_search, log_users_search, log_topics_search, log_issues_search, log_commits_search class Octosuite: @@ -30,23 +31,23 @@ class Octosuite: ("clear", self.clear_screen), ("about", self.about), ("author", self.author), - ("help", Help.help_command), - ("help:source", Help.source_command), - ("help:search", Help.search_command), - ("help:user", Help.user_command), - ("help:repo", Help.repo_command), - ("help:logs", Help.logs_command), - ("help:csv", Help.csv_command), - ("help:org", Help.org_command), - ("source", Help.source), + ("help", help_command), + ("help:source", source_command), + ("help:search", search_command), + ("help:user", user_command), + ("help:repo", repo_command), + ("help:logs", logs_command), + ("help:csv", csv_command), + ("help:org", org_command), + ("source", source), ("source:tarball", self.download_tarball), ("source:zipball", self.download_zipball), - ("org", Help.org), + ("org", org), ("org:events", self.org_events), ("org:profile", self.org_profile), ("org:repos", self.org_repos), ("org:member", self.org_member), - ("repo", Help.repo), + ("repo", repo), ("repo:path_contents", self.path_contents), ("repo:profile", self.repo_profile), ("repo:contributors", self.repo_contributors), @@ -54,7 +55,7 @@ class Octosuite: ("repo:forks", self.repo_forks), ("repo:issues", self.repo_issues), ("repo:releases", self.repo_releases), - ("user", Help.user), + ("user", user), ("user:repos", self.user_repos), ("user:gists", self.user_gists), ("user:orgs", self.user_orgs), @@ -64,18 +65,18 @@ class Octosuite: ("user:follows", self.user_follows), ("user:following", self.user_following), ("user:subscriptions", self.user_subscriptions), - ("search", Help.search), + ("search", search), ("search:users", self.users_search), ("search:repos", self.repos_search), ("search:topics", self.topics_search), ("search:issues", self.issues_search), ("search:commits", self.commits_search), - ("logs", Help.logs), + ("logs", logs), ("logs:view", self.view_logs), ("logs:read", self.read_log), ("logs:delete", self.delete_log), ("logs:clear", self.clear_logs), - ("csv", Help.csv), + ("csv", csv), ("csv:view", self.view_csv), ("csv:read", self.read_csv), ("csv:delete", self.delete_csv), @@ -313,7 +314,7 @@ class Octosuite: logging.basicConfig(filename=f".logs/{now_formatted}.log", format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S%p", level=logging.DEBUG) # Log the start of a session - logging.info(LogRoller.session_opened.format(platform.node(), getpass.getuser())) + logging.info(session_opened.format(platform.node(), getpass.getuser())) # Check for updates @@ -365,13 +366,13 @@ class Octosuite: organization = input() response = requests.get(f"{self.endpoint}/orgs/{organization}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") + xprint(f"{NEGATIVE} {org_not_found.format(organization)}") elif response.status_code == 200: org_profile_tree = Tree("\n" + response.json()['name']) for attr in self.org_attrs: org_profile_tree.add(f"{self.org_attr_dict[attr]}: {response.json()[attr]}") xprint(org_profile_tree) - CsvLogger.log_org_profile(response) + log_org_profile(response) else: xprint(response.json()) @@ -382,13 +383,13 @@ class Octosuite: username = input() response = requests.get(f"{self.endpoint}/users/{username}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: user_profile_tree = Tree("\n" + response.json()['name']) for attr in self.profile_attrs: user_profile_tree.add(f"{self.profile_attr_dict[attr]}: {response.json()[attr]}") xprint(user_profile_tree) - CsvLogger.log_user_profile(response) + log_user_profile(response) else: xprint(response.json()) @@ -401,13 +402,13 @@ class Octosuite: username = input() response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") + xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") elif response.status_code == 200: repo_profile_tree = Tree("\n" + response.json()['full_name']) for attr in self.repo_attrs: repo_profile_tree.add(f"{self.repo_attr_dict[attr]}: {response.json()[attr]}") xprint(repo_profile_tree) - CsvLogger.log_repo_profile(response) + log_repo_profile(response) else: xprint(response.json()) @@ -422,15 +423,15 @@ class Octosuite: path_name = input() response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contents/{path_name}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.info_not_found.format(repo_name, username, path_name)}") + xprint(f"{NEGATIVE} {info_not_found.format(repo_name, username, path_name)}") elif response.status_code == 200: for content_count, content in enumerate(response.json(), start=1): path_contents_tree = Tree("\n" + content['name']) for attr in self.path_attrs: path_contents_tree.add(f"{self.path_attr_dict[attr]}: {content[attr]}") xprint(path_contents_tree) - CsvLogger.log_repo_path_contents(content, repo_name) - xprint(MessagePrefix.info, f"Found {content_count} file(s) in {repo_name}/{path_name}.") + log_repo_path_contents(content, repo_name) + xprint(INFO, f"Found {content_count} file(s) in {repo_name}/{path_name}.") else: xprint(response.json()) @@ -441,18 +442,18 @@ class Octosuite: repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username) ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("contributors"), end="") + xprint(PROMPT, limit_output.format("contributors"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/contributors?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") + xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") elif response.status_code == 200: for contributor in response.json(): contributor_tree = Tree("\n" + contributor['login']) for attr in self.user_attrs: contributor_tree.add(f"{self.user_attr_dict[attr]}: {contributor[attr]}") xprint(contributor_tree) - CsvLogger.log_repo_contributors(contributor, repo_name) + log_repo_contributors(contributor, repo_name) else: xprint(response.json()) @@ -463,20 +464,20 @@ class Octosuite: repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository stargazers"), end="") + xprint(PROMPT, limit_output.format("repository stargazers"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/stargazers?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") + xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") elif response.json() == {}: - xprint(f"{MessagePrefix.negative} Repository does not have any stargazers -> ({repo_name})") + xprint(f"{NEGATIVE} Repository does not have any stargazers -> ({repo_name})") elif response.status_code == 200: for stargazer in response.json(): stargazer_tree = Tree("\n" + stargazer['login']) for attr in self.user_attrs: stargazer_tree.add(f"{self.user_attr_dict[attr]}: {stargazer[attr]}") xprint(stargazer_tree) - CsvLogger.log_repo_stargazers(stargazer, repo_name) + log_repo_stargazers(stargazer, repo_name) else: xprint(response.json()) @@ -487,20 +488,20 @@ class Octosuite: repo_name = input() xprint(f"{white}@{green}Owner{white} (username):{reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository forks"), end="") + xprint(PROMPT, limit_output.format("repository forks"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/forks?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") + xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") elif response.json() == {}: - xprint(f"{MessagePrefix.negative} Repository does not have forks -> ({repo_name})") + xprint(f"{NEGATIVE} Repository does not have forks -> ({repo_name})") elif response.status_code == 200: for count, fork in enumerate(response.json()): fork_tree = Tree("\n" + fork['full_name']) for attr in self.repo_attrs: fork_tree.add(f"{self.repo_attr_dict[attr]}: {fork[attr]}") xprint(fork_tree) - CsvLogger.log_repo_forks(fork, count) + log_repo_forks(fork, count) else: xprint(response.json()) @@ -510,13 +511,13 @@ class Octosuite: repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository issues"), end="") + xprint(PROMPT, limit_output.format("repository issues"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/issues?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") + xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") elif response.json() == []: - xprint(f"{MessagePrefix.negative} Repository does not have open issues -> ({repo_name})") + xprint(f"{NEGATIVE} Repository does not have open issues -> ({repo_name})") elif response.status_code == 200: for issue in response.json(): issues_tree = Tree("\n" + issue['title']) @@ -524,7 +525,7 @@ class Octosuite: issues_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issues_tree) xprint(issue['body']) - CsvLogger.log_repo_issues(issue, repo_name) + log_repo_issues(issue, repo_name) else: xprint(response.json()) @@ -535,13 +536,13 @@ class Octosuite: repo_name = input() xprint(f"{white}>> @{green}Owner{white} (username){reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repository releases"), end="") + xprint(PROMPT, limit_output.format("repository releases"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/repos/{username}/{repo_name}/releases?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.repo_or_user_not_found.format(repo_name, username)}") + xprint(f"{NEGATIVE} {repo_or_user_not_found.format(repo_name, username)}") elif response.json() == []: - xprint(f"{MessagePrefix.negative} Repository does not have releases -> ({repo_name})") + xprint(f"{NEGATIVE} Repository does not have releases -> ({repo_name})") elif response.status_code == 200: for release in response.json(): releases_tree = Tree("\n" + release['name']) @@ -549,7 +550,7 @@ class Octosuite: releases_tree.add(f"{self.repo_releases_attr_dict[attr]}: {release[attr]}") xprint(releases_tree) xprint(release['body']) - CsvLogger.log_repo_releases(release, repo_name) + log_repo_releases(release, repo_name) else: xprint(response.json()) @@ -558,18 +559,18 @@ class Octosuite: def org_repos(self): xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") organization = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") + xprint(PROMPT, limit_output.format("organization repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/orgs/{organization}/repos?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") + xprint(f"{NEGATIVE} {org_not_found.format(organization)}") elif response.status_code == 200: for repository in response.json(): repos_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - CsvLogger.log_org_repos(repository, organization) + log_org_repos(repository, organization) else: xprint(response.json()) @@ -578,11 +579,11 @@ class Octosuite: def org_events(self): xprint(f"{white}@{green}organization{white} (username):{reset} ", end="") organization = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("organization repositories"), end="") + xprint(PROMPT, limit_output.format("organization repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/orgs/{organization}/events?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.org_not_found.format(organization)}") + xprint(f"{NEGATIVE} {org_not_found.format(organization)}") elif response.status_code == 200: for event in response.json(): events_tree = Tree("\n" + event['id']) @@ -590,7 +591,7 @@ class Octosuite: events_tree.add(f"Created at: {event['created_at']}") xprint(events_tree) xprint(event['payload']) - CsvLogger.log_org_events(event, organization) + log_org_events(event, organization) else: xprint(response.json()) @@ -603,27 +604,27 @@ class Octosuite: username = input() response = requests.get(f"{self.endpoint}/orgs/{organization}/public_members/{username}") if response.status_code == 204: - xprint(f"{MessagePrefix.positive} User ({username}) is a public member of the organization -> ({organization})") + xprint(f"{POSITIVE} User ({username}) is a public member of the organization -> ({organization})") else: - xprint(f"{MessagePrefix.negative} {response.json()['message']}") + xprint(f"{NEGATIVE} {response.json()['message']}") # Fetching user repositories def user_repos(self): xprint(f"{white}@{green}username:{reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositories"), end="") + xprint(PROMPT, limit_output.format("repositories"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/repos?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for repository in response.json(): repos_tree = Tree("\n" + repository['full_name']) for attr in self.repo_attrs: repos_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_tree) - CsvLogger.log_user_repos(repository, username) + log_user_repos(repository, username) else: xprint(response.json()) @@ -632,20 +633,20 @@ class Octosuite: def user_gists(self): xprint(f"{white}@{green}username:{reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format('gists'), end="") + xprint(PROMPT, limit_output.format('gists'), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/gists?per_page={limit}") if response.json() == []: - xprint(f"{MessagePrefix.negative} User does not have gists.") + xprint(f"{NEGATIVE} User does not have gists.") elif response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for gist in response.json(): gists_tree = Tree("\n" + gist['id']) for attr in self.gists_attrs: gists_tree.add(f"{self.gists_attr_dict[attr]}: {gist[attr]}") xprint(gists_tree) - CsvLogger.log_user_gists(gist) + log_user_gists(gist) else: xprint(response.json()) @@ -654,20 +655,20 @@ class Octosuite: def user_orgs(self): xprint(f"{white}@{green}username:{reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user organizations"), end="") + xprint(PROMPT, limit_output.format("user organizations"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/orgs?per_page={limit}") if response.json() == []: - xprint(f"{MessagePrefix.negative} User ({username}) does not (belong to/own) any organizations.") + xprint(f"{NEGATIVE} User ({username}) does not (belong to/own) any organizations.") elif response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for organization in response.json(): org_tree = Tree("\n" + organization['login']) for attr in self.user_orgs_attrs: org_tree.add(f"{self.user_orgs_attr_dict[attr]}: {organization[attr]}") xprint(org_tree) - CsvLogger.log_user_orgs(organization, username) + log_user_orgs(organization, username) else: xprint(response.json()) @@ -676,11 +677,11 @@ class Octosuite: def user_events(self): xprint(f"{white}@{green}username:{reset} ", end="") username = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("events"), end="") + xprint(PROMPT, limit_output.format("events"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/events/public?per_page={limit}") if response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for event in response.json(): events_tree = Tree("\n" + event['id']) @@ -690,7 +691,7 @@ class Octosuite: events_tree.add(f"Created at: {event['created_at']}") xprint(events_tree) xprint(event['payload']) - CsvLogger.log_user_events(event) + log_user_events(event) else: xprint(response.json()) @@ -699,20 +700,20 @@ class Octosuite: def user_subscriptions(self): xprint(f"{white}@{green}username:{reset} ", end="") username = input().lower() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user subscriptions"), end="") + xprint(PROMPT, limit_output.format("user subscriptions"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/subscriptions?per_page={limit}") if response.json() == []: - xprint(f"{MessagePrefix.negative} User does not have any subscriptions.") + xprint(f"{NEGATIVE} User does not have any subscriptions.") elif response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for repository in response.json(): subscriptions_tree =Tree("\n" + repository['full_name']) for attr in self.repo_attrs: subscriptions_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(subscriptions_tree) - CsvLogger.log_user_subscriptions(repository, username) + log_user_subscriptions(repository, username) else: xprint(response.json()) @@ -721,20 +722,20 @@ class Octosuite: def user_following(self): xprint(f"{white}@{green}username:{reset} ", end="") username = input().lower() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user' following"), end="") + xprint(PROMPT, limit_output.format("user' following"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/following?per_page={limit}") if response.json() == []: - xprint(f"{MessagePrefix.negative} User ({username})does not follow anyone.") + xprint(f"{NEGATIVE} User ({username})does not follow anyone.") elif response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for user in response.json(): following_tree = Tree("\n" + user['login']) for attr in self.user_attrs: following_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(following_tree) - CsvLogger.log_user_following(user, username) + log_user_following(user, username) else: xprint(response.json()) @@ -743,20 +744,20 @@ class Octosuite: def user_followers(self): xprint(f"{white}@{green}username:{reset} ", end="") username = input().lower() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user followers"), end="") + xprint(PROMPT, limit_output.format("user followers"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/users/{username}/followers?per_page={limit}") if response.json() == []: - xprint(f"{MessagePrefix.negative} User ({username})does not have followers.") + xprint(f"{NEGATIVE} User ({username})does not have followers.") elif response.status_code == 404: - xprint(f"{MessagePrefix.negative} {LogRoller.userNotFound.format(username)}") + xprint(f"{NEGATIVE} {user_not_found.format(username)}") elif response.status_code == 200: for follower in response.json(): followers_tree = Tree("\n" + follower['login']) for attr in self.user_attrs: followers_tree.add(f"{self.user_attr_dict[attr]}: {follower[attr]}") xprint(followers_tree) - CsvLogger.log_user_followers(follower, username) + log_user_followers(follower, username) else: xprint(response.json()) @@ -769,16 +770,16 @@ class Octosuite: user_b = input() response = requests.get(f"{self.endpoint}/users/{user_a}/following/{user_b}") if response.status_code == 204: - xprint(f"{MessagePrefix.positive} @{user_a} FOLLOWS @{user_b}") + xprint(f"{POSITIVE} @{user_a} FOLLOWS @{user_b}") else: - xprint(f"{MessagePrefix.negative} @{user_a} DOES NOT FOLLOW @{user_b}") + xprint(f"{NEGATIVE} @{user_a} DOES NOT FOLLOW @{user_b}") # User search def users_search(self): xprint(f"{white}@{green}query{white} (eg. john):{reset} ", end="") query = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("user search"), end="") + xprint(PROMPT, limit_output.format("user search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/users?q={query}&per_page={limit}").json() for user in response['items']: @@ -786,14 +787,14 @@ class Octosuite: for attr in self.user_attrs: users_search_tree.add(f"{self.user_attr_dict[attr]}: {user[attr]}") xprint(users_search_tree) - CsvLogger.log_users_search(user, query) + log_users_search(user, query) # Repository search def repos_search(self): xprint(f"{white}%{green}query{white} (eg. git):{reset} ", end="") query = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("repositor[y][ies] search"), end="") + xprint(PROMPT, limit_output.format("repositor[y][ies] search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/repositories?q={query}&per_page={limit}").json() for repository in response['items']: @@ -801,14 +802,14 @@ class Octosuite: for attr in self.repo_attrs: repos_search_tree.add(f"{self.repo_attr_dict[attr]}: {repository[attr]}") xprint(repos_search_tree) - CsvLogger.log_repos_search(repository, query) + log_repos_search(repository, query) # Topics search def topics_search(self): xprint(f"{white}#{green}query{white} (eg. osint):{reset} ", end="") query = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("topic(s) search"), end="") + xprint(PROMPT, limit_output.format("topic(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/topics?q={query}&per_page={limit}").json() for topic in response['items']: @@ -816,14 +817,14 @@ class Octosuite: for attr in self.topic_attrs: topics_search_tree.add(f"{self.topic_attr_dict[attr]}: {topic[attr]}") xprint(topics_search_tree) - CsvLogger.log_topics_search(topic, query) + log_topics_search(topic, query) # Issue search def issues_search(self): xprint(f"{white}!{green}Query{white} (eg. error):{reset} ", end="") query = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("issue(s) search"), end="") + xprint(PROMPT, limit_output.format("issue(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/issues?q={query}&per_page={limit}").json() for issue in response['items']: @@ -832,14 +833,14 @@ class Octosuite: issues_search_tree.add(f"{self.repo_issues_attr_dict[attr]}: {issue[attr]}") xprint(issues_search_tree) xprint(issue['body']) - CsvLogger.log_issues_search(issue, query) + log_issues_search(issue, query) # Commits search def commits_search(self): xprint(f"{white}:{green}Query{white} (eg. filename:index.php):{reset} ", end="") query = input() - xprint(MessagePrefix.prompt, LogRoller.limit_output.format("commit(s) search"), end="") + xprint(PROMPT, limit_output.format("commit(s) search"), end="") limit = int(input()) response = requests.get(f"{self.endpoint}/search/commits?q={query}&per_page={limit}").json() for commit in response['items']: @@ -852,12 +853,12 @@ class Octosuite: commits_search_tree.add(f"URL: {commit['html_url']}") xprint(commits_search_tree) xprint(commit['commit']['message']) - CsvLogger.log_commits_search(commit, query) + log_commits_search(commit, query) # View csv files def view_csv(self): - logging.info(LogRoller.viewing_csv) + logging.info(viewing_csv) csv_files = os.listdir("output") csv_table = Table(show_header=True, header_style=header_title) csv_table.add_column("CSV", style="dim") @@ -872,7 +873,7 @@ class Octosuite: xprint(f"{green}csv {white}(filename):{reset} ", end="") csv_file = input() with open(os.path.join("output", csv_file + ".csv"), "r") as file: - logging.info(LogRoller.reading_csv.format(csv_file)) + logging.info(reading_csv.format(csv_file)) text = Text(file.read()) xprint(text) @@ -882,24 +883,24 @@ class Octosuite: xprint(f"{green}csv {white}(filename):{reset} ", end="") csv_file = input() os.remove(os.path.join("output", csv_file)) - logging.info(LogRoller.deleted_csv.format(csv_file)) - xprint(f"{MessagePrefix.positive} {LogRoller.deleted_csv.format(csv_file)}") + logging.info(deleted_csv.format(csv_file)) + xprint(f"{POSITIVE} {deleted_csv.format(csv_file)}") # Clear csv def clear_csv(self): - xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="") + xprint(f"{PROMPT} This will clear all {len(os.listdir('output'))} csv files, continue? (yes/no) ", end="") prompt = input().lower() if prompt == "yes": shutil.rmtree("output", ignore_errors=True) - xprint(f"{MessagePrefix.info} CSV files cleared successfully!") + xprint(f"{INFO} CSV files cleared successfully!") else: pass # View octosuite log files def view_logs(self): - logging.info(LogRoller.viewing_logs) + logging.info(viewing_logs) logs = os.listdir(".logs") logs_table = Table(show_header=True, header_style=header_title) logs_table.add_column("Log", style="dim") @@ -914,7 +915,7 @@ class Octosuite: xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") log_file = input() with open(os.path.join(".logs", log_file + ".log"), "r") as log: - logging.info(LogRoller.reading_log.format(log_file)) + logging.info(reading_log.format(log_file)) xprint("\n" + log.read()) @@ -923,18 +924,18 @@ class Octosuite: xprint(f"{green}log date{white} (eg. 2022-04-27 10:09:36AM):{reset} ", end="") log_file = input() os.remove(os.path.join(".logs", log_file)) - logging.info(LogRoller.deleted_log.format(log_file)) - xprint(f"{MessagePrefix.positive} {LogRoller.deleted_log.format(log_file)}") + logging.info(deleted_log.format(log_file)) + xprint(f"{POSITIVE} {deleted_log.format(log_file)}") # Clear logs def clear_logs(self): - xprint(f"{MessagePrefix.prompt} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="") + xprint(f"{PROMPT} This will clear all {len(os.listdir('.logs'))} logs and close the current session, continue? (yes/no) ", end="") prompt = input().lower() if prompt == "yes": shutil.rmtree(".logs", ignore_errors=True) - xprint(f"{MessagePrefix.info} Logs cleared successfully!") - xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}") + xprint(f"{INFO} Logs cleared successfully!") + xprint(f"{INFO} {session_closed.format(datetime.now())}") exit() else: pass @@ -942,28 +943,28 @@ class Octosuite: # Downloading release tarball def download_tarball(self): - logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar")) - xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.tar")) + logging.info(file_downloading.format(f"octosuite.v{version_tag}.tar")) + xprint(INFO, file_downloading.format(f"octosuite.v{version_tag}.tar")) data = requests.get(f"{self.endpoint}/repos/bellingcat/octosuite/tarball/{version_tag}") with open(os.path.join("downloads", f"octosuite.v{version_tag}.tar"), "wb") as file: file.write(data.content) file.close() - logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar")) - xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.tar")) + logging.info(file_downloaded.format(f"octosuite.v{version_tag}.tar")) + xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.tar")) # Downloading release zipball def download_zipball(self): - logging.info(LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip")) - xprint(MessagePrefix.info, LogRoller.file_downloading.format(f"octosuite.v{version_tag}.zip")) + logging.info(file_downloading.format(f"octosuite.v{version_tag}.zip")) + xprint(INFO, file_downloading.format(f"octosuite.v{version_tag}.zip")) data = requests.get(f"{self.endpoint}/repos/rly0nheart/octosuite/zipball/{version_tag}") with open(os.path.join("downloads", f"octosuite.v{version_tag}.zip"), "wb") as file: file.write(data.content) file.close() - logging.info(LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip")) - xprint(MessagePrefix.positive, LogRoller.file_downloaded.format(f"octosuite.v{version_tag}.zip")) + logging.info(file_downloaded.format(f"octosuite.v{version_tag}.zip")) + xprint(POSITIVE, file_downloaded.format(f"octosuite.v{version_tag}.zip")) # Author info @@ -997,11 +998,11 @@ GitHub REST API documentation: https://docs.github.com/rest # Close session def exit_session(self): - xprint(f"{MessagePrefix.prompt} This will close the current session, continue? (yes/no) ", end="") + xprint(f"{PROMPT} This will close the current session, continue? (yes/no) ", end="") prompt = input().lower() if prompt == "yes": - logging.info(LogRoller.session_closed.format(datetime.now())) - xprint(f"{MessagePrefix.info} {LogRoller.session_closed.format(datetime.now())}") + logging.info(session_closed.format(datetime.now())) + xprint(f"{INFO} {session_closed.format(datetime.now())}") exit() else: pass