Compare commits

...

15 Commits

Author SHA1 Message Date
Ritchie Mwewa
9b0f490089 Merge pull request #25 from bellingcat/4.0/beta
4.0/beta
2026-01-04 05:54:20 +02:00
Ritchie Mwewa
d9e2dae71d Merge branch 'master' into 4.0/beta 2026-01-04 05:54:11 +02:00
Ritchie Mwewa
0c06b95d18 Merge branch '4.0/beta' of github.com:bellingcat/octosuite into 4.0/beta 2026-01-04 05:52:23 +02:00
Ritchie Mwewa
ba8ed6bd4e Patches in menus 2026-01-04 05:52:01 +02:00
Ritchie Mwewa
86d0112691 Merge pull request #24 from bellingcat/4.0/beta
4.0/beta
2026-01-03 23:04:21 +02:00
Ritchie Mwewa
39f82ad775 Merge branch 'master' into 4.0/beta 2026-01-03 23:04:11 +02:00
Ritchie Mwewa
b5ee95db2b Validating targets will instantly cache their profile data since we're hitting the profile api to validate 2026-01-03 23:02:58 +02:00
Ritchie Mwewa
77efa0982f Merge branch '4.0/beta' of github.com:bellingcat/octosuite into 4.0/beta 2026-01-03 22:52:22 +02:00
Ritchie Mwewa
38a9a4b6b3 Allow response caching 2026-01-03 22:51:52 +02:00
Ritchie Mwewa
ab3cc398d0 Merge pull request #23 from bellingcat/4.0/beta
4.0/beta
2026-01-03 17:18:57 +02:00
Ritchie Mwewa
0a453dade6 Merge branch 'master' into 4.0/beta 2026-01-03 17:18:45 +02:00
Ritchie Mwewa
66abbcbef6 Update README.md 2026-01-03 17:17:33 +02:00
Ritchie Mwewa
04e35dbc98 Update README.md 2026-01-03 16:56:07 +02:00
Ritchie Mwewa
f4d7fa81b8 4.0.0-beta1 2026-01-03 16:39:29 +02:00
Ritchie Mwewa
7bf04cffd4 4.0.0-beta1 2026-01-03 16:36:07 +02:00
11 changed files with 1012 additions and 262 deletions

View File

