From be0a3331347fff2a1b3e942e6c90b3a17d4c4c19 Mon Sep 17 00:00:00 2001 From: salvacybersec Date: Tue, 7 Apr 2026 02:00:20 +0300 Subject: [PATCH] Add ETA calculation to web dashboard and CLI monitor Parses batch timestamps from setup.log, averages last 20 batches, calculates remaining time. Shows ETA, docs remaining, and avg seconds per batch in both web summary cards and CLI header. Co-Authored-By: Claude Opus 4.6 (1M context) --- monitor.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/monitor.py b/monitor.py index f061b9f..4b50387 100755 --- a/monitor.py +++ b/monitor.py @@ -171,19 +171,63 @@ def collect_status(): # Read last N lines from setup.log log_lines = [] + batch_times = [] if LOG_PATH.exists(): try: with open(LOG_PATH, "r", encoding="utf-8") as f: all_lines = f.readlines() log_lines = [l.rstrip() for l in all_lines[-15:]] + + # Parse batch timestamps to calculate ETA + # Format: "01:32:34 [INFO] ✓ arbiter batch 1/80: 5 embedded (5/396)" + import re + for line in all_lines: + m = re.match(r'^(\d{2}:\d{2}:\d{2}) \[INFO\]\s+✓\s+\w+ batch \d+/\d+:', line) + if m: + batch_times.append(m.group(1)) except Exception: pass + # Calculate ETA from batch speed + total_expected = sum(p["expected"] for p in personas) + total_assigned = sum(len(v) for v in ws_docs.values()) + remaining = max(0, total_expected - total_assigned) + + eta_seconds = None + avg_batch_seconds = None + if len(batch_times) >= 2: + try: + from datetime import datetime + times = [] + for t in batch_times[-20:]: # last 20 batches for avg + dt = datetime.strptime(t, "%H:%M:%S") + times.append(dt.hour * 3600 + dt.minute * 60 + dt.second) + if len(times) >= 2: + deltas = [times[i+1] - times[i] for i in range(len(times)-1)] + # Handle midnight wrap + deltas = [d if d > 0 else d + 86400 for d in deltas] + avg_batch_seconds = sum(deltas) / len(deltas) + remaining_batches = remaining / 5 # batch size = 5 + eta_seconds = int(remaining_batches * avg_batch_seconds) + except Exception: + pass + + # Format ETA + eta_str = None + if eta_seconds is not None: + hours = eta_seconds // 3600 + minutes = (eta_seconds % 3600) // 60 + if hours > 0: + eta_str = f"{hours}h {minutes}m" + else: + eta_str = f"{minutes}m" + return { "personas": personas, "clusters": clusters, "total_uploaded": len(uploaded), - "total_assigned": sum(len(v) for v in ws_docs.values()), + "total_assigned": total_assigned, + "total_expected": total_expected, "total_personas": len(workspaces), "personas_with_vectors": sum(1 for p in personas if p["has_vectors"]), "lancedb_size_mb": dir_size_mb(LANCEDB_PATH), @@ -193,6 +237,9 @@ def collect_status(): "script_running": script_running, "timestamp": time.strftime("%H:%M:%S"), "log_tail": log_lines, + "eta": eta_str, + "avg_batch_seconds": round(avg_batch_seconds, 1) if avg_batch_seconds else None, + "remaining_docs": remaining, } @@ -232,9 +279,14 @@ def cli_output(status): lines.append(f" API: {api} Script: {script} " f"LanceDB: {status['lancedb_size_mb']:.0f}MB " f"Docs: {status['docs_size_mb']:.0f}MB") + eta_str = "" + if status.get("eta"): + eta_str = f" ETA: {GREEN}{status['eta']}{RESET} ({status['remaining_docs']} left, {status['avg_batch_seconds']}s/batch)" lines.append(f" Uploaded: {status['total_uploaded']} " - f"Assigned: {status['total_assigned']} " + f"Assigned: {status['total_assigned']}/{status.get('total_expected', '?')} " f"Vectors: {status['personas_with_vectors']}/{status['total_personas']}") + if eta_str: + lines.append(eta_str) lines.append("") # Per-cluster persona table @@ -373,6 +425,7 @@ function render(data) {
Vectors
${data.personas_with_vectors}/${data.total_personas}
${pctVec}%
LanceDB
${data.lancedb_size_mb < 1 ? Math.round(data.lancedb_size_mb * 1024) + 'KB' : Math.round(data.lancedb_size_mb) + 'MB'}
Documents
${Math.round(data.docs_size_mb)}MB
+ ${data.eta ? `
ETA
${data.eta}
${data.remaining_docs} docs left · ${data.avg_batch_seconds}s/batch
` : ''} `; // Status bar