diff --git a/monitor.py b/monitor.py index f061b9f..4b50387 100755 --- a/monitor.py +++ b/monitor.py @@ -171,19 +171,63 @@ def collect_status(): # Read last N lines from setup.log log_lines = [] + batch_times = [] if LOG_PATH.exists(): try: with open(LOG_PATH, "r", encoding="utf-8") as f: all_lines = f.readlines() log_lines = [l.rstrip() for l in all_lines[-15:]] + + # Parse batch timestamps to calculate ETA + # Format: "01:32:34 [INFO] ✓ arbiter batch 1/80: 5 embedded (5/396)" + import re + for line in all_lines: + m = re.match(r'^(\d{2}:\d{2}:\d{2}) \[INFO\]\s+✓\s+\w+ batch \d+/\d+:', line) + if m: + batch_times.append(m.group(1)) except Exception: pass + # Calculate ETA from batch speed + total_expected = sum(p["expected"] for p in personas) + total_assigned = sum(len(v) for v in ws_docs.values()) + remaining = max(0, total_expected - total_assigned) + + eta_seconds = None + avg_batch_seconds = None + if len(batch_times) >= 2: + try: + from datetime import datetime + times = [] + for t in batch_times[-20:]: # last 20 batches for avg + dt = datetime.strptime(t, "%H:%M:%S") + times.append(dt.hour * 3600 + dt.minute * 60 + dt.second) + if len(times) >= 2: + deltas = [times[i+1] - times[i] for i in range(len(times)-1)] + # Handle midnight wrap (only negative deltas wrap; 0 = same second) + deltas = [d if d >= 0 else d + 86400 for d in deltas] + avg_batch_seconds = sum(deltas) / len(deltas) + remaining_batches = remaining / 5 # batch size = 5 + eta_seconds = int(remaining_batches * avg_batch_seconds) + except Exception: + pass + + # Format ETA + eta_str = None + if eta_seconds is not None: + hours = eta_seconds // 3600 + minutes = (eta_seconds % 3600) // 60 + if hours > 0: + eta_str = f"{hours}h {minutes}m" + else: + eta_str = f"{minutes}m" + return { "personas": personas, "clusters": clusters, "total_uploaded": len(uploaded), - "total_assigned": sum(len(v) for v in ws_docs.values()), + "total_assigned": total_assigned, + "total_expected": total_expected, "total_personas": len(workspaces), 
"personas_with_vectors": sum(1 for p in personas if p["has_vectors"]), "lancedb_size_mb": dir_size_mb(LANCEDB_PATH), @@ -193,6 +237,9 @@ def collect_status(): "script_running": script_running, "timestamp": time.strftime("%H:%M:%S"), "log_tail": log_lines, + "eta": eta_str, + "avg_batch_seconds": round(avg_batch_seconds, 1) if avg_batch_seconds is not None else None, + "remaining_docs": remaining, } @@ -232,9 +279,14 @@ def cli_output(status): lines.append(f" API: {api} Script: {script} " f"LanceDB: {status['lancedb_size_mb']:.0f}MB " f"Docs: {status['docs_size_mb']:.0f}MB") + eta_str = "" + if status.get("eta"): + eta_str = f" ETA: {GREEN}{status['eta']}{RESET} ({status['remaining_docs']} left, {status['avg_batch_seconds']}s/batch)" lines.append(f" Uploaded: {status['total_uploaded']} " - f"Assigned: {status['total_assigned']} " + f"Assigned: {status['total_assigned']}/{status.get('total_expected', '?')} " f"Vectors: {status['personas_with_vectors']}/{status['total_personas']}") + if eta_str: + lines.append(eta_str) lines.append("") # Per-cluster persona table @@ -373,6 +425,7 @@ function render(data) {
Vectors
${data.personas_with_vectors}/${data.total_personas}
${pctVec}%
LanceDB
${data.lancedb_size_mb < 1 ? Math.round(data.lancedb_size_mb * 1024) + 'KB' : Math.round(data.lancedb_size_mb) + 'MB'}
Documents
${Math.round(data.docs_size_mb)}MB
+ ${data.eta ? `
ETA
${data.eta}
${data.remaining_docs} docs left · ${data.avg_batch_seconds}s/batch
` : ''} `; // Status bar