Compare commits

..

37 Commits
4.0.0b0 ... dev

Author SHA1 Message Date
Ritchie Mwewa
4248b10476 chore: show banner in cli 2026-02-28 03:21:10 +02:00
Ritchie Mwewa
054d299a04 chore: show banner in cli 2026-02-28 03:20:48 +02:00
Ritchie Mwewa
62491de83f feat: add cli. tui can be run with -t/--tui 2026-02-28 03:12:25 +02:00
Ritchie Mwewa
31742efb5c feat: add cli. tui can be run with -t/--tui 2026-02-28 03:11:18 +02:00
Ritchie Mwewa
0caf6f7745 Merge pull request #28 from bellingcat/5.0/beta
5.0/beta
2026-01-24 00:21:44 +02:00
Ritchie Mwewa
307a59cd9f Update README.md 2026-01-24 00:16:29 +02:00
Ritchie Mwewa
0470189396 Update README.md 2026-01-23 23:42:13 +02:00
Ritchie Mwewa
e21305669f Update _cli.py 2026-01-23 23:38:34 +02:00
Ritchie Mwewa
2ecce50686 Update pyproject.toml 2026-01-23 23:24:11 +02:00
Ritchie Mwewa
5464932b09 Update README.md 2026-01-23 23:22:59 +02:00
Ritchie Mwewa
fb3f1a39af 5.0 beta 2026-01-23 23:16:48 +02:00
Ritchie Mwewa
eed3ee8e1c 4.0 stable 2026-01-06 22:31:38 +02:00
Ritchie Mwewa
5bb0b6db6b 4.0 stable 2026-01-06 22:19:11 +02:00
Ritchie Mwewa
1916a7e1a6 4.0/release-candidate 2026-01-06 22:12:49 +02:00
Ritchie Mwewa
400829c6ec 4.0/release-candidate 2026-01-06 22:10:52 +02:00
Ritchie Mwewa
914e320b9e Merge pull request #27 from bellingcat/4.0/release-candidate
4.0/release-candidate
2026-01-06 21:44:48 +02:00
Ritchie Mwewa
170148172d Merge branch 'master' into 4.0/release-candidate 2026-01-06 21:44:39 +02:00
Ritchie Mwewa
4d589beda5 4.0/release-candidate 2026-01-06 21:36:25 +02:00
Ritchie Mwewa
4f207d5343 Merge pull request #26 from bellingcat/4.0/beta
4.0/beta
2026-01-04 06:07:56 +02:00
Ritchie Mwewa
9de7bf1bfc Merge branch 'master' into 4.0/beta 2026-01-04 06:07:45 +02:00
Ritchie Mwewa
8d0695324a Merge branch '4.0/beta' of github.com:bellingcat/octosuite into 4.0/beta 2026-01-04 06:06:29 +02:00
Ritchie Mwewa
6ce80a1eea Patch for bug that called the 'Press ENTER to continue...' prompt, when we skip exporting response 2026-01-04 06:05:18 +02:00
Ritchie Mwewa
9b0f490089 Merge pull request #25 from bellingcat/4.0/beta
4.0/beta
2026-01-04 05:54:20 +02:00
Ritchie Mwewa
d9e2dae71d Merge branch 'master' into 4.0/beta 2026-01-04 05:54:11 +02:00
Ritchie Mwewa
0c06b95d18 Merge branch '4.0/beta' of github.com:bellingcat/octosuite into 4.0/beta 2026-01-04 05:52:23 +02:00
Ritchie Mwewa
ba8ed6bd4e Patches in menus 2026-01-04 05:52:01 +02:00
Ritchie Mwewa
86d0112691 Merge pull request #24 from bellingcat/4.0/beta
4.0/beta
2026-01-03 23:04:21 +02:00
Ritchie Mwewa
39f82ad775 Merge branch 'master' into 4.0/beta 2026-01-03 23:04:11 +02:00
Ritchie Mwewa
b5ee95db2b Validating targets will instantly cache their profile data since we're hitting the profile api to validate 2026-01-03 23:02:58 +02:00
Ritchie Mwewa
77efa0982f Merge branch '4.0/beta' of github.com:bellingcat/octosuite into 4.0/beta 2026-01-03 22:52:22 +02:00
Ritchie Mwewa
38a9a4b6b3 Allow response caching 2026-01-03 22:51:52 +02:00
Ritchie Mwewa
ab3cc398d0 Merge pull request #23 from bellingcat/4.0/beta
4.0/beta
2026-01-03 17:18:57 +02:00
Ritchie Mwewa
0a453dade6 Merge branch 'master' into 4.0/beta 2026-01-03 17:18:45 +02:00
Ritchie Mwewa
66abbcbef6 Update README.md 2026-01-03 17:17:33 +02:00
Ritchie Mwewa
04e35dbc98 Update README.md 2026-01-03 16:56:07 +02:00
Ritchie Mwewa
f4d7fa81b8 4.0.0-beta1 2026-01-03 16:39:29 +02:00
Ritchie Mwewa
7bf04cffd4 4.0.0-beta1 2026-01-03 16:36:07 +02:00
23 changed files with 1942 additions and 652 deletions

View File

@@ -1,6 +1,6 @@
MIT License MIT License
Copyright (c) 2025 Bellingcat Copyright (c) 2026 Bellingcat
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

157
README.md
View File

