Merge pull request #24 from bellingcat/4.0/beta

4.0/beta
This commit is contained in:
Ritchie Mwewa
2026-01-03 23:04:21 +02:00
committed by GitHub
7 changed files with 327 additions and 164 deletions

View File

@@ -1,5 +1,6 @@
import sys
from .core.cache import cache
from .lib import console, __pkg__, __version__
from .tui.menus import Menus
@@ -11,3 +12,5 @@ def start():
menu.main()
except KeyboardInterrupt:
sys.exit()
finally:
cache.clear()

View File

@@ -0,0 +1,44 @@
import hashlib
import json
__all__ = ["cache"]
class ResponseCache:
"""Simple in-memory cache for API responses."""
def __init__(self):
self._cache = {}
@staticmethod
def _generate_key(url: str, params: dict = None) -> str:
"""Generate a unique cache key from URL and params."""
cache_data = f"{url}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(cache_data.encode()).hexdigest()
def get(self, url: str, params: dict = None):
"""Get cached response."""
key = self._generate_key(url, params)
return self._cache.get(key)
def set(self, url: str, data, params: dict = None):
"""Cache a response."""
key = self._generate_key(url, params)
self._cache[key] = data
def clear(self):
"""Clear all cached responses."""
self._cache.clear()
def remove(self, url: str, params: dict = None):
"""Remove a specific cached response."""
key = self._generate_key(url, params)
self._cache.pop(key, None)
cache = ResponseCache()
def clear_cache():
"""Clear all cached API responses."""
cache.clear()

View File