@@ -1,4 +1,4 @@
![octosuite](img/octosuite.png) ![octosuite](https://raw.githubusercontent.com/bellingcat/octosuite/refs/heads/master/img/octosuite.png)
TUI-based toolkit for GitHub data analysis. TUI-based toolkit for GitHub data analysis.
@@ -7,6 +7,7 @@ TUI-based toolkit for GitHub data analysis.
![Code Size](https://img.shields.io/github/languages/code-size/bellingcat/octosuite) ![Code Size](https://img.shields.io/github/languages/code-size/bellingcat/octosuite)
![Release Date](https://img.shields.io/github/release-date/bellingcat/octosuite) ![Release Date](https://img.shields.io/github/release-date/bellingcat/octosuite)
![Build Status](https://img.shields.io/github/actions/workflow/status/bellingcat/octosuite/python-publish.yml) ![Build Status](https://img.shields.io/github/actions/workflow/status/bellingcat/octosuite/python-publish.yml)
[![CodeQL Advanced](https://github.com/bellingcat/octosuite/actions/workflows/codeql.yml/badge.svg)](https://github.com/bellingcat/octosuite/actions/workflows/codeql.yml)
![License](https://img.shields.io/github/license/bellingcat/octosuite) ![License](https://img.shields.io/github/license/bellingcat/octosuite)
## Overview ## Overview
@@ -16,11 +17,56 @@ repositories, organizations, and search across GitHub's platform.
## Features ## Features
- **User Data** - View profiles, repositories, followers, organizations, and activity, e.t.c. <details>
- **Repository Data** - Access repository details, commits, issues, releases, and contributors <summary><strong>See details</strong></summary>
- **Organisation Data** - Explore organisation profiles, members, and repositories
- **Search** - Search across repositories, users, commits, issues, and topics - **User** - Get user data
- **Export** - Save data in JSON, CSV, or HTML formats - Profile
- Repositories
- Subscriptions
- Starred
- Followers
- Following
- Organizations
- Gists
- Events
- Received Events
- **Repository** - Get repository data
- Profile
- Forks
- Issues
- Issue Events
- Events
- Assignees
- Branches
- Tags
- Languages
- Stargazers
- Subscribers
- Commits
- Comments
- Releases
- Deployments
- Labels
- **Organisation** - Get organisation data
- Profile
- Repositories
- Events
- Hooks
- Issues
- Members
- **Search** - Search GitHub
- Repositories
- Users
- Commits
- Issues
- Topics
- **Export** - Export data
- JSON
- CSV
- HTML
</details>
## Installation ## Installation
@@ -50,17 +96,20 @@ octosuite
``` ```
> [!Note] > [!Note]
> You can run octosuite with commands `octosuite`, or `ocs` > You can then run octosuite with command `octosuite`
## Usage ## Usage
Navigate using <kbd>UP</kbd><kbd>DOWN</kbd> and <kbd>ENTER</kbd> to select options. The interface guides you through Navigate using <kbd>UP</kbd><kbd>DOWN</kbd> and <kbd>ENTER</kbd> to select options. In the export menu, you should
use <kbd>SPACE</kbd> to check the format you want.
The interface guides you through
selecting a selecting a
data source data source
and and
choosing what information to retrieve. Preview the results and optionally export them in your preferred format. choosing what information to retrieve. Preview the results and optionally export them in your preferred format.
![home](img/menu.png) ![home](https://raw.githubusercontent.com/bellingcat/octosuite/refs/heads/master/img/menu.png)
## License ## License

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "octosuite" name = "octosuite"
version = "4.0.0-beta" version = "4.0.0beta2"
description = "TUI-based toolkit for GitHub data analysis." description = "TUI-based toolkit for GitHub data analysis."
readme = "README.md" readme = "README.md"
license = "MIT" license = "MIT"
@@ -33,5 +33,4 @@ dev = [
] ]
[project.scripts] [project.scripts]
octosuite = "octosuite.app:start" octosuite = "octosuite.app:start"
ocs = "octosuite.app:start"

View File

@@ -1,2 +1,2 @@
__pkg__ = "octosuite" __pkg__ = "octosuite"
__version__ = "4.0.0" __version__ = "4.0.0beta2"

View File

@@ -1,5 +1,6 @@
import sys import sys
from .core.cache import cache
from .lib import console, __pkg__, __version__ from .lib import console, __pkg__, __version__
from .tui.menus import Menus from .tui.menus import Menus
@@ -11,3 +12,5 @@ def start():
menu.main() menu.main()
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit() sys.exit()
finally:
cache.clear()

View File

@@ -0,0 +1,74 @@
import hashlib
import json
__all__ = ["cache"]
class ResponseCache:
"""Simple in-memory cache for API responses."""
def __init__(self):
"""Initialise the ResponseCache with an empty cache dictionary."""
self._cache = {}
@staticmethod
def _generate_key(url: str, params: dict = None) -> str:
"""
Generate a unique cache key from URL and parameters.
:param url: The URL to generate a key for.
:param params: Optional dictionary of parameters to include in the key.
:return: MD5 hash string representing the unique cache key.
"""
cache_data = f"{url}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(cache_data.encode()).hexdigest()
def get(self, url: str, params: dict = None):
"""
Retrieve a cached response if it exists.
:param url: The URL to retrieve cached data for.
:param params: Optional dictionary of parameters used in the original request.
:return: Cached response data if found, None otherwise.
"""
key = self._generate_key(url, params)
return self._cache.get(key)
def set(self, url: str, data, params: dict = None):
"""
Store a response in the cache.
:param url: The URL to cache data for.
:param data: The response data to cache.
:param params: Optional dictionary of parameters used in the request.
"""
key = self._generate_key(url, params)
self._cache[key] = data
def clear(self):
"""Clear all cached responses from memory."""
self._cache.clear()
def remove(self, url: str, params: dict = None):
"""
Remove a specific cached response.
:param url: The URL to remove cached data for.
:param params: Optional dictionary of parameters used in the original request.
"""
key = self._generate_key(url, params)
self._cache.pop(key, None)
cache = ResponseCache()
def clear_cache():
"""Clear all cached API responses from the global cache instance."""
cache.clear()

View File

@@ -5,6 +5,7 @@ import typing as t
import requests import requests
from requests import Response from requests import Response
from .cache import cache
from ..lib import __version__ from ..lib import __version__
BASE_URL = "https://api.github.com" BASE_URL = "https://api.github.com"
@@ -13,6 +14,8 @@ __all__ = ["BASE_URL", "GitHub"]
class GitHub: class GitHub:
"""Handles GitHub API requests with caching and response sanitisation."""
def __init__( def __init__(
self, self,
user_agent: str = ( user_agent: str = (
@@ -21,11 +24,37 @@ class GitHub:
f"https://github.com/bellingcat/octosuite) requests/{requests.__version__}" f"https://github.com/bellingcat/octosuite) requests/{requests.__version__}"
), ),
): ):
"""
Initialise the GitHub API client.
:param user_agent: Custom User-Agent string for API requests.
"""
self.user_agent = user_agent self.user_agent = user_agent
self.cache = cache
def get( def get(
self, url: str, params: t.Optional[dict] = None, return_response: bool = False self,
url: str,
params: t.Optional[dict] = None,
return_response: bool = False,
use_cache: bool = True,
) -> t.Union[dict, list, Response]: ) -> t.Union[dict, list, Response]:
"""
Make a GET request to the GitHub API.
:param url: The API endpoint URL.
:param params: Optional query parameters for the request.
:param return_response: If True, return the raw Response object instead of JSON data.
:param use_cache: If True, use cached responses when available.
:return: Dictionary, list, or Response object depending on the request and parameters.
"""
if use_cache and not return_response:
cached = self.cache.get(url, params)
if cached is not None:
return cached
response = requests.get( response = requests.get(
url=url, params=params, headers={"User-Agent": self.user_agent} url=url, params=params, headers={"User-Agent": self.user_agent}
) )
@@ -34,33 +63,65 @@ class GitHub:
return response return response
if response.status_code == 200: if response.status_code == 200:
return self._sanitise_response(response=response.json()) sanitised = self.sanitise_response(response=response.json())
# Cache the successful response
if use_cache:
self.cache.set(url, sanitised, params)
return sanitised
return [] return []
def is_valid_entity( def is_valid_entity(
self, entity_type: t.Literal["user", "org", "repo"], **kwargs self, _type: t.Literal["user", "org", "repo"], **kwargs
) -> bool: ) -> bool:
"""Validate if a GitHub entity exists.""" """
Validate whether a GitHub entity exists.
:param _type: Type of entity to validate ("user", "org", or "repo").
:param kwargs: Entity identifiers (username for user/org, repo_owner and repo_name for repo).
:return: True if the entity exists, False otherwise.
"""
try: try:
if entity_type == "user": type_map = {
url = f"https://api.github.com/users/{kwargs.get('username')}" "user": f"https://api.github.com/users/{kwargs.get('username')}",
elif entity_type == "org": "org": f"https://api.github.com/orgs/{kwargs.get('username')}",
url = f"https://api.github.com/orgs/{kwargs.get('username')}" "repo": f"https://api.github.com/repos/{kwargs.get('repo_owner')}/{kwargs.get('repo_name')}",
elif entity_type == "repo": }
url = f"https://api.github.com/repos/{kwargs.get('repo_owner')}/{kwargs.get('repo_name')}"
else: url = type_map[_type]
# Check cache first
cached = self.cache.get(url)
if cached is not None:
return True # If cached, entity exists
response = requests.get(url=url, headers={"User-Agent": self.user_agent})
# Only cache if entity exists (status 200)
if response.status_code == 200:
sanitised = self.sanitise_response(response.json())
self.cache.set(url, sanitised)
return True return True
response = self.get(url=url, return_response=True) return False
return response.status_code == 200
except requests.RequestException: except requests.RequestException:
return False return False
def _sanitise_response(self, response: t.Union[dict, list]) -> t.Union[dict, list]: def sanitise_response(self, response: t.Union[dict, list]) -> t.Union[dict, list]:
"""
Remove API URLs and null values from response data recursively.
:param response: The response data to sanitise (dict or list).
:return: Sanitised response with API URLs and null values removed.
"""
pattern = re.compile(r"https://api\.github\.com") pattern = re.compile(r"https://api\.github\.com")
if isinstance(response, list): if isinstance(response, list):
return [self._sanitise_response(response=item) for item in response] return [self.sanitise_response(response=item) for item in response]
if isinstance(response, dict): if isinstance(response, dict):
keys_to_remove = [ keys_to_remove = [
@@ -74,6 +135,6 @@ class GitHub:
# Recursively clean nested dicts/lists # Recursively clean nested dicts/lists
for key, value in response.items(): for key, value in response.items():
if isinstance(value, (dict, list)): if isinstance(value, (dict, list)):
response[key] = self._sanitise_response(response=value) response[key] = self.sanitise_response(response=value)
return response return response

View File

@@ -8,74 +8,191 @@ __all__ = ["User", "Org", "Repo", "Search"]
class GitHubEntity: class GitHubEntity:
"""Base class for GitHub entities with common functionality.""" """
Base class for GitHub entities with common functionality."""
def __init__(self, source: str): def __init__(self, source: str):
"""
Initialise a GitHub entity.
:param source: The source identifier for the entity.
"""
self.endpoint = None self.endpoint = None
self.source = source self.source = source
def exists(self) -> bool: def exists(self) -> tuple[bool, dict]:
"""Check if the entity exists on GitHub.""" """
Check if the entity exists on GitHub.
:return: Tuple of (exists, response_data) where exists is True if the entity is found.
"""
# Check cache first
cached = github.cache.get(self.endpoint)
if cached is not None:
return True, cached
try: try:
response = github.get(url=self.endpoint, return_response=True) response = github.get(url=self.endpoint, return_response=True)
return response.status_code == 200
except exceptions.RequestException as err: if response.status_code == 200:
return False data = response.json()
# Sanitise the data
sanitised = github.sanitise_response(data)
# Cache the sanitised response
github.cache.set(self.endpoint, sanitised)
return True, sanitised
return False, response.json()
except exceptions.RequestException:
return False, {}
class User(GitHubEntity): class User(GitHubEntity):
"""Represents a GitHub user with methods to query user data."""
def __init__(self, name: str): def __init__(self, name: str):
"""
Initialise a User instance.
:param name: The GitHub username.
"""
super().__init__(source=name) super().__init__(source=name)
self.name = name self.name = name
self.endpoint = f"{BASE_URL}/users/{name}" self.endpoint = f"{BASE_URL}/users/{name}"
def profile(self) -> dict: def profile(self) -> dict:
"""
Retrieve the user's profile information.
:return: Dictionary containing user profile data.
"""
profile = github.get(url=self.endpoint) profile = github.get(url=self.endpoint)
return profile return profile
def repos(self, page: int, per_page: int) -> list: def repos(self, page: int, per_page: int) -> list:
"""
Retrieve the user's public repositories.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of repository dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
repos = github.get(url=f"{self.endpoint}/repos", params=params) repos = github.get(url=f"{self.endpoint}/repos", params=params)
return repos return repos
def subscriptions(self, page: int, per_page: int) -> list: def subscriptions(self, page: int, per_page: int) -> list:
"""
Retrieve repositories the user is subscribed to.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of repository subscription dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
subscriptions = github.get(url=f"{self.endpoint}/subscriptions", params=params) subscriptions = github.get(url=f"{self.endpoint}/subscriptions", params=params)
return subscriptions return subscriptions
def starred(self, page: int, per_page: int) -> list: def starred(self, page: int, per_page: int) -> list:
"""
Retrieve repositories the user has starred.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of starred repository dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
starred = github.get(url=f"{self.endpoint}/starred", params=params) starred = github.get(url=f"{self.endpoint}/starred", params=params)
return starred return starred
def followers(self, page: int, per_page: int) -> list: def followers(self, page: int, per_page: int) -> list:
"""
Retrieve the user's followers.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of follower user dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
users = github.get(url=f"{self.endpoint}/followers", params=params) users = github.get(url=f"{self.endpoint}/followers", params=params)
return users return users
def following(self, page: int, per_page: int) -> list: def following(self, page: int, per_page: int) -> list:
"""
Retrieve users that this user is following.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of followed user dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
users = github.get(url=f"{self.endpoint}/following", params=params) users = github.get(url=f"{self.endpoint}/following", params=params)
return users return users
def follows(self, user: str) -> bool: ... def follows(self, user: str) -> bool:
"""Check if this user follows another user.
:param user: The username to check.
:return: True if this user follows the specified user.
"""
...
def orgs(self, page: int, per_page: int) -> list: def orgs(self, page: int, per_page: int) -> list:
"""
Retrieve organisations the user belongs to.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of organisation dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
orgs = github.get(url=f"{self.endpoint}/orgs", params=params) orgs = github.get(url=f"{self.endpoint}/orgs", params=params)
return orgs return orgs
def gists(self, page: int, per_page: int) -> list: def gists(self, page: int, per_page: int) -> list:
"""
Retrieve the user's gists.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of gist dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
gists = github.get(url=f"{self.endpoint}/gists", params=params) gists = github.get(url=f"{self.endpoint}/gists", params=params)
return gists return gists
def events(self, page: int, per_page: int) -> list: def events(self, page: int, per_page: int) -> list:
"""
Retrieve the user's public events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of event dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params) events = github.get(url=f"{self.endpoint}/events", params=params)
return events return events
def received_events(self, page: int, per_page: int) -> list: def received_events(self, page: int, per_page: int) -> list:
"""
Retrieve events received by the user.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of received event dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
received_events = github.get( received_events = github.get(
url=f"{self.endpoint}/received_events", params=params url=f"{self.endpoint}/received_events", params=params
@@ -84,133 +201,337 @@ class User(GitHubEntity):
class Org(GitHubEntity): class Org(GitHubEntity):
"""Represents a GitHub organisation with methods to query organisation data."""
def __init__(self, name: str): def __init__(self, name: str):
"""
Initialise an Org instance.
:param name: The GitHub organisation name.
"""
super().__init__(source=name) super().__init__(source=name)
self.name = name self.name = name
self.endpoint = f"{BASE_URL}/orgs/{name}" self.endpoint = f"{BASE_URL}/orgs/{name}"
def profile(self) -> dict: def profile(self) -> dict:
"""
Retrieve the organisation's profile information.
:return: Dictionary containing organisation profile data.
"""
profile = github.get(url=self.endpoint) profile = github.get(url=self.endpoint)
return profile return profile
def repos(self, page: int, per_page: int) -> list: def repos(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's public repositories.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of repository dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
repos = github.get(url=f"{self.endpoint}/repos", params=params) repos = github.get(url=f"{self.endpoint}/repos", params=params)
return repos return repos
def events(self, page: int, per_page: int) -> list: def events(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's public events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of event dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params) events = github.get(url=f"{self.endpoint}/events", params=params)
return events return events
def hooks(self, page: int, per_page: int) -> list: def hooks(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's webhooks.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of webhook dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
hooks = github.get(url=f"{self.endpoint}/hooks", params=params) hooks = github.get(url=f"{self.endpoint}/hooks", params=params)
return hooks return hooks
def issues(self, page: int, per_page: int) -> list: def issues(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's issues.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of issue dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
issues = github.get(url=f"{self.endpoint}/issues", params=params) issues = github.get(url=f"{self.endpoint}/issues", params=params)
return issues return issues
def members(self, page: int, per_page: int) -> list: def members(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's public members.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of member user dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
members = github.get(url=f"{self.endpoint}/members", params=params) members = github.get(url=f"{self.endpoint}/members", params=params)
return members return members
class Repo(GitHubEntity): class Repo(GitHubEntity):
"""Represents a GitHub repository with methods to query repository data."""
def __init__(self, name: str, owner: str): def __init__(self, name: str, owner: str):
"""
Initialise a Repo instance.
:param name: The repository name.
:param owner: The repository owner's username.
"""
super().__init__(source=f"{owner}/{name}") super().__init__(source=f"{owner}/{name}")
self.name = name self.name = name
self.owner = owner self.owner = owner
self.endpoint = f"{BASE_URL}/repos/{owner}/{name}" self.endpoint = f"{BASE_URL}/repos/{owner}/{name}"
def profile(self) -> dict: def profile(self) -> dict:
"""
Retrieve the repository's information.
:return: Dictionary containing repository data.
"""
profile = github.get(url=self.endpoint) profile = github.get(url=self.endpoint)
return profile return profile
def forks(self, page: int, per_page: int) -> list: def forks(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's forks.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of fork repository dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
forks = github.get(url=f"{self.endpoint}/forks", params=params) forks = github.get(url=f"{self.endpoint}/forks", params=params)
return forks return forks
def issue_events(self, page: int, per_page: int) -> list: def issue_events(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's issue events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of issue event dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
issue_events = github.get(url=f"{self.endpoint}/issue_events", params=params) issue_events = github.get(url=f"{self.endpoint}/issue_events", params=params)
return issue_events return issue_events
def events(self, page: int, per_page: int) -> list: def events(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of event dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params) events = github.get(url=f"{self.endpoint}/events", params=params)
return events return events
def assignees(self, page: int, per_page: int) -> list: def assignees(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's available assignees.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of assignee user dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
assignees = github.get(url=f"{self.endpoint}/assignees", params=params) assignees = github.get(url=f"{self.endpoint}/assignees", params=params)
return assignees return assignees
def branches(self, page: int, per_page: int) -> list: def branches(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's branches.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of branch dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
branches = github.get(url=f"{self.endpoint}/branches", params=params) branches = github.get(url=f"{self.endpoint}/branches", params=params)
return branches return branches
def tags(self, page: int, per_page: int) -> list: def tags(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's tags.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of tag dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
tags = github.get(url=f"{self.endpoint}/tags", params=params) tags = github.get(url=f"{self.endpoint}/tags", params=params)
return tags return tags
def languages(self) -> dict: def languages(self) -> dict:
"""
Retrieve the programming languages used in the repository.
:return: Dictionary mapping language names to bytes of code.
"""
languages = github.get(url=f"{self.endpoint}/languages") languages = github.get(url=f"{self.endpoint}/languages")
return languages return languages
def stargazers(self, page: int, per_page: int) -> list: def stargazers(self, page: int, per_page: int) -> list:
"""
Retrieve users who have starred the repository.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of stargazer user dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
stargazers = github.get(url=f"{self.endpoint}/stargazers", params=params) stargazers = github.get(url=f"{self.endpoint}/stargazers", params=params)
return stargazers return stargazers
def subscribers(self, page: int, per_page: int) -> list: def subscribers(self, page: int, per_page: int) -> list:
"""
Retrieve users subscribed to the repository.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of subscriber user dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
subscribers = github.get(url=f"{self.endpoint}/subscribers", params=params) subscribers = github.get(url=f"{self.endpoint}/subscribers", params=params)
return subscribers return subscribers
def commits(self, page: int, per_page: int) -> list: def commits(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's commits.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of commit dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
commits = github.get(url=f"{self.endpoint}/commits", params=params) commits = github.get(url=f"{self.endpoint}/commits", params=params)
return commits return commits
def comments(self, page: int, per_page: int) -> list: def comments(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's commit comments.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of comment dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
comments = github.get(url=f"{self.endpoint}/comments", params=params) comments = github.get(url=f"{self.endpoint}/comments", params=params)
return comments return comments
def contents(self, path: str) -> list: def contents(self, path: str) -> list:
"""
Retrieve the contents of a file or directory in the repository.
:param path: Path to the file or directory.
:return: List of content item dictionaries.
"""
contents = github.get(url=f"{self.endpoint}/contents/{path}") contents = github.get(url=f"{self.endpoint}/contents/{path}")
return contents return contents
def issues(self, page: int, per_page: int) -> list: def issues(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's issues.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of issue dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
issues = github.get(url=f"{self.endpoint}/issues", params=params) issues = github.get(url=f"{self.endpoint}/issues", params=params)
return issues return issues
def releases(self, page: int, per_page: int) -> list: def releases(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's releases.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of release dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
releases = github.get(url=f"{self.endpoint}/releases", params=params) releases = github.get(url=f"{self.endpoint}/releases", params=params)
return releases return releases
def deployments(self, page: int, per_page: int) -> list: def deployments(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's deployments.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of deployment dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
deployments = github.get(url=f"{self.endpoint}/deployments", params=params) deployments = github.get(url=f"{self.endpoint}/deployments", params=params)
return deployments return deployments
def labels(self, page: int, per_page: int) -> list: def labels(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's labels.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of label dictionaries.
"""
params = {"page": page, "per_page": per_page} params = {"page": page, "per_page": per_page}
labels = github.get(url=f"{self.endpoint}/labels", params=params) labels = github.get(url=f"{self.endpoint}/labels", params=params)
return labels return labels
class Search: class Search:
"""Provides methods to search GitHub for various entity types."""
def __init__(self, query: str, page: int, per_page: int): def __init__(self, query: str, page: int, per_page: int):
"""
Initialise a Search instance.
:param query: The search query string.
:param page: Page number for pagination.
:param per_page: Number of results per page.
"""
self.query = query self.query = query
self.page = page self.page = page
self.per_page = per_page self.per_page = per_page
@@ -218,21 +539,51 @@ class Search:
self.params = {"q": query, "page": page, "per_page": per_page} self.params = {"q": query, "page": page, "per_page": per_page}
def repos(self) -> list: def repos(self) -> list:
"""
Search for repositories matching the query.
:return: List of repository search result dictionaries.
"""
repos = github.get(url=f"{self.endpoint}/repositories", params=self.params) repos = github.get(url=f"{self.endpoint}/repositories", params=self.params)
return repos return repos
def users(self) -> list: def users(self) -> list:
"""
Search for users matching the query.
:return: List of user search result dictionaries.
"""
users = github.get(url=f"{self.endpoint}/users", params=self.params) users = github.get(url=f"{self.endpoint}/users", params=self.params)
return users return users
def commits(self) -> list: def commits(self) -> list:
"""
Search for commits matching the query.
:return: List of commit search result dictionaries.
"""
commits = github.get(url=f"{self.endpoint}/commits", params=self.params) commits = github.get(url=f"{self.endpoint}/commits", params=self.params)
return commits return commits
def issues(self) -> list: def issues(self) -> list:
"""
Search for issues and pull requests matching the query.
:return: List of issue search result dictionaries.
"""
issues = github.get(url=f"{self.endpoint}/issues", params=self.params) issues = github.get(url=f"{self.endpoint}/issues", params=self.params)
return issues return issues
def topics(self) -> list: def topics(self) -> list:
"""
Search for topics matching the query.
:return: List of topic search result dictionaries.
"""
topics = github.get(url=f"{self.endpoint}/topics", params=self.params) topics = github.get(url=f"{self.endpoint}/topics", params=self.params)
return topics return topics

View File

@@ -31,6 +31,14 @@ console = Console(log_time=False)
def preview_response(data: t.Union[dict, list], source: str, _type: str): def preview_response(data: t.Union[dict, list], source: str, _type: str):
"""
Display a preview of response data in a tree structure.
:param data: The data to preview (dict or list).
:param source: The source identifier of the data.
:param _type: The type of data being previewed.
"""
if isinstance(data, dict): if isinstance(data, dict):
tree = Tree( tree = Tree(
label=f"\n[bold]{data.get('name') or data.get('login') or data.get('id') or 'Data'}[/bold]", label=f"\n[bold]{data.get('name') or data.get('login') or data.get('id') or 'Data'}[/bold]",
@@ -44,7 +52,7 @@ def preview_response(data: t.Union[dict, list], source: str, _type: str):
elif isinstance(data, list): elif isinstance(data, list):
preview_data = data[:5] preview_data = data[:5]
tree = Tree( tree = Tree(
label=f"\nPreview: First {len(preview_data)} of {len(data)} {_type} from {source}", label=f"\n[bold]First {len(preview_data)} of {len(data)} {_type} for '{source}'[/bold]",
guide_style="#444444", guide_style="#444444",
highlight=True, highlight=True,
) )
@@ -74,7 +82,17 @@ def export_response(
file_formats: list, file_formats: list,
output_dir: str = "../exports", output_dir: str = "../exports",
): ):
"""Export data to selected formats using built-in libraries.""" """
Export response data to one or more file formats.
:param data: The data to export (dict or list).
:param data_type: The type of data being exported.
:param source: The source identifier of the data.
:param file_formats: List of file formats to export to (json, csv, html).
:param output_dir: Directory path where exported files will be saved.
:return: List of exported file paths.
"""
# Create output directory if it doesn't exist # Create output directory if it doesn't exist
output_dir = Path(output_dir) output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True) output_dir.mkdir(exist_ok=True)
@@ -165,6 +183,14 @@ def export_response(
def fill_tree(tree: Tree, data: t.Union[dict, list]) -> Tree: def fill_tree(tree: Tree, data: t.Union[dict, list]) -> Tree:
"""
Recursively populate a Rich Tree with data.
:param tree: The Tree object to populate.
:param data: The data to add to the tree (dict or list).
:return: The populated Tree object.
"""
if isinstance(data, dict): if isinstance(data, dict):
for key, value in data.items(): for key, value in data.items():
if isinstance(value, dict) or isinstance(value, list): if isinstance(value, dict) or isinstance(value, list):
@@ -187,10 +213,12 @@ def fill_tree(tree: Tree, data: t.Union[dict, list]) -> Tree:
def check_updates(): def check_updates():
"""Check for available package updates and display the result."""
with console.status("[dim]Checking for updates...[/dim]") as status: with console.status("[dim]Checking for updates...[/dim]") as status:
checker = UpdateChecker() checker = UpdateChecker()
result = checker.check(__pkg__, __version__) result = checker.check(__pkg__, __version__)
if result: if result is not None:
status.stop() status.stop()
message_dialog(title="Update Available", text=result).run() message_dialog(title="Update Available", text=result).run()
else: else:
@@ -207,6 +235,10 @@ def clear_screen():
def ascii_banner(text: str): def ascii_banner(text: str):
"""Display a colourful ASCII art banner with gradient styling.
:param text: The text to convert to ASCII art.
"""
clear_screen() clear_screen()
ascii_text = pyfiglet.figlet_format(text=text, font="chunky") ascii_text = pyfiglet.figlet_format(text=text, font="chunky")
@@ -224,6 +256,12 @@ def ascii_banner(text: str):
def set_menu_title(menu_type: t.Literal["home", "user", "org", "repo", "search"]): def set_menu_title(menu_type: t.Literal["home", "user", "org", "repo", "search"]):
"""
Set the terminal window title based on the current menu.
:param menu_type: The type of menu being displayed.
"""
title: str = __pkg__.title() title: str = __pkg__.title()
title += f" | {menu_type.title()}" title += f" | {menu_type.title()}"
console.set_window_title(title=title) console.set_window_title(title=title)

View File

@@ -26,20 +26,56 @@ __all__ = ["Dialogs"]
class Dialogs: class Dialogs:
def __init__(self): ... """Provides interactive dialogue boxes for user confirmations and information display."""
def __init__(self):
"""Initialise the Dialogs class."""
...
@staticmethod @staticmethod
def quit() -> bool: def _boolean(title: str, text: str) -> bool:
"""
Display a Yes/No dialogue and return the user's choice.
:param title: The title of the dialogue box.
:param text: The message text to display.
:return: True if user selects Yes, False if No or cancelled.
"""
try: try:
result = button_dialog( result = button_dialog(
title="Quit", title=title,
text="This will close the session, continue?", text=text,
buttons=[("Yes", True), ("No", False)], buttons=[("Yes", True), ("No", False)],
).run() ).run()
return result if result is not None else False return result if result is not None else False
except KeyboardInterrupt: except KeyboardInterrupt:
return True return True
def quit(self) -> bool:
"""
Display a confirmation dialogue for quitting the application.
:return: True if user confirms quit, False otherwise.
"""
return self._boolean(
title="Quit", text="This will close the session, continue?"
)
def clear_cache(self) -> bool:
"""
Display a confirmation dialogue for clearing the cache.
:return: True if user confirms cache clearing, False otherwise.
"""
return self._boolean(
title="Clear Cache",
text="This will clear all octosuite caches, continue?",
)
@staticmethod @staticmethod
def license(): def license():
"""Display the MIT license notice in a dialogue box."""
message_dialog(title="MIT License", text=LICENSE_NOTICE).run() message_dialog(title="MIT License", text=LICENSE_NOTICE).run()

View File

@@ -7,6 +7,7 @@ from rich.status import Status
from .dialogs import Dialogs from .dialogs import Dialogs
from .prompts import Prompts from .prompts import Prompts
from ..core.cache import cache
from ..core.models import User, Repo, Org, Search from ..core.models import User, Repo, Org, Search
from ..lib import check_updates, preview_response, export_response, set_menu_title from ..lib import check_updates, preview_response, export_response, set_menu_title
from ..lib import console, clear_screen, ascii_banner from ..lib import console, clear_screen, ascii_banner
@@ -20,7 +21,6 @@ CUSTOM_STYLE = Style(
INSTRUCTIONS = "↑↓ [move] • ⮠ [select]" INSTRUCTIONS = "↑↓ [move] • ⮠ [select]"
EXPORT_INSTRUCTIONS = "↑↓ [move] • ⮠ [confirm] • spacebar [check]" EXPORT_INSTRUCTIONS = "↑↓ [move] • ⮠ [confirm] • spacebar [check]"
POINTER: str = "🖝 "
dialogs = Dialogs() dialogs = Dialogs()
prompts = Prompts() prompts = Prompts()
@@ -28,8 +28,16 @@ prompts = Prompts()
__all__ = ["Menus"] __all__ = ["Menus"]
class Menus: class BaseMenu:
def __init__(self): """Base class providing common menu functionality and response handling for GitHub data queries."""
def __init__(self, main_menu: t.Callable):
"""
Initialise the BaseMenu with pagination and search method configurations.
:param main_menu: Callable reference to the main menu function.
"""
# Define which methods require pagination # Define which methods require pagination
self.paginated_methods = { self.paginated_methods = {
"repos", "repos",
@@ -64,15 +72,91 @@ class Menus:
# Search methods (all require pagination) # Search methods (all require pagination)
self.search_methods = {"repos", "users", "commits", "issues", "topics"} self.search_methods = {"repos", "users", "commits", "issues", "topics"}
self.mode_handlers = { self.main_menu = main_menu
"user": self.user,
"repo": self.repo, @staticmethod
"org": self.org, def target_validator(
"search": self.search, identifier: str,
} instance: t.Union[User, Repo, Org],
callback: t.Callable,
*callback_args,
) -> bool:
"""
Validate whether a GitHub entity exists.
:param identifier: Name or identifier of the entity to validate.
:param instance: The instance with an exists() method (User, Repo, or Org).
:param callback: Function to call if validation fails.
:param callback_args: Arguments to pass to the callback function.
:return: True if the entity exists, False otherwise.
"""
if isinstance(instance, User):
target_type = "user"
elif isinstance(instance, Repo):
target_type = "repo"
else:
target_type = "org"
with Status(
f"[dim]Validating {target_type} ({identifier})[/dim]...",
console=console,
) as status:
exists, response = instance.exists()
if not exists:
status.stop()
if response and "message" in response:
console.print(
f"[bold][yellow]✘[/yellow] {response['message']}[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
callback(*callback_args)
return False
return True
def execute_and_handle_response(
self,
instance: t.Union[User, Repo, Org, Search],
method_name: str,
target_type: t.Literal["user", "repo", "org"],
source: str,
):
"""
Execute a method on an instance and handle the resulting data.
:param instance: The instance to execute the method on.
:param method_name: Name of the method to execute.
:param target_type: Type of entity ("user", "repo", or "org").
:param source: Source identifier for response handling.
"""
valid_methods = self.paginated_methods | self.non_paginated_methods
if method_name in valid_methods:
with Status(
status=f"[dim]Initialising {target_type} {method_name}[/dim]...",
console=console,
) as status:
data = self.execute_selection(
source=source,
instance=instance,
method_name=method_name,
status=status,
)
status.stop()
self.response_handler(data=data, data_type=method_name, source=source)
def execute_selection(self, status: Status, **kwargs):
"""
Execute a method on an instance with pagination if required.
:param status: Rich Status object for displaying progress.
:param kwargs: Must include 'instance', 'method_name', and 'source'.
:return: The data returned by the executed method.
"""
def _execute_selection(self, status: Status, **kwargs):
"""Execute a method on an instance, prompting for pagination if needed."""
instance = kwargs.get("instance") instance = kwargs.get("instance")
method_name = kwargs.get("method_name") method_name = kwargs.get("method_name")
source = kwargs.get("source") source = kwargs.get("source")
@@ -84,13 +168,113 @@ class Menus:
prompts.pagination_params() if method_name in self.paginated_methods else {} prompts.pagination_params() if method_name in self.paginated_methods else {}
) )
status.start() status.start()
status.update(f"[dim]Getting {method_name} from {source}...[/dim]") status.update(f"[dim]Getting {method_name} from {source}[/dim]...")
return method(**params) return method(**params)
def _navigation(self, option: str, callback: t.Callable, *callback_args): def response_handler(self, data: t.Union[dict, list], data_type: str, source: str):
"""Handle navigation options (back, quit, change settings).""" """
Handle retrieved data by previewing and offering export options.
:param data: The data to handle (dict or list).
:param data_type: Type of data retrieved.
:param source: Source identifier of the data.
"""
try:
if not data:
console.print(
f"[bold][yellow]✘[/yellow] No data found for '{source}'[/bold]"
)
else:
preview_response(data=data, source=source, _type=data_type)
export_choice = q.select(
"What would you like to do?",
choices=[
q.Choice(
title="Export",
value="export",
description="Export the data",
shortcut_key="e",
),
q.Choice(
title="Skip",
value="skip",
description="Do nothing, and go back to previous menu",
shortcut_key="x",
),
q.Choice(
title="Quit",
value="quit",
description="Close this session",
),
],
style=CUSTOM_STYLE,
instruction=INSTRUCTIONS,
).ask()
if export_choice == "skip":
return
if export_choice == "quit":
if dialogs.quit():
sys.exit()
else:
clear_screen()
self.response_handler(
data=data,
data_type=data_type,
source=source,
)
return
file_formats = q.checkbox(
"Select export format(s)",
choices=[
q.Choice(
title="JSON",
value="json",
description="Export as JSON file",
),
q.Choice(
title="CSV",
value="csv",
description="Export as CSV file",
),
q.Choice(
title="HTML",
value="html",
description="Export as HTML table",
),
],
style=CUSTOM_STYLE,
instruction=EXPORT_INSTRUCTIONS,
validate=lambda x: len(x) > 0
or "Please select at least one format",
).ask()
export_response(
data=data,
data_type=data_type,
source=source,
file_formats=file_formats,
)
except KeyboardInterrupt:
console.print("\nExport cancelled")
finally:
console.input(" Press [bold]ENTER[/bold] to continue ...")
def navigation_handler(self, option: str, callback: t.Callable, *callback_args):
"""
Handle common navigation options across menus.
:param option: The selected navigation option.
:param callback: Function to call for certain navigation actions.
:param callback_args: Arguments to pass to the callback function.
:return: True if navigation was handled, False otherwise.
"""
navigation_handlers = { navigation_handlers = {
"back": lambda: self.main(), "back": lambda: self.main_menu(),
"quit": lambda: ( "quit": lambda: (
sys.exit() if dialogs.quit() else callback(*callback_args) sys.exit() if dialogs.quit() else callback(*callback_args)
), ),
@@ -102,83 +286,25 @@ class Menus:
return True return True
return False return False
def response_handling(self, data: t.Union[dict, list], data_type: str, source: str):
"""Export data to file in user-selected format(s)."""
preview_response(data=data, source=source, _type=data_type)
try: class Menus(BaseMenu):
export_choice = q.select( """Main menu system providing interactive interfaces for GitHub data queries."""
"What would you like to do?",
choices=[
q.Choice(
title="Export",
value="export",
description="Export the data",
shortcut_key="e",
),
q.Choice(
title="Skip",
value="skip",
description="Do nothing, and go back to previous menu",
shortcut_key="x",
),
q.Choice(
title="Quit",
value="quit",
description="Close this session",
),
],
pointer=POINTER,
style=CUSTOM_STYLE,
instruction=INSTRUCTIONS,
).ask()
if export_choice == "skip": def __init__(self):
return """Initialise the Menus class with mode handlers for different query types."""
if export_choice == "quit": super().__init__(main_menu=self.main)
if dialogs.quit():
sys.exit()
else:
self.response_handling(
data=data, data_type=data_type, source=source
)
return
file_formats = q.checkbox( self.mode_handlers = {
"Select export format(s)", "user": self.user,
choices=[ "repo": self.repo,
q.Choice( "org": self.org,
title="JSON", "search": self.search,
value="json", }
description="Export as JSON file",
),
q.Choice(
title="CSV",
value="csv",
description="Export as CSV file",
),
q.Choice(
title="HTML",
value="html",
description="Export as HTML table",
),
],
pointer=POINTER,
style=CUSTOM_STYLE,
instruction=EXPORT_INSTRUCTIONS,
validate=lambda x: len(x) > 0 or "Please select at least one format",
).ask()
export_response(
data=data, data_type=data_type, source=source, file_formats=file_formats
)
except KeyboardInterrupt:
print("\nExport cancelled")
def main(self): def main(self):
"""Main menu to select mode.""" """Display the main menu for selecting query mode."""
set_menu_title(menu_type="home") set_menu_title(menu_type="home")
clear_screen() clear_screen()
try: try:
@@ -207,6 +333,11 @@ class Menus:
value="search", value="search",
description="Search GitHub", description="Search GitHub",
), ),
q.Choice(
title="Clear cache",
value="clear_cache",
description="Clear all in-memory cache from Octosuite",
),
q.Choice( q.Choice(
title="Updates", title="Updates",
value="updates", value="updates",
@@ -223,7 +354,6 @@ class Menus:
description="Close this session", description="Close this session",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
).ask() ).ask()
@@ -239,6 +369,10 @@ class Menus:
elif action == "updates": elif action == "updates":
check_updates() check_updates()
self.main() self.main()
elif action == "clear_cache":
if dialogs.clear_cache():
cache.clear()
self.main()
elif action is None: elif action is None:
sys.exit() sys.exit()
else: else:
@@ -251,7 +385,12 @@ class Menus:
sys.exit() sys.exit()
def search(self, query: t.Optional[str] = None): def search(self, query: t.Optional[str] = None):
"""Search menu for querying GitHub.""" """
Display the search menu for querying GitHub content.
:param query: Optional search query string. If None, prompts user for input.
"""
set_menu_title(menu_type="search") set_menu_title(menu_type="search")
clear_screen() clear_screen()
if query is None: if query is None:
@@ -262,9 +401,7 @@ class Menus:
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
) )
clear_screen()
ascii_banner(text=query) ascii_banner(text=query)
option = q.select( option = q.select(
"What would you like to do/search?", "What would you like to do/search?",
choices=[ choices=[
@@ -302,7 +439,7 @@ class Menus:
q.Choice( q.Choice(
title="Go Back", title="Go Back",
value="back", value="back",
description="Go back to ewous menu", description="Go back to previous menu",
shortcut_key="b", shortcut_key="b",
), ),
q.Choice( q.Choice(
@@ -312,7 +449,6 @@ class Menus:
shortcut_key="q", shortcut_key="q",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
use_shortcuts=True, use_shortcuts=True,
@@ -320,9 +456,10 @@ class Menus:
if option is None: if option is None:
self.main() self.main()
return
# Handle navigation # Handle navigation
if self._navigation(option, self.search, query): if self.navigation_handler(option, self.search, query):
return return
# Handle change query # Handle change query
@@ -332,11 +469,9 @@ class Menus:
# Execute search if it's a valid method # Execute search if it's a valid method
if option in self.search_methods: if option in self.search_methods:
with console.status( with Status(
status=f"[dim]Initialising {option} search...[/dim]" status=f"[dim]Initialising {option} search[/dim]...", console=console
) as status: ) as status:
# Get pagination params
status.stop() status.stop()
params = prompts.pagination_params() params = prompts.pagination_params()
status.start() status.start()
@@ -349,22 +484,29 @@ class Menus:
) )
method = getattr(search, option) method = getattr(search, option)
status.update(f"[dim]Searching {option} for {query}...[/dim]") status.update(f"[dim]Searching {option} for {query}[/dim]...")
data = method() data = method()
if data: if data:
items = data.get("items") items = data.get("items")
status.stop() status.stop()
self.response_handling( self.response_handler(
data=items if items is not None else data, data=items if items is not None else data,
data_type=option, data_type=option,
source=query, source=query,
) )
# After handling response, show menu again
self.search(query=query) self.search(query=query)
def user(self, username: t.Optional[str] = None): def user(self, username: t.Optional[str] = None, is_validated: bool = False):
"""User menu for querying user data.""" """
Display the user menu for querying GitHub user data.
:param username: Optional GitHub username. If None, prompts user for input.
:param is_validated: Whether the username has already been validated.
"""
set_menu_title(menu_type="user") set_menu_title(menu_type="user")
clear_screen() clear_screen()
if username is None: if username is None:
@@ -376,9 +518,16 @@ class Menus:
qmark="@", qmark="@",
) )
clear_screen() user = User(name=username)
ascii_banner(text=username)
if not self.target_validator(
identifier=username,
instance=user,
callback=self.user,
):
return
ascii_banner(text=username)
option = q.select( option = q.select(
"What would you like to do/get?", "What would you like to do/get?",
choices=[ choices=[
@@ -451,7 +600,6 @@ class Menus:
shortcut_key="q", shortcut_key="q",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
use_shortcuts=True, use_shortcuts=True,
@@ -459,50 +607,40 @@ class Menus:
if option is None: if option is None:
self.main() self.main()
return
# Handle navigation # Handle navigation
if self._navigation(option, self.user, username): elif self.navigation_handler(option, self.user, username):
return return
# Handle change username # Handle change username
if option == "change_username": elif option == "change_username":
self.user() self.user()
return return
else:
# Execute action and handle response
self.execute_and_handle_response(
instance=user,
method_name=option,
target_type="user",
source=username,
)
# Execute action if it's a valid method # After handling response, show menu again WITHOUT re-validating
valid_methods = self.paginated_methods | self.non_paginated_methods self.user(username=username, is_validated=True)
if option in valid_methods:
with Status(
status=f"[dim]Initialising user {option}...[/dim]",
console=console,
) as status:
user = User(name=username)
status.update(f"[dim]Validating user's ({username}) existence...[/dim]") def repo(
if user.exists(): self,
console.print( name: t.Optional[str] = None,
f"[bold][green]✔[/green] User ({username}) exists on GitHub[/bold]" owner: t.Optional[str] = None,
) ):
data = self._execute_selection( """
source=username, Display the repository menu for querying GitHub repository data.
instance=user,
method_name=option,
status=status,
)
status.stop() :param name: Optional repository name. If None, prompts user for input.
self.response_handling(data=data, data_type=option, source=username) :param owner: Optional repository owner. If None, prompts user for input.
else: """
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] User ({username}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.user()
self.user(username=username)
def repo(self, name: t.Optional[str] = None, owner: t.Optional[str] = None):
"""Repository menu for querying repo data."""
set_menu_title(menu_type="repo") set_menu_title(menu_type="repo")
clear_screen() clear_screen()
if name is None or owner is None: if name is None or owner is None:
@@ -521,9 +659,18 @@ class Menus:
qmark="@", qmark="@",
) )
clear_screen() repo = Repo(name=name, owner=owner)
ascii_banner(text=f"{owner}/{name}") source = f"{owner}/{name}"
# Only validate if not already validated
if not self.target_validator(
identifier=source,
instance=repo,
callback=self.repo,
):
return
ascii_banner(text=source)
option = q.select( option = q.select(
"What would you like to do/get?", "What would you like to do/get?",
choices=[ choices=[
@@ -635,19 +782,11 @@ class Menus:
shortcut_key="q", shortcut_key="q",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
use_shortcuts=True, use_shortcuts=True,
).ask() ).ask()
if option is None:
self.main()
# Handle navigation
if self._navigation(option, self.repo, name, owner):
return
# Handle change options # Handle change options
change_handlers = { change_handlers = {
"change_repo_name": lambda: self.repo(owner=owner), "change_repo_name": lambda: self.repo(owner=owner),
@@ -655,45 +794,36 @@ class Menus:
"change_both": lambda: self.repo(), "change_both": lambda: self.repo(),
} }
if option in change_handlers: if option is None:
change_handlers[option]() self.main()
return return
# Execute action if it's a valid method # Handle navigation
valid_methods = self.paginated_methods | self.non_paginated_methods elif self.navigation_handler(option, self.repo, name, owner):
if option in valid_methods: return
source = f"{owner}/{name}"
with Status(
status=f"[dim]Initialising repository {option}...[/dim]",
console=console,
) as status:
repo = Repo(name=name, owner=owner)
status.update( elif option in change_handlers:
f"[dim]Validating repository's ({source}) existence...[/dim]" change_handlers[option]()
) return
if repo.exists(): else:
console.print( # Execute action and handle response
f"[bold][green]✔[/green] Repository ({source}) exists on GitHub[/bold]" self.execute_and_handle_response(
) instance=repo,
data = self._execute_selection( method_name=option,
source=source, instance=repo, method_name=option, status=status target_type="repo",
) source=source,
)
status.stop() # After handling response, show menu again WITHOUT re-validating
self.response_handling(data=data, data_type=option, source=source) self.repo(name=name, owner=owner)
else:
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] Repository ({source}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.repo()
self.repo(name=name, owner=owner)
def org(self, name: t.Optional[str] = None): def org(self, name: t.Optional[str] = None):
"""Organisation menu for querying org data.""" """
Display the organisation menu for querying GitHub organisation data.
:param name: Optional organisation name. If None, prompts user for input.
"""
set_menu_title(menu_type="org") set_menu_title(menu_type="org")
clear_screen() clear_screen()
if name is None: if name is None:
@@ -705,8 +835,14 @@ class Menus:
qmark="@", qmark="@",
) )
clear_screen() org = Org(name=name)
ascii_banner(text=name)
if not self.target_validator(
identifier=name,
instance=org,
callback=self.org,
):
return
option = q.select( option = q.select(
"What would you like to do?", "What would you like to do?",
@@ -766,45 +902,22 @@ class Menus:
if option is None: if option is None:
self.main() self.main()
return
# Handle navigation # Handle navigation
if self._navigation(option, self.org, name): elif self.navigation_handler(option, self.org, name):
return return
# Handle change org # Handle change org
if option == "change_org": elif option == "change_org":
self.org() self.org()
return return
# Execute action if it's a valid method else:
valid_methods = self.paginated_methods | self.non_paginated_methods # Execute action and handle response
if option in valid_methods: self.execute_and_handle_response(
with Status( instance=org, method_name=option, target_type="org", source=name
status=f"[dim]Initialising organisation {option}...[/dim]", )
console=console,
) as status:
org = Org(name=name)
status.update( # After handling response, show menu again WITHOUT re-validating
f"[dim]Validating organisation's ({name}) existence...[/dim]" self.org(name=name)
)
if org.exists():
console.print(
f"[bold][green]✔[/green] Organisation ({name}) exists on GitHub[/bold]"
)
data = self._execute_selection(
source=name, instance=org, method_name=option, status=status
)
status.stop()
self.response_handling(data=data, data_type=option, source=name)
else:
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] Organisation ({name}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.org()
self.org(name=name)

View File

@@ -7,7 +7,10 @@ __all__ = ["Prompts"]
class Prompts: class Prompts:
"""Provides interactive prompts for user input collection."""
def __init__(self): def __init__(self):
"""Initialise the Prompts class."""
pass pass
@staticmethod @staticmethod
@@ -17,17 +20,40 @@ class Prompts:
style: t.Optional[Style] = None, style: t.Optional[Style] = None,
qmark: t.Optional[str] = "?", qmark: t.Optional[str] = "?",
) -> str: ) -> str:
return q.text( """
Display a text input prompt and validate non-empty input.
:param message: The prompt message to display.
:param instruction: Optional instruction text to guide the user.
:param style: Optional questionary Style object for custom styling.
:param qmark: Optional custom question mark character or string.
:return: The user's input string.
:raises KeyboardInterrupt: If the user cancels the prompt with CTRL+C.
"""
result = q.text(
message=message, message=message,
instruction=instruction, instruction=instruction,
style=style, style=style,
qmark=qmark, qmark=qmark,
validate=lambda text: len(text.strip()) > 0 or "Input cannot be empty", validate=lambda text: len(text.strip()) > 0
or None
or "Input cannot be empty",
).ask() ).ask()
if result is None:
raise KeyboardInterrupt
return result
@staticmethod @staticmethod
def pagination_params() -> dict: def pagination_params() -> dict:
"""Prompt user for pagination parameters.""" """
Prompt the user for pagination parameters.
:return: Dictionary containing 'page' and 'per_page' integer values.
"""
try: try:
page = q.text(message="Page", default="1", qmark="n").ask() page = q.text(message="Page", default="1", qmark="n").ask()
per_page = q.text( per_page = q.text(