@@ -1,6 +1,6 @@
![octosuite](img/octosuite.png) ![octosuite](https://raw.githubusercontent.com/bellingcat/octosuite/refs/heads/master/img/octosuite.png)
TUI-based toolkit for GitHub data analysis. Terminal-based toolkit for GitHub data analysis.
![PyPI - Version](https://img.shields.io/pypi/v/octosuite) ![PyPI - Version](https://img.shields.io/pypi/v/octosuite)
![PyPI - Downloads](https://img.shields.io/pepy/dt/octosuite) ![PyPI - Downloads](https://img.shields.io/pepy/dt/octosuite)
@@ -9,65 +9,130 @@ TUI-based toolkit for GitHub data analysis.
![Build Status](https://img.shields.io/github/actions/workflow/status/bellingcat/octosuite/python-publish.yml) ![Build Status](https://img.shields.io/github/actions/workflow/status/bellingcat/octosuite/python-publish.yml)
![License](https://img.shields.io/github/license/bellingcat/octosuite) ![License](https://img.shields.io/github/license/bellingcat/octosuite)
## Overview ```shell
$ octosuite user torvalds
```
OctoSuite provides a terminal interface for exploring and exporting GitHub data. Access information about users, ```python
repositories, organizations, and search across GitHub's platform. from pprint import pprint
import octosuite
## Features user = octosuite.User(name="torvalds")
exists, profile = user.exists()
- **User Data** - View profiles, repositories, followers, organizations, and activity, e.t.c. if exists:
- **Repository Data** - Access repository details, commits, issues, releases, and contributors pprint(profile)
- **Organisation Data** - Explore organisation profiles, members, and repositories ```
- **Search** - Search across repositories, users, commits, issues, and topics
- **Export** - Save data in JSON, CSV, or HTML formats
## Installation ## Installation
### PyPI
```bash ```bash
pip install octosuite pip install octosuite
``` ```
### Build from source
```bash
# Clone repository
git clone https://github.com/bellingcat/octosuite.git
# Move to octosuite directory
cd octosuite
# Build and install (uses uv)
make install
# If you dont have uv installed, you can install directly with pip:
pip install .
# Run
octosuite
```
> [!Note]
> You can run octosuite with commands `octosuite`, or `ocs`
## Usage ## Usage
Navigate using <kbd>UP</kbd><kbd>DOWN</kbd> and <kbd>ENTER</kbd> to select options. The interface guides you through ### TUI (Interactive)
selecting a
data source
and
choosing what information to retrieve. Preview the results and optionally export them in your preferred format.
![home](img/menu.png) Launch the interactive terminal interface:
## License ```bash
octosuite -t/--tui
```
### MIT License Navigate using arrow keys and Enter to select options.
See the LICENSE file for details. License information is also available through the application's main menu. ### CLI
## Contributing Query GitHub data directly from the command line:
Contributions are welcome. Please submit pull requests or open issues for bugs and feature requests. Good luck! ```bash
# User data
octosuite user torvalds
octosuite user torvalds --repos --page 1 --per-page 50
octosuite user torvalds --followers --json
# Repository data
octosuite repo torvalds/linux
octosuite repo torvalds/linux --commits
octosuite repo torvalds/linux --stargazers --export ./data
# Organisation data
octosuite org github
octosuite org github --members --json
# Search
octosuite search "machine learning" --repos
octosuite search "python cli" --users --json
```
**Common options:**
- `--page` - Page number (default: 1)
- `--per-page` - Results per page, max 100 (default: 100)
- `--json` - Output as JSON
- `--export DIR` - Export to directory
Run `octosuite <command> --help` for available data type flags.
### Library
Use octosuite in your Python projects:
```python
from octosuite import User, Repo, Org, Search
# Get user data
user = User("torvalds")
exists, profile = user.exists()
if exists:
repos = user.repos(page=1, per_page=100)
followers = user.followers(page=1, per_page=50)
# Get repository data
repo = Repo(name="linux", owner="torvalds")
exists, profile = repo.exists()
if exists:
commits = repo.commits(page=1, per_page=100)
languages = repo.languages()
# Get organisation data
org = Org("github")
exists, profile = org.exists()
if exists:
members = org.members(page=1, per_page=100)
# Search GitHub
search = Search(query="machine learning", page=1, per_page=50)
results = search.repos()
```
## Features
<details>
<summary><strong>Data Types</strong></summary>
**User:** profile, repos, subscriptions, starred, followers, following, orgs, gists, events, received_events
**Repository:** profile, forks, issue_events, events, assignees, branches, tags, languages, stargazers, subscribers,
commits, comments, issues, releases, deployments, labels
**Organisation:** profile, repos, events, hooks, issues, members
**Search:** repos, users, commits, issues, topics
</details>
<details>
<summary><strong>Export Formats</strong></summary>
- JSON
- CSV
- HTML
</details>
## Licence
MIT Licence. See the [LICENCE](https://raw.githubusercontent.com/bellingcat/octosuite/refs/heads/master/LICENSE) file
for details.

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "octosuite" name = "octosuite"
version = "4.0.0-beta" version = "5.1.0"
description = "TUI-based toolkit for GitHub data analysis." description = "Terminal-based toolkit for GitHub data analysis."
readme = "README.md" readme = "README.md"
license = "MIT" license = "MIT"
authors = [ authors = [
@@ -9,13 +9,13 @@ authors = [
] ]
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"rich>=14.2.0", "rich>=14.3.3",
"questionary>=2.1.1", "questionary>=2.1.1",
"pyfiglet>=1.0.4", "pyfiglet>=1.0.4",
"update-checker>=0.18.0" "update-checker>=0.18.0",
] ]
classifiers = [ classifiers = [
"Development Status :: 4 - Beta", "Development Status :: 5 - Production/Stable",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Intended Audience :: End Users/Desktop", "Intended Audience :: End Users/Desktop",
"Operating System :: OS Independent", "Operating System :: OS Independent",
@@ -23,15 +23,17 @@ classifiers = [
] ]
[project.urls] [project.urls]
homepage = "https://bellingcat.com" Homepage = "https://bellingcat.com"
issues = "https://github.com/bellingcat/octosuite/issues" Issues = "https://github.com/bellingcat/octosuite/issues"
repository = "https://github.com/bellingcat/octosuite" Repository = "https://github.com/bellingcat/octosuite"
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
"black>=25.12.0", "black>=26.1.0",
] ]
[project.scripts] [project.scripts]
octosuite = "octosuite.app:start" octosuite = "octosuite.app.main:start_app"
ocs = "octosuite.app:start"
[tool.uv]
package = true

View File

@@ -1,2 +1,4 @@
__pkg__ = "octosuite" from .api.cache import cache
__version__ = "4.0.0" from .api.models import User, Org, Repo, Search
__all__ = ["User", "Org", "Repo", "Search", "cache"]

View File

@@ -0,0 +1,74 @@
import hashlib
import json
__all__ = ["cache"]
class ResponseCache:
"""Simple in-memory cache for API responses."""
def __init__(self):
"""Initialise the ResponseCache with an empty cache dictionary."""
self._cache = {}
@staticmethod
def _generate_key(url: str, params: dict = None) -> str:
"""
Generate a unique cache key from URL and parameters.
:param url: The URL to generate a key for.
:param params: Optional dictionary of parameters to include in the key.
:return: MD5 hash string representing the unique cache key.
"""
cache_data = f"{url}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(cache_data.encode()).hexdigest()
def get(self, url: str, params: dict = None):
"""
Retrieve a cached response if it exists.
:param url: The URL to retrieve cached data for.
:param params: Optional dictionary of parameters used in the original request.
:return: Cached response data if found, None otherwise.
"""
key = self._generate_key(url, params)
return self._cache.get(key)
def set(self, url: str, data, params: dict = None):
"""
Store a response in the cache.
:param url: The URL to cache data for.
:param data: The response data to cache.
:param params: Optional dictionary of parameters used in the request.
"""
key = self._generate_key(url, params)
self._cache[key] = data
def clear(self):
"""Clear all cached responses from memory."""
self._cache.clear()
def remove(self, url: str, params: dict = None):
"""
Remove a specific cached response.
:param url: The URL to remove cached data for.
:param params: Optional dictionary of parameters used in the original request.
"""
key = self._generate_key(url, params)
self._cache.pop(key, None)
cache = ResponseCache()
def clear_cache():
"""Clear all cached API responses from the global cache instance."""
cache.clear()

140
src/octosuite/api/github.py Normal file
View File

@@ -0,0 +1,140 @@
import re
import sys
import typing as t
import requests
from requests import Response
from .cache import cache
from ..meta import __version__
BASE_URL = "https://api.github.com"
__all__ = ["BASE_URL", "GitHub"]
class GitHub:
"""Handles GitHub API requests with caching and response sanitisation."""
def __init__(
self,
user_agent: str = (
f"octosuite/{__version__} "
f"(Python {sys.version.split()[0]}; "
f"https://github.com/bellingcat/octosuite) requests/{requests.__version__}"
),
):
"""
Initialise the GitHub API client.
:param user_agent: Custom User-Agent string for API requests.
"""
self.user_agent = user_agent
self.cache = cache
def get(
self,
url: str,
params: t.Optional[dict] = None,
return_response: bool = False,
use_cache: bool = True,
) -> t.Union[dict, list, Response]:
"""
Make a GET request to the GitHub API.
:param url: The API endpoint URL.
:param params: Optional query parameters for the request.
:param return_response: If True, return the raw Response object instead of JSON data.
:param use_cache: If True, use cached responses when available.
:return: Dictionary, list, or Response object depending on the request and parameters.
"""
if use_cache and not return_response:
cached = self.cache.get(url, params)
if cached is not None:
return cached
response = requests.get(
url=url, params=params, headers={"User-Agent": self.user_agent}
)
if return_response:
return response
if response.status_code == 200:
sanitised = self.sanitise_response(response=response.json())
# Cache the successful response
if use_cache:
self.cache.set(url, sanitised, params)
return sanitised
return []
def is_valid_entity(
self, _type: t.Literal["user", "org", "repo"], **kwargs
) -> bool:
"""
Validate whether a GitHub entity exists.
:param _type: Type of entity to validate ("user", "org", or "repo").
:param kwargs: Entity identifiers (username for user/org, repo_owner and repo_name for repo).
:return: True if the entity exists, False otherwise.
"""
try:
type_map = {
"user": f"https://api.github.com/users/{kwargs.get('username')}",
"org": f"https://api.github.com/orgs/{kwargs.get('username')}",
"repo": f"https://api.github.com/repos/{kwargs.get('repo_owner')}/{kwargs.get('repo_name')}",
}
url = type_map[_type]
# Check cache first
cached = self.cache.get(url)
if cached is not None:
return True # If cached, entity exists
response = requests.get(url=url, headers={"User-Agent": self.user_agent})
# Only cache if entity exists (status 200)
if response.status_code == 200:
sanitised = self.sanitise_response(response.json())
self.cache.set(url, sanitised)
return True
return False
except requests.RequestException:
return False
def sanitise_response(self, response: t.Union[dict, list]) -> t.Union[dict, list]:
"""
Remove API URLs and null values from response data recursively.
:param response: The response data to sanitise (dict or list).
:return: Sanitised response with API URLs and null values removed.
"""
pattern = re.compile(r"https://api\.github\.com")
if isinstance(response, list):
return [self.sanitise_response(response=item) for item in response]
if isinstance(response, dict):
keys_to_remove = [
key
for key, value in response.items()
if (isinstance(value, str) and pattern.search(value)) or value is None
]
for key in keys_to_remove:
response.pop(key)
# Recursively clean nested dicts/lists
for key, value in response.items():
if isinstance(value, (dict, list)):
response[key] = self.sanitise_response(response=value)
return response

589
src/octosuite/api/models.py Normal file
View File

@@ -0,0 +1,589 @@
from requests import exceptions
from .github import GitHub, BASE_URL
github = GitHub()
__all__ = ["User", "Org", "Repo", "Search"]
class GitHubEntity:
"""
Base class for GitHub entities with common functionality."""
def __init__(self, source: str):
"""
Initialise a GitHub entity.
:param source: The source identifier for the entity.
"""
self.endpoint = None
self.source = source
def exists(self) -> tuple[bool, dict]:
"""
Check if the entity exists on GitHub.
:return: Tuple of (exists, response_data) where exists is True if the entity is found.
"""
# Check cache first
cached = github.cache.get(self.endpoint)
if cached is not None:
return True, cached
try:
response = github.get(url=self.endpoint, return_response=True)
if response.status_code == 200:
data = response.json()
# Sanitise the data
sanitised = github.sanitise_response(data)
# Cache the sanitised response
github.cache.set(self.endpoint, sanitised)
return True, sanitised
return False, response.json()
except exceptions.RequestException:
return False, {}
class User(GitHubEntity):
"""Represents a GitHub user with methods to query user data."""
def __init__(self, name: str):
"""
Initialise a User instance.
:param name: The GitHub username.
"""
super().__init__(source=name)
self.name = name
self.endpoint = f"{BASE_URL}/users/{name}"
def profile(self) -> dict:
"""
Retrieve the user's profile information.
:return: Dictionary containing user profile data.
"""
profile = github.get(url=self.endpoint)
return profile
def repos(self, page: int, per_page: int) -> list:
"""
Retrieve the user's public repositories.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of repository dictionaries.
"""
params = {"page": page, "per_page": per_page}
repos = github.get(url=f"{self.endpoint}/repos", params=params)
return repos
def subscriptions(self, page: int, per_page: int) -> list:
"""
Retrieve repositories the user is subscribed to.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of repository subscription dictionaries.
"""
params = {"page": page, "per_page": per_page}
subscriptions = github.get(url=f"{self.endpoint}/subscriptions", params=params)
return subscriptions
def starred(self, page: int, per_page: int) -> list:
"""
Retrieve repositories the user has starred.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of starred repository dictionaries.
"""
params = {"page": page, "per_page": per_page}
starred = github.get(url=f"{self.endpoint}/starred", params=params)
return starred
def followers(self, page: int, per_page: int) -> list:
"""
Retrieve the user's followers.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of follower user dictionaries.
"""
params = {"page": page, "per_page": per_page}
users = github.get(url=f"{self.endpoint}/followers", params=params)
return users
def following(self, page: int, per_page: int) -> list:
"""
Retrieve users that this user is following.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of followed user dictionaries.
"""
params = {"page": page, "per_page": per_page}
users = github.get(url=f"{self.endpoint}/following", params=params)
return users
def follows(self, user: str) -> bool:
"""Check if this user follows another user.
:param user: The username to check.
:return: True if this user follows the specified user.
"""
...
def orgs(self, page: int, per_page: int) -> list:
"""
Retrieve organisations the user belongs to.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of organisation dictionaries.
"""
params = {"page": page, "per_page": per_page}
orgs = github.get(url=f"{self.endpoint}/orgs", params=params)
return orgs
def gists(self, page: int, per_page: int) -> list:
"""
Retrieve the user's gists.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of gist dictionaries.
"""
params = {"page": page, "per_page": per_page}
gists = github.get(url=f"{self.endpoint}/gists", params=params)
return gists
def events(self, page: int, per_page: int) -> list:
"""
Retrieve the user's public events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of event dictionaries.
"""
params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params)
return events
def received_events(self, page: int, per_page: int) -> list:
"""
Retrieve events received by the user.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of received event dictionaries.
"""
params = {"page": page, "per_page": per_page}
received_events = github.get(
url=f"{self.endpoint}/received_events", params=params
)
return received_events
class Org(GitHubEntity):
"""Represents a GitHub organisation with methods to query organisation data."""
def __init__(self, name: str):
"""
Initialise an Org instance.
:param name: The GitHub organisation name.
"""
super().__init__(source=name)
self.name = name
self.endpoint = f"{BASE_URL}/orgs/{name}"
def profile(self) -> dict:
"""
Retrieve the organisation's profile information.
:return: Dictionary containing organisation profile data.
"""
profile = github.get(url=self.endpoint)
return profile
def repos(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's public repositories.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of repository dictionaries.
"""
params = {"page": page, "per_page": per_page}
repos = github.get(url=f"{self.endpoint}/repos", params=params)
return repos
def events(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's public events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of event dictionaries.
"""
params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params)
return events
def hooks(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's webhooks.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of webhook dictionaries.
"""
params = {"page": page, "per_page": per_page}
hooks = github.get(url=f"{self.endpoint}/hooks", params=params)
return hooks
def issues(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's issues.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of issue dictionaries.
"""
params = {"page": page, "per_page": per_page}
issues = github.get(url=f"{self.endpoint}/issues", params=params)
return issues
def members(self, page: int, per_page: int) -> list:
"""
Retrieve the organisation's public members.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of member user dictionaries.
"""
params = {"page": page, "per_page": per_page}
members = github.get(url=f"{self.endpoint}/members", params=params)
return members
class Repo(GitHubEntity):
"""Represents a GitHub repository with methods to query repository data."""
def __init__(self, name: str, owner: str):
"""
Initialise a Repo instance.
:param name: The repository name.
:param owner: The repository owner's username.
"""
super().__init__(source=f"{owner}/{name}")
self.name = name
self.owner = owner
self.endpoint = f"{BASE_URL}/repos/{owner}/{name}"
def profile(self) -> dict:
"""
Retrieve the repository's information.
:return: Dictionary containing repository data.
"""
profile = github.get(url=self.endpoint)
return profile
def forks(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's forks.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of fork repository dictionaries.
"""
params = {"page": page, "per_page": per_page}
forks = github.get(url=f"{self.endpoint}/forks", params=params)
return forks
def issue_events(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's issue events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of issue event dictionaries.
"""
params = {"page": page, "per_page": per_page}
issue_events = github.get(url=f"{self.endpoint}/issue_events", params=params)
return issue_events
def events(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's events.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of event dictionaries.
"""
params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params)
return events
def assignees(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's available assignees.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of assignee user dictionaries.
"""
params = {"page": page, "per_page": per_page}
assignees = github.get(url=f"{self.endpoint}/assignees", params=params)
return assignees
def branches(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's branches.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of branch dictionaries.
"""
params = {"page": page, "per_page": per_page}
branches = github.get(url=f"{self.endpoint}/branches", params=params)
return branches
def tags(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's tags.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of tag dictionaries.
"""
params = {"page": page, "per_page": per_page}
tags = github.get(url=f"{self.endpoint}/tags", params=params)
return tags
def languages(self) -> dict:
"""
Retrieve the programming languages used in the repository.
:return: Dictionary mapping language names to bytes of code.
"""
languages = github.get(url=f"{self.endpoint}/languages")
return languages
def stargazers(self, page: int, per_page: int) -> list:
"""
Retrieve users who have starred the repository.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of stargazer user dictionaries.
"""
params = {"page": page, "per_page": per_page}
stargazers = github.get(url=f"{self.endpoint}/stargazers", params=params)
return stargazers
def subscribers(self, page: int, per_page: int) -> list:
"""
Retrieve users subscribed to the repository.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of subscriber user dictionaries.
"""
params = {"page": page, "per_page": per_page}
subscribers = github.get(url=f"{self.endpoint}/subscribers", params=params)
return subscribers
def commits(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's commits.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of commit dictionaries.
"""
params = {"page": page, "per_page": per_page}
commits = github.get(url=f"{self.endpoint}/commits", params=params)
return commits
def comments(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's commit comments.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of comment dictionaries.
"""
params = {"page": page, "per_page": per_page}
comments = github.get(url=f"{self.endpoint}/comments", params=params)
return comments
def contents(self, path: str) -> list:
"""
Retrieve the contents of a file or directory in the repository.
:param path: Path to the file or directory.
:return: List of content item dictionaries.
"""
contents = github.get(url=f"{self.endpoint}/contents/{path}")
return contents
def issues(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's issues.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of issue dictionaries.
"""
params = {"page": page, "per_page": per_page}
issues = github.get(url=f"{self.endpoint}/issues", params=params)
return issues
def releases(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's releases.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of release dictionaries.
"""
params = {"page": page, "per_page": per_page}
releases = github.get(url=f"{self.endpoint}/releases", params=params)
return releases
def deployments(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's deployments.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of deployment dictionaries.
"""
params = {"page": page, "per_page": per_page}
deployments = github.get(url=f"{self.endpoint}/deployments", params=params)
return deployments
def labels(self, page: int, per_page: int) -> list:
"""
Retrieve the repository's labels.
:param page: Page number for pagination.
:param per_page: Number of results per page.
:return: List of label dictionaries.
"""
params = {"page": page, "per_page": per_page}
labels = github.get(url=f"{self.endpoint}/labels", params=params)
return labels
class Search:
"""Provides methods to search GitHub for various entity types."""
def __init__(self, query: str, page: int, per_page: int):
"""
Initialise a Search instance.
:param query: The search query string.
:param page: Page number for pagination.
:param per_page: Number of results per page.
"""
self.query = query
self.page = page
self.per_page = per_page
self.endpoint = f"{BASE_URL}/search"
self.params = {"q": query, "page": page, "per_page": per_page}
def repos(self) -> list:
"""
Search for repositories matching the query.
:return: List of repository search result dictionaries.
"""
repos = github.get(url=f"{self.endpoint}/repositories", params=self.params)
return repos
def users(self) -> list:
"""
Search for users matching the query.
:return: List of user search result dictionaries.
"""
users = github.get(url=f"{self.endpoint}/users", params=self.params)
return users
def commits(self) -> list:
"""
Search for commits matching the query.
:return: List of commit search result dictionaries.
"""
commits = github.get(url=f"{self.endpoint}/commits", params=self.params)
return commits
def issues(self) -> list:
"""
Search for issues and pull requests matching the query.
:return: List of issue search result dictionaries.
"""
issues = github.get(url=f"{self.endpoint}/issues", params=self.params)
return issues
def topics(self) -> list:
"""
Search for topics matching the query.
:return: List of topic search result dictionaries.
"""
topics = github.get(url=f"{self.endpoint}/topics", params=self.params)
return topics

View File

@@ -1,13 +0,0 @@
import sys
from .lib import console, __pkg__, __version__
from .tui.menus import Menus
def start():
try:
console.set_window_title(title=f"{__pkg__.title()} v{__version__}")
menu = Menus()
menu.main()
except KeyboardInterrupt:
sys.exit()

View File

@@ -0,0 +1,4 @@
from . import cli, tui
from ..meta import __pkg__, __version__
__all__ = ["cli", "tui", "__pkg__", "__version__"]

View File

@@ -0,0 +1,3 @@
from .main import run_cli, arg_parser
__all__ = ["arg_parser", "run_cli"]

View File

@@ -0,0 +1,483 @@
import argparse
import json
import sys
import typing as t
from ..lib import (
export_response,
preview_response,
console,
check_updates,
ascii_banner,
)
from ...api.models import User, Org, Repo, Search
from ...meta import __pkg__, __version__
__all__ = ["arg_parser", "run_cli"]
def arg_parser() -> argparse.ArgumentParser:
"""
Create the argument parser.
:return: Configured ArgumentParser.
"""
parser = argparse.ArgumentParser(
prog=__pkg__,
description="Terminal-based toolkit for GitHub data analysis - for Bellingcat",
)
parser.add_argument(
"-v",
"--version",
action="version",
version=f"%(prog)s {__version__}, MIT Licence © Bellingcat",
)
parser.add_argument(
"-t", "--tui", action="store_true", help="launch interactive TUI"
)
parser.add_argument(
"-p", "--page", type=int, default=1, help="page number (default: %(default)s)"
)
parser.add_argument(
"-N",
"--per-page",
type=int,
default=100,
help="maximum number of results per page (default: %(default)s)",
)
parser.add_argument("-j", "--json", action="store_true", help="output as JSON")
parser.add_argument("-d", "--dir", metavar="DIR", help="export to directory")
subparsers = parser.add_subparsers(dest="command")
# User command
user_parser = subparsers.add_parser("user", help="get user data")
user_parser.add_argument("username", help="GitHub username")
user_group = user_parser.add_mutually_exclusive_group()
user_group.add_argument(
"--profile",
action="store_const",
const="profile",
dest="data_type",
help="profile data (default)",
)
user_group.add_argument(
"--repos",
action="store_const",
const="repos",
dest="data_type",
help="repositories",
)
user_group.add_argument(
"--subscriptions",
action="store_const",
const="subscriptions",
dest="data_type",
help="subscriptions",
)
user_group.add_argument(
"--starred",
action="store_const",
const="starred",
dest="data_type",
help="starred repos",
)
user_group.add_argument(
"--followers",
action="store_const",
const="followers",
dest="data_type",
help="followers",
)
user_group.add_argument(
"--following",
action="store_const",
const="following",
dest="data_type",
help="following",
)
user_group.add_argument(
"--orgs",
action="store_const",
const="orgs",
dest="data_type",
help="organisations",
)
user_group.add_argument(
"--gists", action="store_const", const="gists", dest="data_type", help="gists"
)
user_group.add_argument(
"--events",
action="store_const",
const="events",
dest="data_type",
help="events",
)
user_group.add_argument(
"--received-events",
action="store_const",
const="received_events",
dest="data_type",
help="received events",
)
user_parser.set_defaults(data_type="profile")
# Repo command
repo_parser = subparsers.add_parser("repo", help="get repository data")
repo_parser.add_argument("repository", help="repository (owner/name)")
repo_group = repo_parser.add_mutually_exclusive_group()
repo_group.add_argument(
"--profile",
action="store_const",
const="profile",
dest="data_type",
help="repo data (default)",
)
repo_group.add_argument(
"--forks", action="store_const", const="forks", dest="data_type", help="forks"
)
repo_group.add_argument(
"--issue-events",
action="store_const",
const="issue_events",
dest="data_type",
help="issue events",
)
repo_group.add_argument(
"--events",
action="store_const",
const="events",
dest="data_type",
help="events",
)
repo_group.add_argument(
"--assignees",
action="store_const",
const="assignees",
dest="data_type",
help="assignees",
)
repo_group.add_argument(
"--branches",
action="store_const",
const="branches",
dest="data_type",
help="branches",
)
repo_group.add_argument(
"--tags", action="store_const", const="tags", dest="data_type", help="tags"
)
repo_group.add_argument(
"--languages",
action="store_const",
const="languages",
dest="data_type",
help="languages",
)
repo_group.add_argument(
"--stargazers",
action="store_const",
const="stargazers",
dest="data_type",
help="stargazers",
)
repo_group.add_argument(
"--subscribers",
action="store_const",
const="subscribers",
dest="data_type",
help="subscribers",
)
repo_group.add_argument(
"--commits",
action="store_const",
const="commits",
dest="data_type",
help="commits",
)
repo_group.add_argument(
"--comments",
action="store_const",
const="comments",
dest="data_type",
help="comments",
)
repo_group.add_argument(
"--issues",
action="store_const",
const="issues",
dest="data_type",
help="issues",
)
repo_group.add_argument(
"--releases",
action="store_const",
const="releases",
dest="data_type",
help="releases",
)
repo_group.add_argument(
"--deployments",
action="store_const",
const="deployments",
dest="data_type",
help="deployments",
)
repo_group.add_argument(
"--labels",
action="store_const",
const="labels",
dest="data_type",
help="labels",
)
repo_parser.set_defaults(data_type="profile")
# Org command
org_parser = subparsers.add_parser("org", help="get organisation data")
org_parser.add_argument("name", help="organisation name")
org_group = org_parser.add_mutually_exclusive_group()
org_group.add_argument(
"--profile",
action="store_const",
const="profile",
dest="data_type",
help="profile data (default)",
)
org_group.add_argument(
"--repos",
action="store_const",
const="repos",
dest="data_type",
help="repositories",
)
org_group.add_argument(
"--events",
action="store_const",
const="events",
dest="data_type",
help="events",
)
org_group.add_argument(
"--hooks",
action="store_const",
const="hooks",
dest="data_type",
help="webhooks",
)
org_group.add_argument(
"--issues",
action="store_const",
const="issues",
dest="data_type",
help="issues",
)
org_group.add_argument(
"--members",
action="store_const",
const="members",
dest="data_type",
help="members",
)
org_parser.set_defaults(data_type="profile")
# Search command
search_parser = subparsers.add_parser("search", help="search GitHub")
search_parser.add_argument("query", help="search query")
search_group = search_parser.add_mutually_exclusive_group()
search_group.add_argument(
"--repos",
action="store_const",
const="repos",
dest="search_type",
help="search repositories (default)",
)
search_group.add_argument(
"--users",
action="store_const",
const="users",
dest="search_type",
help="search users",
)
search_group.add_argument(
"--commits",
action="store_const",
const="commits",
dest="search_type",
help="search commits",
)
search_group.add_argument(
"--issues",
action="store_const",
const="issues",
dest="search_type",
help="search issues",
)
search_group.add_argument(
"--topics",
action="store_const",
const="topics",
dest="search_type",
help="search topics",
)
search_parser.set_defaults(search_type="repos")
return parser
def output(
data: t.Union[dict, list],
as_json: bool,
source: str,
data_type: str,
export_dir: t.Optional[str] = None,
):
"""
Output data in the appropriate format.
:param data: Data to output.
:param as_json: Output as JSON.
:param source: Source identifier.
:param data_type: Type of data.
:param export_dir: Export directory.
"""
if not data:
console.print(f"[yellow]No {data_type} data found for '{source}'[/yellow]")
return
if export_dir:
export_response(
data=data,
data_type=data_type,
source=source,
file_formats=["json"],
output_dir=export_dir,
)
elif as_json:
print(json.dumps(data, indent=2, ensure_ascii=False))
else:
preview_response(data=data, source=source, _type=data_type)
def run_cli(args: argparse.Namespace):
"""Run the CLI."""
ascii_banner(text=__pkg__)
try:
with console.status("Initialising…") as status:
check_updates(is_cli=True, status=status)
if args.command == "user":
user = User(name=args.username)
status.update(f"[dim]Validating user ({args.username})…[/dim]")
exists, _ = user.exists()
if not exists:
console.print(f"[red]User '{args.username}' not found[/red]")
sys.exit(1)
method = getattr(user, args.data_type)
status.update(
f"[dim]Getting {args.data_type} from {args.username}…[/dim]"
)
data = (
method()
if args.data_type == "profile"
else method(page=args.page, per_page=min(args.per_page, 100))
)
output(
data=data,
as_json=args.json,
source=args.username,
data_type=args.data_type,
export_dir=args.dir,
)
elif args.command == "repo":
if "/" not in args.repository:
console.print(
"[red]Repository must be in 'owner/name' format[/red]"
)
sys.exit(1)
owner, name = args.repository.split("/", 1)
repo = Repo(name=name, owner=owner)
status.update(f"[dim]Validating repo ({args.repository})…[/dim]")
exists, _ = repo.exists()
if not exists:
console.print(
f"[red]Repository '{args.repository}' not found[/red]"
)
sys.exit(1)
method = getattr(repo, args.data_type)
status.update(
f"[dim]Getting {args.data_type} from {args.repository}…[/dim]"
)
data = (
method()
if args.data_type in ("profile", "languages")
else method(page=args.page, per_page=min(args.per_page, 100))
)
output(
data=data,
as_json=args.json,
source=args.repository,
data_type=args.data_type,
export_dir=args.dir,
)
elif args.command == "org":
org = Org(name=args.name)
status.update(f"[dim]Validating org ({args.name})…[/dim]")
exists, _ = org.exists()
if not exists:
console.print(f"[red]Organisation '{args.name}' not found[/red]")
sys.exit(1)
method = getattr(org, args.data_type)
status.update(f"[dim]Getting {args.data_type} from {args.name}…[/dim]")
data = (
method()
if args.data_type == "profile"
else method(page=args.page, per_page=min(args.per_page, 100))
)
output(
data=data,
as_json=args.json,
source=args.name,
data_type=args.data_type,
export_dir=args.dir,
)
elif args.command == "search":
search = Search(
query=args.query,
page=args.page,
per_page=min(args.per_page, 100),
)
method = getattr(search, args.search_type)
status.update(
f"[dim]Searching {args.search_type} for '{args.query}'…[/dim]"
)
result = method()
data = (
result.get("items", result) if isinstance(result, dict) else result
)
output(
data=data,
as_json=args.json,
source=args.query,
data_type=args.search_type,
export_dir=args.dir,
)
except KeyboardInterrupt:
console.print("\n[dim]Cancelled[/dim]")
sys.exit(130)

View File

@@ -9,15 +9,14 @@ from pathlib import Path
import pyfiglet import pyfiglet
from prompt_toolkit.shortcuts import message_dialog from prompt_toolkit.shortcuts import message_dialog
from rich.console import Console from rich.console import Console
from rich.status import Status
from rich.text import Text from rich.text import Text
from rich.tree import Tree from rich.tree import Tree
from update_checker import UpdateChecker from update_checker import UpdateChecker
from . import __pkg__, __version__ from ..meta import __pkg__, __version__
__all__ = [ __all__ = [
"__pkg__",
"__version__",
"console", "console",
"preview_response", "preview_response",
"export_response", "export_response",
@@ -31,6 +30,14 @@ console = Console(log_time=False)
def preview_response(data: t.Union[dict, list], source: str, _type: str): def preview_response(data: t.Union[dict, list], source: str, _type: str):
"""
Display a preview of response data in a tree structure.
:param data: The data to preview (dict or list).
:param source: The source identifier of the data.
:param _type: The type of data being previewed.
"""
if isinstance(data, dict): if isinstance(data, dict):
tree = Tree( tree = Tree(
label=f"\n[bold]{data.get('name') or data.get('login') or data.get('id') or 'Data'}[/bold]", label=f"\n[bold]{data.get('name') or data.get('login') or data.get('id') or 'Data'}[/bold]",
@@ -44,7 +51,7 @@ def preview_response(data: t.Union[dict, list], source: str, _type: str):
elif isinstance(data, list): elif isinstance(data, list):
preview_data = data[:5] preview_data = data[:5]
tree = Tree( tree = Tree(
label=f"\nPreview: First {len(preview_data)} of {len(data)} {_type} from {source}", label=f"\n[bold]First {len(preview_data)} of {len(data)} {_type} for '{source}'[/bold]",
guide_style="#444444", guide_style="#444444",
highlight=True, highlight=True,
) )
@@ -55,6 +62,7 @@ def preview_response(data: t.Union[dict, list], source: str, _type: str):
item.get("full_name") item.get("full_name")
or item.get("name") or item.get("name")
or item.get("login") or item.get("login")
or item.get("type")
or item.get("id") or item.get("id")
or "Item" or "Item"
) )
@@ -74,7 +82,17 @@ def export_response(
file_formats: list, file_formats: list,
output_dir: str = "../exports", output_dir: str = "../exports",
): ):
"""Export data to selected formats using built-in libraries.""" """
Export response data to one or more file formats.
:param data: The data to export (dict or list).
:param data_type: The type of data being exported.
:param source: The source identifier of the data.
:param file_formats: List of file formats to export to (json, csv, html).
:param output_dir: Directory path where exported files will be saved.
:return: List of exported file paths.
"""
# Create output directory if it doesn't exist # Create output directory if it doesn't exist
output_dir = Path(output_dir) output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True) output_dir.mkdir(exist_ok=True)
@@ -165,6 +183,14 @@ def export_response(
def fill_tree(tree: Tree, data: t.Union[dict, list]) -> Tree: def fill_tree(tree: Tree, data: t.Union[dict, list]) -> Tree:
"""
Recursively populate a Rich Tree with data.
:param tree: The Tree object to populate.
:param data: The data to add to the tree (dict or list).
:return: The populated Tree object.
"""
if isinstance(data, dict): if isinstance(data, dict):
for key, value in data.items(): for key, value in data.items():
if isinstance(value, dict) or isinstance(value, list): if isinstance(value, dict) or isinstance(value, list):
@@ -186,15 +212,28 @@ def fill_tree(tree: Tree, data: t.Union[dict, list]) -> Tree:
return tree return tree
def check_updates(): def check_updates(is_cli: bool = False, status: t.Optional[Status] = None):
with console.status("[dim]Checking for updates...[/dim]") as status: """
checker = UpdateChecker() Check for available package updates and display the result.
result = checker.check(__pkg__, __version__)
if result: :param status: The rich.status object for showing a live status.
status.stop() :param is_cli: Whether we're running as a CLI.
message_dialog(title="Update Available", text=result).run() """
if isinstance(status, Status):
status.update("[dim]Checking for updates[/dim]…")
checker = UpdateChecker()
result = checker.check(__pkg__, __version__)
if result is not None:
if not is_cli:
if isinstance(status, Status):
status.stop()
message_dialog(title="Update Available", text=str(result)).run()
else: else:
status.stop() console.print(result)
else:
if not is_cli:
message_dialog( message_dialog(
title="Up to Date", title="Up to Date",
text=f"You're running the current version, {__version__}", text=f"You're running the current version, {__version__}",
@@ -207,6 +246,10 @@ def clear_screen():
def ascii_banner(text: str): def ascii_banner(text: str):
"""Display a colourful ASCII art banner with gradient styling.
:param text: The text to convert to ASCII art.
"""
clear_screen() clear_screen()
ascii_text = pyfiglet.figlet_format(text=text, font="chunky") ascii_text = pyfiglet.figlet_format(text=text, font="chunky")
@@ -224,6 +267,12 @@ def ascii_banner(text: str):
def set_menu_title(menu_type: t.Literal["home", "user", "org", "repo", "search"]): def set_menu_title(menu_type: t.Literal["home", "user", "org", "repo", "search"]):
"""
Set the terminal window title based on the current menu.
:param menu_type: The type of menu being displayed.
"""
title: str = __pkg__.title() title: str = __pkg__.title()
title += f" | {menu_type.title()}" title += f" | {menu_type.title()}"
console.set_window_title(title=title) console.set_window_title(title=title)

15
src/octosuite/app/main.py Normal file
View File

@@ -0,0 +1,15 @@
from .cli import run_cli
from .cli.main import arg_parser
from .tui import run_tui
def start_app():
parser = arg_parser()
args = parser.parse_args()
if args.tui:
run_tui()
if args.command:
run_cli(args=args)
else:
parser.print_usage()

View File

@@ -0,0 +1,10 @@
from .menus import Menus
__all__ = ["run_tui", "Menus"]
def run_tui():
"""Run the interactive TUI."""
menu = Menus()
menu.main()

View File

@@ -26,20 +26,56 @@ __all__ = ["Dialogs"]
class Dialogs: class Dialogs:
def __init__(self): ... """Provides interactive dialogue boxes for user confirmations and information display."""
def __init__(self):
"""Initialise the Dialogs class."""
...
@staticmethod @staticmethod
def quit() -> bool: def _boolean(title: str, text: str) -> bool:
"""
Display a Yes/No dialogue and return the user's choice.
:param title: The title of the dialogue box.
:param text: The message text to display.
:return: True if user selects Yes, False if No or cancelled.
"""
try: try:
result = button_dialog( result = button_dialog(
title="Quit", title=title,
text="This will close the session, continue?", text=text,
buttons=[("Yes", True), ("No", False)], buttons=[("Yes", True), ("No", False)],
).run() ).run()
return result if result is not None else False return result if result is not None else False
except KeyboardInterrupt: except KeyboardInterrupt:
return True return True
def quit(self) -> bool:
"""
Display a confirmation dialogue for quitting the application.
:return: True if user confirms quit, False otherwise.
"""
return self._boolean(
title="Quit", text="This will close the session, continue?"
)
def clear_cache(self) -> bool:
"""
Display a confirmation dialogue for clearing the cache.
:return: True if user confirms cache clearing, False otherwise.
"""
return self._boolean(
title="Clear Cache",
text="This will clear all octosuite caches, continue?",
)
@staticmethod @staticmethod
def license(): def license():
"""Display the MIT license notice in a dialogue box."""
message_dialog(title="MIT License", text=LICENSE_NOTICE).run() message_dialog(title="MIT License", text=LICENSE_NOTICE).run()

View File

@@ -5,11 +5,17 @@ import questionary as q
from questionary import Style from questionary import Style
from rich.status import Status from rich.status import Status
from octosuite.api.cache import cache
from octosuite.api.models import User, Repo, Org, Search
from octosuite.app.lib import (
check_updates,
preview_response,
export_response,
set_menu_title,
)
from octosuite.app.lib import console, clear_screen, ascii_banner
from .dialogs import Dialogs from .dialogs import Dialogs
from .prompts import Prompts from .prompts import Prompts
from ..core.models import User, Repo, Org, Search
from ..lib import check_updates, preview_response, export_response, set_menu_title
from ..lib import console, clear_screen, ascii_banner
CUSTOM_STYLE = Style( CUSTOM_STYLE = Style(
[ [
@@ -20,7 +26,6 @@ CUSTOM_STYLE = Style(
INSTRUCTIONS = "↑↓ [move] • ⮠ [select]" INSTRUCTIONS = "↑↓ [move] • ⮠ [select]"
EXPORT_INSTRUCTIONS = "↑↓ [move] • ⮠ [confirm] • spacebar [check]" EXPORT_INSTRUCTIONS = "↑↓ [move] • ⮠ [confirm] • spacebar [check]"
POINTER: str = "🖝 "
dialogs = Dialogs() dialogs = Dialogs()
prompts = Prompts() prompts = Prompts()
@@ -28,8 +33,16 @@ prompts = Prompts()
__all__ = ["Menus"] __all__ = ["Menus"]
class Menus: class BaseMenu:
def __init__(self): """Base class providing common menu functionality and response handling for GitHub data queries."""
def __init__(self, main_menu: t.Callable):
"""
Initialise the BaseMenu with pagination and search method configurations.
:param main_menu: Callable reference to the main menu function.
"""
# Define which methods require pagination # Define which methods require pagination
self.paginated_methods = { self.paginated_methods = {
"repos", "repos",
@@ -64,15 +77,91 @@ class Menus:
# Search methods (all require pagination) # Search methods (all require pagination)
self.search_methods = {"repos", "users", "commits", "issues", "topics"} self.search_methods = {"repos", "users", "commits", "issues", "topics"}
self.mode_handlers = { self.main_menu = main_menu
"user": self.user,
"repo": self.repo, @staticmethod
"org": self.org, def target_validator(
"search": self.search, identifier: str,
} instance: t.Union[User, Repo, Org],
callback: t.Callable,
*callback_args,
) -> bool:
"""
Validate whether a GitHub entity exists.
:param identifier: Name or identifier of the entity to validate.
:param instance: The instance with an exists() method (User, Repo, or Org).
:param callback: Function to call if validation fails.
:param callback_args: Arguments to pass to the callback function.
:return: True if the entity exists, False otherwise.
"""
if isinstance(instance, User):
target_type = "user"
elif isinstance(instance, Repo):
target_type = "repo"
else:
target_type = "org"
with Status(
f"[dim]Validating {target_type} ({identifier})[/dim]…",
console=console,
) as status:
exists, response = instance.exists()
if not exists:
status.stop()
if response and "message" in response:
console.print(
f"[bold][yellow]✘[/yellow] {response['message']}[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue …")
callback(*callback_args)
return False
return True
def execute_and_handle_response(
self,
instance: t.Union[User, Repo, Org, Search],
method_name: str,
target_type: t.Literal["user", "repo", "org"],
source: str,
):
"""
Execute a method on an instance and handle the resulting data.
:param instance: The instance to execute the method on.
:param method_name: Name of the method to execute.
:param target_type: Type of entity ("user", "repo", or "org").
:param source: Source identifier for response handling.
"""
valid_methods = self.paginated_methods | self.non_paginated_methods
if method_name in valid_methods:
with Status(
status=f"[dim]Initialising {target_type} {method_name}[/dim]…",
console=console,
) as status:
data = self.execute_selection(
source=source,
instance=instance,
method_name=method_name,
status=status,
)
status.stop()
self.response_handler(data=data, data_type=method_name, source=source)
def execute_selection(self, status: Status, **kwargs):
"""
Execute a method on an instance with pagination if required.
:param status: Rich Status object for displaying progress.
:param kwargs: Must include 'instance', 'method_name', and 'source'.
:return: The data returned by the executed method.
"""
def _execute_selection(self, status: Status, **kwargs):
"""Execute a method on an instance, prompting for pagination if needed."""
instance = kwargs.get("instance") instance = kwargs.get("instance")
method_name = kwargs.get("method_name") method_name = kwargs.get("method_name")
source = kwargs.get("source") source = kwargs.get("source")
@@ -84,13 +173,113 @@ class Menus:
prompts.pagination_params() if method_name in self.paginated_methods else {} prompts.pagination_params() if method_name in self.paginated_methods else {}
) )
status.start() status.start()
status.update(f"[dim]Getting {method_name} from {source}...[/dim]") status.update(f"[dim]Getting {method_name} from {source}[/dim]")
return method(**params) return method(**params)
def _navigation(self, option: str, callback: t.Callable, *callback_args): def response_handler(self, data: t.Union[dict, list], data_type: str, source: str):
"""Handle navigation options (back, quit, change settings).""" """
Handle retrieved data by previewing and offering export options.
:param data: The data to handle (dict or list).
:param data_type: Type of data retrieved.
:param source: Source identifier of the data.
"""
try:
if not data:
console.print(
f"[bold][yellow]✘[/yellow] No data found for '{source}'[/bold]"
)
else:
preview_response(data=data, source=source, _type=data_type)
export_choice = q.select(
"What would you like to do?",
choices=[
q.Choice(
title="Export",
value="export",
description="Export the data",
shortcut_key="e",
),
q.Choice(
title="Skip",
value="skip",
description="Do nothing, and go back to previous menu",
shortcut_key="x",
),
q.Choice(
title="Quit",
value="quit",
description="Close this session",
),
],
style=CUSTOM_STYLE,
instruction=INSTRUCTIONS,
).ask()
if export_choice == "skip":
return
if export_choice == "quit":
if dialogs.quit():
sys.exit()
else:
clear_screen()
self.response_handler(
data=data,
data_type=data_type,
source=source,
)
return
file_formats = q.checkbox(
"Select export format(s)",
choices=[
q.Choice(
title="JSON",
value="json",
description="Export as JSON file",
),
q.Choice(
title="CSV",
value="csv",
description="Export as CSV file",
),
q.Choice(
title="HTML",
value="html",
description="Export as HTML table",
),
],
style=CUSTOM_STYLE,
instruction=EXPORT_INSTRUCTIONS,
validate=lambda x: len(x) > 0
or "Please select at least one format",
).ask()
export_response(
data=data,
data_type=data_type,
source=source,
file_formats=file_formats,
)
console.input(" Press [bold]ENTER[/bold] to continue …")
except KeyboardInterrupt:
console.print("\nExport cancelled")
def navigation_handler(self, option: str, callback: t.Callable, *callback_args):
"""
Handle common navigation options across menus.
:param option: The selected navigation option.
:param callback: Function to call for certain navigation actions.
:param callback_args: Arguments to pass to the callback function.
:return: True if navigation was handled, False otherwise.
"""
navigation_handlers = { navigation_handlers = {
"back": lambda: self.main(), "back": lambda: self.main_menu(),
"quit": lambda: ( "quit": lambda: (
sys.exit() if dialogs.quit() else callback(*callback_args) sys.exit() if dialogs.quit() else callback(*callback_args)
), ),
@@ -102,83 +291,25 @@ class Menus:
return True return True
return False return False
def response_handling(self, data: t.Union[dict, list], data_type: str, source: str):
"""Export data to file in user-selected format(s)."""
preview_response(data=data, source=source, _type=data_type)
try: class Menus(BaseMenu):
export_choice = q.select( """Main menu system providing interactive interfaces for GitHub data queries."""
"What would you like to do?",
choices=[
q.Choice(
title="Export",
value="export",
description="Export the data",
shortcut_key="e",
),
q.Choice(
title="Skip",
value="skip",
description="Do nothing, and go back to previous menu",
shortcut_key="x",
),
q.Choice(
title="Quit",
value="quit",
description="Close this session",
),
],
pointer=POINTER,
style=CUSTOM_STYLE,
instruction=INSTRUCTIONS,
).ask()
if export_choice == "skip": def __init__(self):
return """Initialise the Menus class with mode handlers for different query types."""
if export_choice == "quit": super().__init__(main_menu=self.main)
if dialogs.quit():
sys.exit()
else:
self.response_handling(
data=data, data_type=data_type, source=source
)
return
file_formats = q.checkbox( self.mode_handlers = {
"Select export format(s)", "user": self.user,
choices=[ "repo": self.repo,
q.Choice( "org": self.org,
title="JSON", "search": self.search,
value="json", }
description="Export as JSON file",
),
q.Choice(
title="CSV",
value="csv",
description="Export as CSV file",
),
q.Choice(
title="HTML",
value="html",
description="Export as HTML table",
),
],
pointer=POINTER,
style=CUSTOM_STYLE,
instruction=EXPORT_INSTRUCTIONS,
validate=lambda x: len(x) > 0 or "Please select at least one format",
).ask()
export_response(
data=data, data_type=data_type, source=source, file_formats=file_formats
)
except KeyboardInterrupt:
print("\nExport cancelled")
def main(self): def main(self):
"""Main menu to select mode.""" """Display the main menu for selecting query mode."""
set_menu_title(menu_type="home") set_menu_title(menu_type="home")
clear_screen() clear_screen()
try: try:
@@ -207,6 +338,11 @@ class Menus:
value="search", value="search",
description="Search GitHub", description="Search GitHub",
), ),
q.Choice(
title="Clear cache",
value="clear_cache",
description="Clear all in-memory cache from Octosuite",
),
q.Choice( q.Choice(
title="Updates", title="Updates",
value="updates", value="updates",
@@ -223,7 +359,6 @@ class Menus:
description="Close this session", description="Close this session",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
).ask() ).ask()
@@ -239,6 +374,10 @@ class Menus:
elif action == "updates": elif action == "updates":
check_updates() check_updates()
self.main() self.main()
elif action == "clear_cache":
if dialogs.clear_cache():
cache.clear()
self.main()
elif action is None: elif action is None:
sys.exit() sys.exit()
else: else:
@@ -251,7 +390,12 @@ class Menus:
sys.exit() sys.exit()
def search(self, query: t.Optional[str] = None): def search(self, query: t.Optional[str] = None):
"""Search menu for querying GitHub.""" """
Display the search menu for querying GitHub content.
:param query: Optional search query string. If None, prompts user for input.
"""
set_menu_title(menu_type="search") set_menu_title(menu_type="search")
clear_screen() clear_screen()
if query is None: if query is None:
@@ -262,9 +406,7 @@ class Menus:
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
) )
clear_screen()
ascii_banner(text=query) ascii_banner(text=query)
option = q.select( option = q.select(
"What would you like to do/search?", "What would you like to do/search?",
choices=[ choices=[
@@ -302,7 +444,7 @@ class Menus:
q.Choice( q.Choice(
title="Go Back", title="Go Back",
value="back", value="back",
description="Go back to ewous menu", description="Go back to previous menu",
shortcut_key="b", shortcut_key="b",
), ),
q.Choice( q.Choice(
@@ -312,7 +454,6 @@ class Menus:
shortcut_key="q", shortcut_key="q",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
use_shortcuts=True, use_shortcuts=True,
@@ -320,9 +461,10 @@ class Menus:
if option is None: if option is None:
self.main() self.main()
return
# Handle navigation # Handle navigation
if self._navigation(option, self.search, query): if self.navigation_handler(option, self.search, query):
return return
# Handle change query # Handle change query
@@ -332,11 +474,9 @@ class Menus:
# Execute search if it's a valid method # Execute search if it's a valid method
if option in self.search_methods: if option in self.search_methods:
with console.status( with Status(
status=f"[dim]Initialising {option} search...[/dim]" status=f"[dim]Initialising {option} search[/dim]", console=console
) as status: ) as status:
# Get pagination params
status.stop() status.stop()
params = prompts.pagination_params() params = prompts.pagination_params()
status.start() status.start()
@@ -349,22 +489,29 @@ class Menus:
) )
method = getattr(search, option) method = getattr(search, option)
status.update(f"[dim]Searching {option} for {query}...[/dim]") status.update(f"[dim]Searching {option} for {query}[/dim]")
data = method() data = method()
if data: if data:
items = data.get("items") items = data.get("items")
status.stop() status.stop()
self.response_handling( self.response_handler(
data=items if items is not None else data, data=items if items is not None else data,
data_type=option, data_type=option,
source=query, source=query,
) )
# After handling response, show menu again
self.search(query=query) self.search(query=query)
def user(self, username: t.Optional[str] = None): def user(self, username: t.Optional[str] = None, is_validated: bool = False):
"""User menu for querying user data.""" """
Display the user menu for querying GitHub user data.
:param username: Optional GitHub username. If None, prompts user for input.
:param is_validated: Whether the username has already been validated.
"""
set_menu_title(menu_type="user") set_menu_title(menu_type="user")
clear_screen() clear_screen()
if username is None: if username is None:
@@ -376,9 +523,16 @@ class Menus:
qmark="@", qmark="@",
) )
clear_screen() user = User(name=username)
ascii_banner(text=username)
if not self.target_validator(
identifier=username,
instance=user,
callback=self.user,
):
return
ascii_banner(text=username)
option = q.select( option = q.select(
"What would you like to do/get?", "What would you like to do/get?",
choices=[ choices=[
@@ -451,7 +605,6 @@ class Menus:
shortcut_key="q", shortcut_key="q",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
use_shortcuts=True, use_shortcuts=True,
@@ -459,50 +612,40 @@ class Menus:
if option is None: if option is None:
self.main() self.main()
return
# Handle navigation # Handle navigation
if self._navigation(option, self.user, username): elif self.navigation_handler(option, self.user, username):
return return
# Handle change username # Handle change username
if option == "change_username": elif option == "change_username":
self.user() self.user()
return return
else:
# Execute action and handle response
self.execute_and_handle_response(
instance=user,
method_name=option,
target_type="user",
source=username,
)
# Execute action if it's a valid method # After handling response, show menu again WITHOUT re-validating
valid_methods = self.paginated_methods | self.non_paginated_methods self.user(username=username, is_validated=True)
if option in valid_methods:
with Status(
status=f"[dim]Initialising user {option}...[/dim]",
console=console,
) as status:
user = User(name=username)
status.update(f"[dim]Validating user's ({username}) existence...[/dim]") def repo(
if user.exists(): self,
console.print( name: t.Optional[str] = None,
f"[bold][green]✔[/green] User ({username}) exists on GitHub[/bold]" owner: t.Optional[str] = None,
) ):
data = self._execute_selection( """
source=username, Display the repository menu for querying GitHub repository data.
instance=user,
method_name=option,
status=status,
)
status.stop() :param name: Optional repository name. If None, prompts user for input.
self.response_handling(data=data, data_type=option, source=username) :param owner: Optional repository owner. If None, prompts user for input.
else: """
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] User ({username}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.user()
self.user(username=username)
def repo(self, name: t.Optional[str] = None, owner: t.Optional[str] = None):
"""Repository menu for querying repo data."""
set_menu_title(menu_type="repo") set_menu_title(menu_type="repo")
clear_screen() clear_screen()
if name is None or owner is None: if name is None or owner is None:
@@ -521,9 +664,18 @@ class Menus:
qmark="@", qmark="@",
) )
clear_screen() repo = Repo(name=name, owner=owner)
ascii_banner(text=f"{owner}/{name}") source = f"{owner}/{name}"
# Only validate if not already validated
if not self.target_validator(
identifier=source,
instance=repo,
callback=self.repo,
):
return
ascii_banner(text=source)
option = q.select( option = q.select(
"What would you like to do/get?", "What would you like to do/get?",
choices=[ choices=[
@@ -635,19 +787,11 @@ class Menus:
shortcut_key="q", shortcut_key="q",
), ),
], ],
pointer=POINTER,
style=CUSTOM_STYLE, style=CUSTOM_STYLE,
instruction=INSTRUCTIONS, instruction=INSTRUCTIONS,
use_shortcuts=True, use_shortcuts=True,
).ask() ).ask()
if option is None:
self.main()
# Handle navigation
if self._navigation(option, self.repo, name, owner):
return
# Handle change options # Handle change options
change_handlers = { change_handlers = {
"change_repo_name": lambda: self.repo(owner=owner), "change_repo_name": lambda: self.repo(owner=owner),
@@ -655,45 +799,36 @@ class Menus:
"change_both": lambda: self.repo(), "change_both": lambda: self.repo(),
} }
if option in change_handlers: if option is None:
change_handlers[option]() self.main()
return return
# Execute action if it's a valid method # Handle navigation
valid_methods = self.paginated_methods | self.non_paginated_methods elif self.navigation_handler(option, self.repo, name, owner):
if option in valid_methods: return
source = f"{owner}/{name}"
with Status(
status=f"[dim]Initialising repository {option}...[/dim]",
console=console,
) as status:
repo = Repo(name=name, owner=owner)
status.update( elif option in change_handlers:
f"[dim]Validating repository's ({source}) existence...[/dim]" change_handlers[option]()
) return
if repo.exists(): else:
console.print( # Execute action and handle response
f"[bold][green]✔[/green] Repository ({source}) exists on GitHub[/bold]" self.execute_and_handle_response(
) instance=repo,
data = self._execute_selection( method_name=option,
source=source, instance=repo, method_name=option, status=status target_type="repo",
) source=source,
)
status.stop() # After handling response, show menu again WITHOUT re-validating
self.response_handling(data=data, data_type=option, source=source) self.repo(name=name, owner=owner)
else:
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] Repository ({source}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.repo()
self.repo(name=name, owner=owner)
def org(self, name: t.Optional[str] = None): def org(self, name: t.Optional[str] = None):
"""Organisation menu for querying org data.""" """
Display the organisation menu for querying GitHub organisation data.
:param name: Optional organisation name. If None, prompts user for input.
"""
set_menu_title(menu_type="org") set_menu_title(menu_type="org")
clear_screen() clear_screen()
if name is None: if name is None:
@@ -705,8 +840,14 @@ class Menus:
qmark="@", qmark="@",
) )
clear_screen() org = Org(name=name)
ascii_banner(text=name)
if not self.target_validator(
identifier=name,
instance=org,
callback=self.org,
):
return
option = q.select( option = q.select(
"What would you like to do?", "What would you like to do?",
@@ -766,45 +907,22 @@ class Menus:
if option is None: if option is None:
self.main() self.main()
return
# Handle navigation # Handle navigation
if self._navigation(option, self.org, name): elif self.navigation_handler(option, self.org, name):
return return
# Handle change org # Handle change org
if option == "change_org": elif option == "change_org":
self.org() self.org()
return return
# Execute action if it's a valid method else:
valid_methods = self.paginated_methods | self.non_paginated_methods # Execute action and handle response
if option in valid_methods: self.execute_and_handle_response(
with Status( instance=org, method_name=option, target_type="org", source=name
status=f"[dim]Initialising organisation {option}...[/dim]", )
console=console,
) as status:
org = Org(name=name)
status.update( # After handling response, show menu again WITHOUT re-validating
f"[dim]Validating organisation's ({name}) existence...[/dim]" self.org(name=name)
)
if org.exists():
console.print(
f"[bold][green]✔[/green] Organisation ({name}) exists on GitHub[/bold]"
)
data = self._execute_selection(
source=name, instance=org, method_name=option, status=status
)
status.stop()
self.response_handling(data=data, data_type=option, source=name)
else:
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] Organisation ({name}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.org()
self.org(name=name)

View File

@@ -7,7 +7,10 @@ __all__ = ["Prompts"]
class Prompts: class Prompts:
"""Provides interactive prompts for user input collection."""
def __init__(self): def __init__(self):
"""Initialise the Prompts class."""
pass pass
@staticmethod @staticmethod
@@ -17,17 +20,40 @@ class Prompts:
style: t.Optional[Style] = None, style: t.Optional[Style] = None,
qmark: t.Optional[str] = "?", qmark: t.Optional[str] = "?",
) -> str: ) -> str:
return q.text( """
Display a text input prompt and validate non-empty input.
:param message: The prompt message to display.
:param instruction: Optional instruction text to guide the user.
:param style: Optional questionary Style object for custom styling.
:param qmark: Optional custom question mark character or string.
:return: The user's input string.
:raises KeyboardInterrupt: If the user cancels the prompt with CTRL+C.
"""
result = q.text(
message=message, message=message,
instruction=instruction, instruction=instruction,
style=style, style=style,
qmark=qmark, qmark=qmark,
validate=lambda text: len(text.strip()) > 0 or "Input cannot be empty", validate=lambda text: len(text.strip()) > 0
or None
or "Input cannot be empty",
).ask() ).ask()
if result is None:
raise KeyboardInterrupt
return result
@staticmethod @staticmethod
def pagination_params() -> dict: def pagination_params() -> dict:
"""Prompt user for pagination parameters.""" """
Prompt the user for pagination parameters.
:return: Dictionary containing 'page' and 'per_page' integer values.
"""
try: try:
page = q.text(message="Page", default="1", qmark="n").ask() page = q.text(message="Page", default="1", qmark="n").ask()
per_page = q.text( per_page = q.text(

View File

@@ -1,79 +0,0 @@
import re
import sys
import typing as t
import requests
from requests import Response
from ..lib import __version__
BASE_URL = "https://api.github.com"
__all__ = ["BASE_URL", "GitHub"]
class GitHub:
def __init__(
self,
user_agent: str = (
f"octosuite/{__version__} "
f"(Python {sys.version.split()[0]}; "
f"https://github.com/bellingcat/octosuite) requests/{requests.__version__}"
),
):
self.user_agent = user_agent
def get(
self, url: str, params: t.Optional[dict] = None, return_response: bool = False
) -> t.Union[dict, list, Response]:
response = requests.get(
url=url, params=params, headers={"User-Agent": self.user_agent}
)
if return_response:
return response
if response.status_code == 200:
return self._sanitise_response(response=response.json())
return []
def is_valid_entity(
self, entity_type: t.Literal["user", "org", "repo"], **kwargs
) -> bool:
"""Validate if a GitHub entity exists."""
try:
if entity_type == "user":
url = f"https://api.github.com/users/{kwargs.get('username')}"
elif entity_type == "org":
url = f"https://api.github.com/orgs/{kwargs.get('username')}"
elif entity_type == "repo":
url = f"https://api.github.com/repos/{kwargs.get('repo_owner')}/{kwargs.get('repo_name')}"
else:
return True
response = self.get(url=url, return_response=True)
return response.status_code == 200
except requests.RequestException:
return False
def _sanitise_response(self, response: t.Union[dict, list]) -> t.Union[dict, list]:
pattern = re.compile(r"https://api\.github\.com")
if isinstance(response, list):
return [self._sanitise_response(response=item) for item in response]
if isinstance(response, dict):
keys_to_remove = [
key
for key, value in response.items()
if (isinstance(value, str) and pattern.search(value)) or value is None
]
for key in keys_to_remove:
response.pop(key)
# Recursively clean nested dicts/lists
for key, value in response.items():
if isinstance(value, (dict, list)):
response[key] = self._sanitise_response(response=value)
return response

View File

@@ -1,238 +0,0 @@
from requests import exceptions
from .github import GitHub, BASE_URL
github = GitHub()
__all__ = ["User", "Org", "Repo", "Search"]
class GitHubEntity:
"""Base class for GitHub entities with common functionality."""
def __init__(self, source: str):
self.endpoint = None
self.source = source
def exists(self) -> bool:
"""Check if the entity exists on GitHub."""
try:
response = github.get(url=self.endpoint, return_response=True)
return response.status_code == 200
except exceptions.RequestException as err:
return False
class User(GitHubEntity):
def __init__(self, name: str):
super().__init__(source=name)
self.name = name
self.endpoint = f"{BASE_URL}/users/{name}"
def profile(self) -> dict:
profile = github.get(url=self.endpoint)
return profile
def repos(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
repos = github.get(url=f"{self.endpoint}/repos", params=params)
return repos
def subscriptions(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
subscriptions = github.get(url=f"{self.endpoint}/subscriptions", params=params)
return subscriptions
def starred(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
starred = github.get(url=f"{self.endpoint}/starred", params=params)
return starred
def followers(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
users = github.get(url=f"{self.endpoint}/followers", params=params)
return users
def following(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
users = github.get(url=f"{self.endpoint}/following", params=params)
return users
def follows(self, user: str) -> bool: ...
def orgs(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
orgs = github.get(url=f"{self.endpoint}/orgs", params=params)
return orgs
def gists(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
gists = github.get(url=f"{self.endpoint}/gists", params=params)
return gists
def events(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params)
return events
def received_events(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
received_events = github.get(
url=f"{self.endpoint}/received_events", params=params
)
return received_events
class Org(GitHubEntity):
def __init__(self, name: str):
super().__init__(source=name)
self.name = name
self.endpoint = f"{BASE_URL}/orgs/{name}"
def profile(self) -> dict:
profile = github.get(url=self.endpoint)
return profile
def repos(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
repos = github.get(url=f"{self.endpoint}/repos", params=params)
return repos
def events(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params)
return events
def hooks(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
hooks = github.get(url=f"{self.endpoint}/hooks", params=params)
return hooks
def issues(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
issues = github.get(url=f"{self.endpoint}/issues", params=params)
return issues
def members(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
members = github.get(url=f"{self.endpoint}/members", params=params)
return members
class Repo(GitHubEntity):
def __init__(self, name: str, owner: str):
super().__init__(source=f"{owner}/{name}")
self.name = name
self.owner = owner
self.endpoint = f"{BASE_URL}/repos/{owner}/{name}"
def profile(self) -> dict:
profile = github.get(url=self.endpoint)
return profile
def forks(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
forks = github.get(url=f"{self.endpoint}/forks", params=params)
return forks
def issue_events(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
issue_events = github.get(url=f"{self.endpoint}/issue_events", params=params)
return issue_events
def events(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
events = github.get(url=f"{self.endpoint}/events", params=params)
return events
def assignees(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
assignees = github.get(url=f"{self.endpoint}/assignees", params=params)
return assignees
def branches(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
branches = github.get(url=f"{self.endpoint}/branches", params=params)
return branches
def tags(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
tags = github.get(url=f"{self.endpoint}/tags", params=params)
return tags
def languages(self) -> dict:
languages = github.get(url=f"{self.endpoint}/languages")
return languages
def stargazers(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
stargazers = github.get(url=f"{self.endpoint}/stargazers", params=params)
return stargazers
def subscribers(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
subscribers = github.get(url=f"{self.endpoint}/subscribers", params=params)
return subscribers
def commits(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
commits = github.get(url=f"{self.endpoint}/commits", params=params)
return commits
def comments(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
comments = github.get(url=f"{self.endpoint}/comments", params=params)
return comments
def contents(self, path: str) -> list:
contents = github.get(url=f"{self.endpoint}/contents/{path}")
return contents
def issues(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
issues = github.get(url=f"{self.endpoint}/issues", params=params)
return issues
def releases(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
releases = github.get(url=f"{self.endpoint}/releases", params=params)
return releases
def deployments(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
deployments = github.get(url=f"{self.endpoint}/deployments", params=params)
return deployments
def labels(self, page: int, per_page: int) -> list:
params = {"page": page, "per_page": per_page}
labels = github.get(url=f"{self.endpoint}/labels", params=params)
return labels
class Search:
def __init__(self, query: str, page: int, per_page: int):
self.query = query
self.page = page
self.per_page = per_page
self.endpoint = f"{BASE_URL}/search"
self.params = {"q": query, "page": page, "per_page": per_page}
def repos(self) -> list:
repos = github.get(url=f"{self.endpoint}/repositories", params=self.params)
return repos
def users(self) -> list:
users = github.get(url=f"{self.endpoint}/users", params=self.params)
return users
def commits(self) -> list:
commits = github.get(url=f"{self.endpoint}/commits", params=self.params)
return commits
def issues(self) -> list:
issues = github.get(url=f"{self.endpoint}/issues", params=self.params)
return issues
def topics(self) -> list:
topics = github.get(url=f"{self.endpoint}/topics", params=self.params)
return topics

4
src/octosuite/meta.py Normal file
View File

@@ -0,0 +1,4 @@
from importlib.metadata import version
__pkg__ = "octosuite"
__version__ = version(__pkg__)

46
uv.lock generated
View File

@@ -4,7 +4,7 @@ requires-python = ">=3.13"
[[package]] [[package]]
name = "black" name = "black"
version = "25.12.0" version = "26.1.0"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "click" }, { name = "click" },
@@ -14,19 +14,19 @@ dependencies = [
{ name = "platformdirs" }, { name = "platformdirs" },
{ name = "pytokens" }, { name = "pytokens" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/c4/d9/07b458a3f1c525ac392b5edc6b191ff140b596f9d77092429417a54e249d/black-25.12.0.tar.gz", hash = "sha256:8d3dd9cea14bff7ddc0eb243c811cdb1a011ebb4800a5f0335a01a68654796a7", size = 659264, upload-time = "2025-12-08T01:40:52.501Z" } sdist = { url = "https://files.pythonhosted.org/packages/13/88/560b11e521c522440af991d46848a2bde64b5f7202ec14e1f46f9509d328/black-26.1.0.tar.gz", hash = "sha256:d294ac3340eef9c9eb5d29288e96dc719ff269a88e27b396340459dd85da4c58", size = 658785, upload-time = "2026-01-18T04:50:11.993Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/c8/52/c551e36bc95495d2aa1a37d50566267aa47608c81a53f91daa809e03293f/black-25.12.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:a05ddeb656534c3e27a05a29196c962877c83fa5503db89e68857d1161ad08a5", size = 1923809, upload-time = "2025-12-08T01:46:55.126Z" }, { url = "https://files.pythonhosted.org/packages/79/04/fa2f4784f7237279332aa735cdfd5ae2e7730db0072fb2041dadda9ae551/black-26.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:ba1d768fbfb6930fc93b0ecc32a43d8861ded16f47a40f14afa9bb04ab93d304", size = 1877781, upload-time = "2026-01-18T04:59:39.054Z" },
{ url = "https://files.pythonhosted.org/packages/a0/f7/aac9b014140ee56d247e707af8db0aae2e9efc28d4a8aba92d0abd7ae9d1/black-25.12.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:9ec77439ef3e34896995503865a85732c94396edcc739f302c5673a2315e1e7f", size = 1742384, upload-time = "2025-12-08T01:49:37.022Z" }, { url = "https://files.pythonhosted.org/packages/cf/ad/5a131b01acc0e5336740a039628c0ab69d60cf09a2c87a4ec49f5826acda/black-26.1.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:2b807c240b64609cb0e80d2200a35b23c7df82259f80bef1b2c96eb422b4aac9", size = 1699670, upload-time = "2026-01-18T04:59:41.005Z" },
{ url = "https://files.pythonhosted.org/packages/74/98/38aaa018b2ab06a863974c12b14a6266badc192b20603a81b738c47e902e/black-25.12.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0e509c858adf63aa61d908061b52e580c40eae0dfa72415fa47ac01b12e29baf", size = 1798761, upload-time = "2025-12-08T01:46:05.386Z" }, { url = "https://files.pythonhosted.org/packages/da/7c/b05f22964316a52ab6b4265bcd52c0ad2c30d7ca6bd3d0637e438fc32d6e/black-26.1.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1de0f7d01cc894066a1153b738145b194414cc6eeaad8ef4397ac9abacf40f6b", size = 1775212, upload-time = "2026-01-18T04:59:42.545Z" },
{ url = "https://files.pythonhosted.org/packages/16/3a/a8ac542125f61574a3f015b521ca83b47321ed19bb63fe6d7560f348bfe1/black-25.12.0-cp313-cp313-win_amd64.whl", hash = "sha256:252678f07f5bac4ff0d0e9b261fbb029fa530cfa206d0a636a34ab445ef8ca9d", size = 1429180, upload-time = "2025-12-08T01:45:34.903Z" }, { url = "https://files.pythonhosted.org/packages/a6/a3/e8d1526bea0446e040193185353920a9506eab60a7d8beb062029129c7d2/black-26.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:91a68ae46bf07868963671e4d05611b179c2313301bd756a89ad4e3b3db2325b", size = 1409953, upload-time = "2026-01-18T04:59:44.357Z" },
{ url = "https://files.pythonhosted.org/packages/e6/2d/bdc466a3db9145e946762d52cd55b1385509d9f9004fec1c97bdc8debbfb/black-25.12.0-cp313-cp313-win_arm64.whl", hash = "sha256:bc5b1c09fe3c931ddd20ee548511c64ebf964ada7e6f0763d443947fd1c603ce", size = 1239350, upload-time = "2025-12-08T01:46:09.458Z" }, { url = "https://files.pythonhosted.org/packages/c7/5a/d62ebf4d8f5e3a1daa54adaab94c107b57be1b1a2f115a0249b41931e188/black-26.1.0-cp313-cp313-win_arm64.whl", hash = "sha256:be5e2fe860b9bd9edbf676d5b60a9282994c03fbbd40fe8f5e75d194f96064ca", size = 1217707, upload-time = "2026-01-18T04:59:45.719Z" },
{ url = "https://files.pythonhosted.org/packages/35/46/1d8f2542210c502e2ae1060b2e09e47af6a5e5963cb78e22ec1a11170b28/black-25.12.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:0a0953b134f9335c2434864a643c842c44fba562155c738a2a37a4d61f00cad5", size = 1917015, upload-time = "2025-12-08T01:53:27.987Z" }, { url = "https://files.pythonhosted.org/packages/6a/83/be35a175aacfce4b05584ac415fd317dd6c24e93a0af2dcedce0f686f5d8/black-26.1.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:9dc8c71656a79ca49b8d3e2ce8103210c9481c57798b48deeb3a8bb02db5f115", size = 1871864, upload-time = "2026-01-18T04:59:47.586Z" },
{ url = "https://files.pythonhosted.org/packages/41/37/68accadf977672beb8e2c64e080f568c74159c1aaa6414b4cd2aef2d7906/black-25.12.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:2355bbb6c3b76062870942d8cc450d4f8ac71f9c93c40122762c8784df49543f", size = 1741830, upload-time = "2025-12-08T01:54:36.861Z" }, { url = "https://files.pythonhosted.org/packages/a5/f5/d33696c099450b1274d925a42b7a030cd3ea1f56d72e5ca8bbed5f52759c/black-26.1.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:b22b3810451abe359a964cc88121d57f7bce482b53a066de0f1584988ca36e79", size = 1701009, upload-time = "2026-01-18T04:59:49.443Z" },
{ url = "https://files.pythonhosted.org/packages/ac/76/03608a9d8f0faad47a3af3a3c8c53af3367f6c0dd2d23a84710456c7ac56/black-25.12.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9678bd991cc793e81d19aeeae57966ee02909877cb65838ccffef24c3ebac08f", size = 1791450, upload-time = "2025-12-08T01:44:52.581Z" }, { url = "https://files.pythonhosted.org/packages/1b/87/670dd888c537acb53a863bc15abbd85b22b429237d9de1b77c0ed6b79c42/black-26.1.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:53c62883b3f999f14e5d30b5a79bd437236658ad45b2f853906c7cbe79de00af", size = 1767806, upload-time = "2026-01-18T04:59:50.769Z" },
{ url = "https://files.pythonhosted.org/packages/06/99/b2a4bd7dfaea7964974f947e1c76d6886d65fe5d24f687df2d85406b2609/black-25.12.0-cp314-cp314-win_amd64.whl", hash = "sha256:97596189949a8aad13ad12fcbb4ae89330039b96ad6742e6f6b45e75ad5cfd83", size = 1452042, upload-time = "2025-12-08T01:46:13.188Z" }, { url = "https://files.pythonhosted.org/packages/fe/9c/cd3deb79bfec5bcf30f9d2100ffeec63eecce826eb63e3961708b9431ff1/black-26.1.0-cp314-cp314-win_amd64.whl", hash = "sha256:f016baaadc423dc960cdddf9acae679e71ee02c4c341f78f3179d7e4819c095f", size = 1433217, upload-time = "2026-01-18T04:59:52.218Z" },
{ url = "https://files.pythonhosted.org/packages/b2/7c/d9825de75ae5dd7795d007681b752275ea85a1c5d83269b4b9c754c2aaab/black-25.12.0-cp314-cp314-win_arm64.whl", hash = "sha256:778285d9ea197f34704e3791ea9404cd6d07595745907dd2ce3da7a13627b29b", size = 1267446, upload-time = "2025-12-08T01:46:14.497Z" }, { url = "https://files.pythonhosted.org/packages/4e/29/f3be41a1cf502a283506f40f5d27203249d181f7a1a2abce1c6ce188035a/black-26.1.0-cp314-cp314-win_arm64.whl", hash = "sha256:66912475200b67ef5a0ab665011964bf924745103f51977a78b4fb92a9fc1bf0", size = 1245773, upload-time = "2026-01-18T04:59:54.457Z" },
{ url = "https://files.pythonhosted.org/packages/68/11/21331aed19145a952ad28fca2756a1433ee9308079bd03bd898e903a2e53/black-25.12.0-py3-none-any.whl", hash = "sha256:48ceb36c16dbc84062740049eef990bb2ce07598272e673c17d1a7720c71c828", size = 206191, upload-time = "2025-12-08T01:40:50.963Z" }, { url = "https://files.pythonhosted.org/packages/e4/3d/51bdb3ecbfadfaf825ec0c75e1de6077422b4afa2091c6c9ba34fbfc0c2d/black-26.1.0-py3-none-any.whl", hash = "sha256:1054e8e47ebd686e078c0bb0eaf31e6ce69c966058d122f2c0c950311f9f3ede", size = 204010, upload-time = "2026-01-18T04:50:09.978Z" },
] ]
[[package]] [[package]]
@@ -141,8 +141,8 @@ wheels = [
[[package]] [[package]]
name = "octosuite" name = "octosuite"
version = "4.0.0" version = "5.1.0"
source = { virtual = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "pyfiglet" }, { name = "pyfiglet" },
{ name = "questionary" }, { name = "questionary" },
@@ -157,10 +157,10 @@ dev = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "black", marker = "extra == 'dev'", specifier = ">=25.12.0" }, { name = "black", marker = "extra == 'dev'", specifier = ">=26.1.0" },
{ name = "pyfiglet", specifier = ">=1.0.4" }, { name = "pyfiglet", specifier = ">=1.0.4" },
{ name = "questionary", specifier = ">=2.1.1" }, { name = "questionary", specifier = ">=2.1.1" },
{ name = "rich", specifier = ">=14.2.0" }, { name = "rich", specifier = ">=14.3.3" },
{ name = "update-checker", specifier = ">=0.18.0" }, { name = "update-checker", specifier = ">=0.18.0" },
] ]
provides-extras = ["dev"] provides-extras = ["dev"]
@@ -176,11 +176,11 @@ wheels = [
[[package]] [[package]]
name = "pathspec" name = "pathspec"
version = "0.12.1" version = "1.0.3"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ca/bc/f35b8446f4531a7cb215605d100cd88b7ac6f44ab3fc94870c120ab3adbf/pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712", size = 51043, upload-time = "2023-12-10T22:30:45Z" } sdist = { url = "https://files.pythonhosted.org/packages/4c/b2/bb8e495d5262bfec41ab5cb18f522f1012933347fb5d9e62452d446baca2/pathspec-1.0.3.tar.gz", hash = "sha256:bac5cf97ae2c2876e2d25ebb15078eb04d76e4b98921ee31c6f85ade8b59444d", size = 130841, upload-time = "2026-01-09T15:46:46.009Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/cc/20/ff623b09d963f88bfde16306a54e12ee5ea43e9b597108672ff3a408aad6/pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08", size = 31191, upload-time = "2023-12-10T22:30:43.14Z" }, { url = "https://files.pythonhosted.org/packages/32/2b/121e912bd60eebd623f873fd090de0e84f322972ab25a7f9044c056804ed/pathspec-1.0.3-py3-none-any.whl", hash = "sha256:e80767021c1cc524aa3fb14bedda9c34406591343cc42797b386ce7b9354fb6c", size = 55021, upload-time = "2026-01-09T15:46:44.652Z" },
] ]
[[package]] [[package]]
@@ -260,15 +260,15 @@ wheels = [
[[package]] [[package]]
name = "rich" name = "rich"
version = "14.2.0" version = "14.3.3"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "markdown-it-py" }, { name = "markdown-it-py" },
{ name = "pygments" }, { name = "pygments" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/fb/d2/8920e102050a0de7bfabeb4c4614a49248cf8d5d7a8d01885fbb24dc767a/rich-14.2.0.tar.gz", hash = "sha256:73ff50c7c0c1c77c8243079283f4edb376f0f6442433aecb8ce7e6d0b92d1fe4", size = 219990, upload-time = "2025-10-09T14:16:53.064Z" } sdist = { url = "https://files.pythonhosted.org/packages/b3/c6/f3b320c27991c46f43ee9d856302c70dc2d0fb2dba4842ff739d5f46b393/rich-14.3.3.tar.gz", hash = "sha256:b8daa0b9e4eef54dd8cf7c86c03713f53241884e814f4e2f5fb342fe520f639b", size = 230582, upload-time = "2026-02-19T17:23:12.474Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/25/7a/b0178788f8dc6cafce37a212c99565fa1fe7872c70c6c9c1e1a372d9d88f/rich-14.2.0-py3-none-any.whl", hash = "sha256:76bc51fe2e57d2b1be1f96c524b890b816e334ab4c1e45888799bfaab0021edd", size = 243393, upload-time = "2025-10-09T14:16:51.245Z" }, { url = "https://files.pythonhosted.org/packages/14/25/b208c5683343959b670dc001595f2f3737e051da617f66c31f7c4fa93abc/rich-14.3.3-py3-none-any.whl", hash = "sha256:793431c1f8619afa7d3b52b2cdec859562b950ea0d4b6b505397612db8d5362d", size = 310458, upload-time = "2026-02-19T17:23:13.732Z" },
] ]
[[package]] [[package]]