#!/usr/bin/env python3 """ AnythingLLM Persona RAG Monitor Usage: python3 monitor.py # CLI one-shot python3 monitor.py --watch # CLI auto-refresh (2s) python3 monitor.py --web # Web dashboard on :8899 python3 monitor.py --web 9000 # Custom port """ import json import os import sys import time from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path import yaml try: import requests except ImportError: requests = None CONFIG_PATH = Path(__file__).parent / "config.yaml" PROGRESS_PATH = Path(__file__).parent / "upload_progress.json" LOG_PATH = Path(__file__).parent / "setup.log" LANCEDB_PATH = Path.home() / ".config/anythingllm-desktop/storage/lancedb" DOCS_PATH = Path.home() / ".config/anythingllm-desktop/storage/documents" VCACHE_PATH = Path.home() / ".config/anythingllm-desktop/storage/vector-cache" def load_config(): with open(CONFIG_PATH) as f: return yaml.safe_load(f) def load_progress(): if PROGRESS_PATH.exists(): with open(PROGRESS_PATH) as f: return json.load(f) return {} def dir_size_mb(path): if not path.exists(): return 0 try: return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) / (1024 * 1024) except Exception: return 0 def get_lance_workspaces(): if not LANCEDB_PATH.exists(): return set() return {d.name.replace(".lance", "") for d in LANCEDB_PATH.iterdir() if d.is_dir() and d.name.endswith(".lance")} def get_lance_sizes(): sizes = {} if not LANCEDB_PATH.exists(): return sizes for d in LANCEDB_PATH.iterdir(): if d.is_dir() and d.name.endswith(".lance"): slug = d.name.replace(".lance", "") sizes[slug] = sum(f.stat().st_size for f in d.rglob("*") if f.is_file()) / (1024 * 1024) return sizes def check_api(config): if not requests: return None try: url = f"{config['anythingllm']['base_url']}/auth" headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"} resp = requests.get(url, headers=headers, timeout=3) return resp.status_code == 200 except Exception: return False def check_script_running(): try: import subprocess result = subprocess.run(["pgrep", "-f", "setup.py"], capture_output=True, text=True) return result.returncode == 0 except Exception: return None def collect_status(): config = load_config() progress = load_progress() workspaces = config.get("workspaces", {}) ws_docs = progress.get("workspace_docs", {}) uploaded = progress.get("uploaded_files", {}) lance_ws = get_lance_workspaces() lance_sizes = get_lance_sizes() # Build expected doc counts per persona folder_counts = {} for fpath, info in uploaded.items(): f = info.get("folder", "") if f: folder_counts[f] = folder_counts.get(f, 0) + 1 personas = [] for codename, ws_cfg in workspaces.items(): slug = ws_cfg["name"].lower() # Normalize slug like AnythingLLM does import re slug = re.sub(r'[^a-z0-9\s-]', '', slug.replace('ş', 's').replace('ç', 'c') .replace('ğ', 'g').replace('ü', 'u').replace('ö', 'o') .replace('ı', 'i').replace('İ', 'i').replace('&', 'and')) slug = re.sub(r'\s+', '-', slug.strip()) # Expected docs from mapped folders expected = 0 for entry in ws_cfg.get("folders", []): fn = entry["path"].replace("/", "_") expected += folder_counts.get(fn, 0) assigned = len(ws_docs.get(codename, [])) has_vectors = any(slug in lw for lw in lance_ws) vector_size = 0 for lw, sz in lance_sizes.items(): if slug in lw: vector_size = sz break personas.append({ "codename": codename, "name": ws_cfg["name"], "expected": expected, "assigned": assigned, "has_vectors": has_vectors, "vector_size_mb": vector_size, }) # Cluster grouping clusters = { "intel": ["frodo", "echo", "ghost", "oracle", "wraith", "scribe", "polyglot"], "cyber": ["neo", "bastion", "sentinel", "specter", "phantom", "cipher", "vortex"], "military": ["marshal", "centurion", "corsair", "warden", "medic"], "humanities": ["chronos", "tribune", "arbiter", "ledger", "sage", "herald", "scholar", "gambit"], "engineering": ["forge", "architect"], } api_ok = check_api(config) script_running = check_script_running() # Read last N lines from setup.log log_lines = [] if LOG_PATH.exists(): try: with open(LOG_PATH, "r", encoding="utf-8") as f: all_lines = f.readlines() log_lines = [l.rstrip() for l in all_lines[-15:]] except Exception: pass return { "personas": personas, "clusters": clusters, "total_uploaded": len(uploaded), "total_assigned": sum(len(v) for v in ws_docs.values()), "total_personas": len(workspaces), "personas_with_vectors": sum(1 for p in personas if p["has_vectors"]), "lancedb_size_mb": dir_size_mb(LANCEDB_PATH), "docs_size_mb": dir_size_mb(DOCS_PATH), "vcache_size_mb": dir_size_mb(VCACHE_PATH), "api_online": api_ok, "script_running": script_running, "timestamp": time.strftime("%H:%M:%S"), "log_tail": log_lines, } # ────────────────────────────────────────────────── # CLI OUTPUT # ────────────────────────────────────────────────── CLUSTER_COLORS = { "intel": "\033[34m", # blue "cyber": "\033[31m", # red "military": "\033[33m", # yellow "humanities": "\033[35m", # magenta "engineering": "\033[36m",# cyan } RESET = "\033[0m" BOLD = "\033[1m" DIM = "\033[2m" GREEN = "\033[32m" RED = "\033[31m" def progress_bar(current, total, width=20): if total == 0: return f"{'░' * width}" filled = int(width * min(current, total) / total) return f"{'█' * filled}{'░' * (width - filled)}" def cli_output(status): lines = [] lines.append(f"{BOLD}═══ AnythingLLM Persona Monitor ═══{RESET} {DIM}{status['timestamp']}{RESET}") lines.append("") # System status api = f"{GREEN}●{RESET}" if status["api_online"] else f"{RED}●{RESET}" script = f"{GREEN}● running{RESET}" if status["script_running"] else f"{DIM}○ idle{RESET}" lines.append(f" API: {api} Script: {script} " f"LanceDB: {status['lancedb_size_mb']:.0f}MB " f"Docs: {status['docs_size_mb']:.0f}MB") lines.append(f" Uploaded: {status['total_uploaded']} " f"Assigned: {status['total_assigned']} " f"Vectors: {status['personas_with_vectors']}/{status['total_personas']}") lines.append("") # Per-cluster persona table persona_map = {p["codename"]: p for p in status["personas"]} for cluster_name, members in status["clusters"].items(): color = CLUSTER_COLORS.get(cluster_name, "") lines.append(f" {color}{BOLD}{cluster_name.upper()}{RESET}") for codename in members: p = persona_map.get(codename) if not p: continue vec_icon = f"{GREEN}✓{RESET}" if p["has_vectors"] else f"{DIM}○{RESET}" bar = progress_bar(p["assigned"], p["expected"]) pct = (p["assigned"] / p["expected"] * 100) if p["expected"] > 0 else 0 size_str = f"{p['vector_size_mb']:.0f}MB" if p["vector_size_mb"] > 0 else "" lines.append(f" {vec_icon} {codename:<12} {bar} {p['assigned']:>5}/{p['expected']:<5} " f"{pct:>5.0f}% {size_str}") lines.append("") # Log tail log_tail = status.get("log_tail", []) if log_tail: lines.append(f" {BOLD}── Log (setup.log) ──{RESET}") for ll in log_tail: # Colorize log levels if "[ERROR]" in ll: lines.append(f" {RED}{ll}{RESET}") elif "[WARNING]" in ll: lines.append(f" \033[33m{ll}{RESET}") elif "✓" in ll: lines.append(f" {GREEN}{ll}{RESET}") else: lines.append(f" {DIM}{ll}{RESET}") lines.append("") return "\n".join(lines) def cli_mode(watch=False): while True: status = collect_status() if watch: os.system("clear") print(cli_output(status)) if not watch: break time.sleep(2) # ────────────────────────────────────────────────── # WEB DASHBOARD # ────────────────────────────────────────────────── HTML_TEMPLATE = """ AnythingLLM Monitor

AnythingLLM Persona Monitor

Loading...

Log (setup.log)

No log data
""" class MonitorHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == "/api/status": status = collect_status() self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(json.dumps(status).encode()) elif self.path in ("/", "/index.html"): self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.end_headers() self.wfile.write(HTML_TEMPLATE.encode()) else: self.send_response(404) self.end_headers() def log_message(self, format, *args): pass # quiet def web_mode(port=8899): server = HTTPServer(("0.0.0.0", port), MonitorHandler) print(f" AnythingLLM Monitor → http://localhost:{port}") print(f" API endpoint → http://localhost:{port}/api/status") print(f" Press Ctrl+C to stop\n") try: server.serve_forever() except KeyboardInterrupt: print("\n Stopped.") server.server_close() if __name__ == "__main__": args = sys.argv[1:] if "--web" in args: idx = args.index("--web") port = int(args[idx + 1]) if idx + 1 < len(args) and args[idx + 1].isdigit() else 8899 web_mode(port) elif "--watch" in args: cli_mode(watch=True) else: cli_mode(watch=False)