@@ -5,6 +5,7 @@ import typing as t
import requests
from requests import Response
from .cache import cache
from ..lib import __version__
BASE_URL = "https://api.github.com"
@@ -22,10 +23,20 @@ class GitHub:
),
):
self.user_agent = user_agent
self.cache = cache
def get(
self, url: str, params: t.Optional[dict] = None, return_response: bool = False
self,
url: str,
params: t.Optional[dict] = None,
return_response: bool = False,
use_cache: bool = True,
) -> t.Union[dict, list, Response]:
if use_cache and not return_response:
cached = self.cache.get(url, params)
if cached is not None:
return cached
response = requests.get(
url=url, params=params, headers={"User-Agent": self.user_agent}
)
@@ -34,33 +45,51 @@ class GitHub:
return response
if response.status_code == 200:
return self._sanitise_response(response=response.json())
sanitised = self.sanitise_response(response=response.json())
# Cache the successful response
if use_cache:
self.cache.set(url, sanitised, params)
return sanitised
return []
def is_valid_entity(
self, entity_type: t.Literal["user", "org", "repo"], **kwargs
self, _type: t.Literal["user", "org", "repo"], **kwargs
) -> bool:
"""Validate if a GitHub entity exists."""
try:
if entity_type == "user":
url = f"https://api.github.com/users/{kwargs.get('username')}"
elif entity_type == "org":
url = f"https://api.github.com/orgs/{kwargs.get('username')}"
elif entity_type == "repo":
url = f"https://api.github.com/repos/{kwargs.get('repo_owner')}/{kwargs.get('repo_name')}"
else:
type_map = {
"user": f"https://api.github.com/users/{kwargs.get('username')}",
"org": f"https://api.github.com/orgs/{kwargs.get('username')}",
"repo": f"https://api.github.com/repos/{kwargs.get('repo_owner')}/{kwargs.get('repo_name')}",
}
url = type_map[_type]
# Check cache first
cached = self.cache.get(url)
if cached is not None:
return True # If cached, entity exists
response = requests.get(url=url, headers={"User-Agent": self.user_agent})
# Only cache if entity exists (status 200)
if response.status_code == 200:
sanitised = self.sanitise_response(response.json())
self.cache.set(url, sanitised)
return True
response = self.get(url=url, return_response=True)
return response.status_code == 200
return False
except requests.RequestException:
return False
def _sanitise_response(self, response: t.Union[dict, list]) -> t.Union[dict, list]:
def sanitise_response(self, response: t.Union[dict, list]) -> t.Union[dict, list]:
pattern = re.compile(r"https://api\.github\.com")
if isinstance(response, list):
return [self._sanitise_response(response=item) for item in response]
return [self.sanitise_response(response=item) for item in response]
if isinstance(response, dict):
keys_to_remove = [
@@ -74,6 +103,6 @@ class GitHub:
# Recursively clean nested dicts/lists
for key, value in response.items():
if isinstance(value, (dict, list)):
response[key] = self._sanitise_response(response=value)
response[key] = self.sanitise_response(response=value)
return response

View File

@@ -14,13 +14,27 @@ class GitHubEntity:
self.endpoint = None
self.source = source
def exists(self) -> bool:
def exists(self) -> tuple[bool, dict]:
"""Check if the entity exists on GitHub."""
# Check cache first
cached = github.cache.get(self.endpoint)
if cached is not None:
return True, cached
try:
response = github.get(url=self.endpoint, return_response=True)
return response.status_code == 200
except exceptions.RequestException as err:
return False
if response.status_code == 200:
data = response.json()
# Sanitise the data
sanitised = github.sanitise_response(data)
# Cache the sanitised response
github.cache.set(self.endpoint, sanitised)
return True, sanitised
return False, response.json()
except exceptions.RequestException:
return False, {}
class User(GitHubEntity):

View File

@@ -190,7 +190,7 @@ def check_updates():
with console.status("[dim]Checking for updates...[/dim]") as status:
checker = UpdateChecker()
result = checker.check(__pkg__, __version__)
if result:
if result is not None:
status.stop()
message_dialog(title="Update Available", text=result).run()
else:

View File

@@ -29,17 +29,28 @@ class Dialogs:
def __init__(self): ...
@staticmethod
def quit() -> bool:
def _boolean(title: str, text: str) -> bool:
try:
result = button_dialog(
title="Quit",
text="This will close the session, continue?",
title=title,
text=text,
buttons=[("Yes", True), ("No", False)],
).run()
return result if result is not None else False
except KeyboardInterrupt:
return True
def quit(self) -> bool:
return self._boolean(
title="Quit", text="This will close the session, continue?"
)
def clear_cache(self) -> bool:
return self._boolean(
title="Clear Cache",
text="This will clear all octosuite caches, continue?",
)
@staticmethod
def license():
message_dialog(title="MIT License", text=LICENSE_NOTICE).run()

View File

@@ -7,6 +7,7 @@ from rich.status import Status
from .dialogs import Dialogs
from .prompts import Prompts
from ..core.cache import cache
from ..core.models import User, Repo, Org, Search
from ..lib import check_updates, preview_response, export_response, set_menu_title
from ..lib import console, clear_screen, ascii_banner
@@ -28,8 +29,10 @@ prompts = Prompts()
__all__ = ["Menus"]
class Menus:
def __init__(self):
class BaseMenu:
"""Base class with common menu functionality."""
def __init__(self, main_menu: t.Callable):
# Define which methods require pagination
self.paginated_methods = {
"repos",
@@ -64,14 +67,87 @@ class Menus:
# Search methods (all require pagination)
self.search_methods = {"repos", "users", "commits", "issues", "topics"}
self.mode_handlers = {
"user": self.user,
"repo": self.repo,
"org": self.org,
"search": self.search,
}
self.main_menu = main_menu
def _execute_selection(self, status: Status, **kwargs):
@staticmethod
def target_validator(
identifier: str,
instance: t.Union[User, Repo, Org],
callback: t.Callable,
*callback_args,
) -> bool:
"""Validate if an entity exists on GitHub.
Args:
identifier: Name/identifier of the entity
instance: The instance with an exists() method (User, Repo, or Org)
callback: Function to call if validation fails
*callback_args: Arguments to pass to callback
Returns:
bool: True if entity exists, False otherwise
"""
if isinstance(instance, User):
target_type = "user"
elif isinstance(instance, Repo):
target_type = "repo"
else:
target_type = "org"
with Status(
f"[dim]Validating {target_type}'s ({identifier}) existence...[/dim]",
console=console,
) as status:
exists, response = instance.exists()
if exists:
ascii_banner(text=identifier)
console.print(
f"[bold][green]✔[/green] {target_type.capitalize()} ({identifier}) exists on GitHub[/bold]"
)
return True
else:
status.stop()
if response and "message" in response:
console.print(
f"[bold][yellow]✘[/yellow] {response['message']}[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
callback(*callback_args)
return False
def execute_and_handle_response(
self,
instance: t.Union[User, Repo, Org, Search],
method_name: str,
target_type: t.Literal["user", "repo", "org"],
source: str,
):
"""Execute a method and handle its response.
Args:
instance: The instance to execute the method on
method_name: Name of the method to execute
target_type: Type of entity ("user", "repo", "org")
source: Source identifier for response handling
"""
valid_methods = self.paginated_methods | self.non_paginated_methods
if method_name in valid_methods:
with Status(
status=f"[dim]Initialising {target_type} {method_name}...[/dim]",
console=console,
) as status:
data = self.execute_selection(
source=source,
instance=instance,
method_name=method_name,
status=status,
)
status.stop()
self.response_handler(data=data, data_type=method_name, source=source)
def execute_selection(self, status: Status, **kwargs):
"""Execute a method on an instance, prompting for pagination if needed."""
instance = kwargs.get("instance")
method_name = kwargs.get("method_name")
@@ -87,22 +163,7 @@ class Menus:
status.update(f"[dim]Getting {method_name} from {source}...[/dim]")
return method(**params)
def _navigation(self, option: str, callback: t.Callable, *callback_args):
"""Handle navigation options (back, quit, change settings)."""
navigation_handlers = {
"back": lambda: self.main(),
"quit": lambda: (
sys.exit() if dialogs.quit() else callback(*callback_args)
),
}
handler = navigation_handlers.get(option)
if handler:
handler()
return True
return False
def response_handling(self, data: t.Union[dict, list], data_type: str, source: str):
def response_handler(self, data: t.Union[dict, list], data_type: str, source: str):
"""Export data to file in user-selected format(s)."""
preview_response(data=data, source=source, _type=data_type)
@@ -140,8 +201,10 @@ class Menus:
if dialogs.quit():
sys.exit()
else:
self.response_handling(
data=data, data_type=data_type, source=source
self.response_handler(
data=data,
data_type=data_type,
source=source,
)
return
@@ -177,6 +240,33 @@ class Menus:
except KeyboardInterrupt:
print("\nExport cancelled")
def navigation_handler(self, option: str, callback: t.Callable, *callback_args):
"""Handle navigation options (back, quit, change settings)."""
navigation_handlers = {
"back": lambda: self.main_menu(),
"quit": lambda: (
sys.exit() if dialogs.quit() else callback(*callback_args)
),
}
handler = navigation_handlers.get(option)
if handler:
handler()
return True
return False
class Menus(BaseMenu):
def __init__(self):
super().__init__(main_menu=self.main)
self.mode_handlers = {
"user": self.user,
"repo": self.repo,
"org": self.org,
"search": self.search,
}
def main(self):
"""Main menu to select mode."""
set_menu_title(menu_type="home")
@@ -207,6 +297,11 @@ class Menus:
value="search",
description="Search GitHub",
),
q.Choice(
title="Clear cache",
value="clear_cache",
description="Clear all in-memory cache from Octosuite",
),
q.Choice(
title="Updates",
value="updates",
@@ -239,6 +334,10 @@ class Menus:
elif action == "updates":
check_updates()
self.main()
elif action == "clear_cache":
if dialogs.clear_cache():
cache.clear()
self.main()
elif action is None:
sys.exit()
else:
@@ -262,9 +361,7 @@ class Menus:
style=CUSTOM_STYLE,
)
clear_screen()
ascii_banner(text=query)
option = q.select(
"What would you like to do/search?",
choices=[
@@ -302,7 +399,7 @@ class Menus:
q.Choice(
title="Go Back",
value="back",
description="Go back to ewous menu",
description="Go back to previous menu",
shortcut_key="b",
),
q.Choice(
@@ -320,9 +417,10 @@ class Menus:
if option is None:
self.main()
return
# Handle navigation
if self._navigation(option, self.search, query):
if self.navigation_handler(option, self.search, query):
return
# Handle change query
@@ -332,11 +430,9 @@ class Menus:
# Execute search if it's a valid method
if option in self.search_methods:
with console.status(
status=f"[dim]Initialising {option} search...[/dim]"
with Status(
status=f"[dim]Initialising {option} search...[/dim]", console=console
) as status:
# Get pagination params
status.stop()
params = prompts.pagination_params()
status.start()
@@ -355,15 +451,16 @@ class Menus:
if data:
items = data.get("items")
status.stop()
self.response_handling(
self.response_handler(
data=items if items is not None else data,
data_type=option,
source=query,
)
# After handling response, show menu again
self.search(query=query)
def user(self, username: t.Optional[str] = None):
def user(self, username: t.Optional[str] = None, is_validated: bool = False):
"""User menu for querying user data."""
set_menu_title(menu_type="user")
clear_screen()
@@ -376,8 +473,15 @@ class Menus:
qmark="@",
)
clear_screen()
ascii_banner(text=username)
user = User(name=username)
if not is_validated:
if not self.target_validator(
identifier=username,
instance=user,
callback=self.user,
):
return
option = q.select(
"What would you like to do/get?",
@@ -459,8 +563,10 @@ class Menus:
if option is None:
self.main()
return
# Handle navigation
if self._navigation(option, self.user, username):
if self.navigation_handler(option, self.user, username):
return
# Handle change username
@@ -468,40 +574,23 @@ class Menus:
self.user()
return
# Execute action if it's a valid method
valid_methods = self.paginated_methods | self.non_paginated_methods
if option in valid_methods:
with Status(
status=f"[dim]Initialising user {option}...[/dim]",
console=console,
) as status:
user = User(name=username)
# Execute action and handle response
self.execute_and_handle_response(
instance=user,
method_name=option,
target_type="user",
source=username,
)
status.update(f"[dim]Validating user's ({username}) existence...[/dim]")
if user.exists():
console.print(
f"[bold][green]✔[/green] User ({username}) exists on GitHub[/bold]"
)
data = self._execute_selection(
source=username,
instance=user,
method_name=option,
status=status,
)
# After handling response, show menu again WITHOUT re-validating
self.user(username=username, is_validated=True)
status.stop()
self.response_handling(data=data, data_type=option, source=username)
else:
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] User ({username}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.user()
self.user(username=username)
def repo(self, name: t.Optional[str] = None, owner: t.Optional[str] = None):
def repo(
self,
name: t.Optional[str] = None,
owner: t.Optional[str] = None,
is_validated: bool = False,
):
"""Repository menu for querying repo data."""
set_menu_title(menu_type="repo")
clear_screen()
@@ -521,8 +610,17 @@ class Menus:
qmark="@",
)
clear_screen()
ascii_banner(text=f"{owner}/{name}")
repo = Repo(name=name, owner=owner)
source = f"{owner}/{name}"
# Only validate if not already validated
if not is_validated:
if not self.target_validator(
identifier=source,
instance=repo,
callback=self.repo,
):
return
option = q.select(
"What would you like to do/get?",
@@ -643,9 +741,10 @@ class Menus:
if option is None:
self.main()
return
# Handle navigation
if self._navigation(option, self.repo, name, owner):
if self.navigation_handler(option, self.repo, name, owner):
return
# Handle change options
@@ -659,40 +758,18 @@ class Menus:
change_handlers[option]()
return
# Execute action if it's a valid method
valid_methods = self.paginated_methods | self.non_paginated_methods
if option in valid_methods:
source = f"{owner}/{name}"
with Status(
status=f"[dim]Initialising repository {option}...[/dim]",
console=console,
) as status:
repo = Repo(name=name, owner=owner)
# Execute action and handle response
self.execute_and_handle_response(
instance=repo,
method_name=option,
target_type="repo",
source=source,
)
status.update(
f"[dim]Validating repository's ({source}) existence...[/dim]"
)
if repo.exists():
console.print(
f"[bold][green]✔[/green] Repository ({source}) exists on GitHub[/bold]"
)
data = self._execute_selection(
source=source, instance=repo, method_name=option, status=status
)
# After handling response, show menu again WITHOUT re-validating
self.repo(name=name, owner=owner, is_validated=True)
status.stop()
self.response_handling(data=data, data_type=option, source=source)
else:
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] Repository ({source}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.repo()
self.repo(name=name, owner=owner)
def org(self, name: t.Optional[str] = None):
def org(self, name: t.Optional[str] = None, is_validated: bool = False):
"""Organisation menu for querying org data."""
set_menu_title(menu_type="org")
clear_screen()
@@ -705,8 +782,16 @@ class Menus:
qmark="@",
)
clear_screen()
ascii_banner(text=name)
org = Org(name=name)
# Only validate if not already validated
if not is_validated:
if not self.target_validator(
identifier=name,
instance=org,
callback=self.org,
):
return
option = q.select(
"What would you like to do?",
@@ -759,6 +844,7 @@ class Menus:
shortcut_key="q",
),
],
pointer=POINTER,
style=CUSTOM_STYLE,
instruction=INSTRUCTIONS,
use_shortcuts=True,
@@ -766,9 +852,10 @@ class Menus:
if option is None:
self.main()
return
# Handle navigation
if self._navigation(option, self.org, name):
if self.navigation_handler(option, self.org, name):
return
# Handle change org
@@ -776,35 +863,10 @@ class Menus:
self.org()
return
# Execute action if it's a valid method
valid_methods = self.paginated_methods | self.non_paginated_methods
if option in valid_methods:
with Status(
status=f"[dim]Initialising organisation {option}...[/dim]",
console=console,
) as status:
org = Org(name=name)
# Execute action and handle response
self.execute_and_handle_response(
instance=org, method_name=option, target_type="org", source=name
)
status.update(
f"[dim]Validating organisation's ({name}) existence...[/dim]"
)
if org.exists():
console.print(
f"[bold][green]✔[/green] Organisation ({name}) exists on GitHub[/bold]"
)
data = self._execute_selection(
source=name, instance=org, method_name=option, status=status
)
status.stop()
self.response_handling(data=data, data_type=option, source=name)
else:
status.stop()
console.print(
f"[bold][yellow]✘[/yellow] Organisation ({name}) doesn't exist on GitHub[/bold]"
)
console.input(" Press [bold]ENTER[/bold] to continue ...")
self.org()
self.org(name=name)
# After handling response, show menu again WITHOUT re-validating
self.org(name=name, is_validated=True)