Robust embed verification: 5 retries per call + LanceDB physical checks

Every API call:
- 5 retries with progressive backoff (Olla routes to random instances)
- Body error detection (API 200 but embed error in response)

Per persona verification:
- First batch: LanceDB must physically grow + query must return sources
- Every 10th batch: LanceDB growth check
- Final: triple check (LanceDB size + workspace doc count API + search query)
- Abort on model-not-found errors, skip after 5 consecutive failures

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
salvacybersec
2026-04-07 01:30:09 +03:00
parent 3d4654f454
commit dfde494bd4

172
setup.py
View File

@@ -116,7 +116,7 @@ def save_progress(progress):
# API
# ──────────────────────────────────────────────────────────
def api_request(config, method, endpoint, timeout=120, retries=3, **kwargs):
def api_request(config, method, endpoint, timeout=120, retries=5, **kwargs):
url = f"{config['anythingllm']['base_url']}{endpoint}"
headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"}
if "json" in kwargs:
@@ -124,27 +124,40 @@ def api_request(config, method, endpoint, timeout=120, retries=3, **kwargs):
for attempt in range(retries):
try:
log.debug(f"API {method.upper()} {endpoint} (attempt {attempt+1})")
log.debug(f"API {method.upper()} {endpoint} (attempt {attempt+1}/{retries})")
resp = getattr(requests, method)(url, headers=headers, timeout=timeout, **kwargs)
if resp.status_code not in (200, 201):
log.error(f"API {resp.status_code}: {resp.text[:300]}")
log.warning(f"API {resp.status_code} on attempt {attempt+1}: {resp.text[:200]}")
if attempt < retries - 1:
time.sleep(3)
time.sleep(3 + attempt * 2)
continue
return None
log.debug(f"API {method.upper()} {endpoint}{resp.status_code}")
return resp.json()
data = resp.json()
# Check for embedded error in response body (API returns 200 but embed failed)
if data.get("error"):
err = data["error"]
log.warning(f"API 200 but body error on attempt {attempt+1}: {err}")
if attempt < retries - 1:
time.sleep(3 + attempt * 2)
continue
return None
log.debug(f"API {method.upper()} {endpoint} → OK")
return data
except requests.exceptions.Timeout:
log.warning(f"API timeout ({timeout}s) on {endpoint} (attempt {attempt+1}/{retries})")
log.warning(f"API timeout ({timeout}s) attempt {attempt+1}/{retries}")
if attempt < retries - 1:
time.sleep(5)
time.sleep(5 + attempt * 3)
except requests.exceptions.ConnectionError as e:
log.error(f"API connection error: {e}")
log.warning(f"API connection error attempt {attempt+1}/{retries}: {e}")
if attempt < retries - 1:
time.sleep(5)
time.sleep(5 + attempt * 3)
except Exception as e:
log.error(f"API unexpected error: {e}")
if attempt < retries - 1:
time.sleep(3)
continue
return None
log.error(f"API {method.upper()} {endpoint} FAILED after {retries} attempts")
return None
@@ -260,9 +273,9 @@ def verify_workspace_vectors(config, slug, test_query="test"):
"""Verify a workspace actually has working vectors by doing a test query."""
result = api_request(config, "post", f"/workspace/{slug}/chat",
json={"message": test_query, "mode": "query"},
timeout=30, retries=1)
timeout=60, retries=5)
if not result:
return False, "API request failed"
return False, "API request failed after 5 retries"
if result.get("error"):
return False, result["error"]
@@ -271,10 +284,41 @@ def verify_workspace_vectors(config, slug, test_query="test"):
if sources:
return True, f"{len(sources)} sources found"
else:
# No sources could mean no relevant docs or embedding failure
return False, "no sources returned"
def verify_lancedb_physical(slug):
    """Check that LanceDB actually has data files on disk for this workspace.

    Scans the LanceDB storage directory for a ``*.lance`` table directory
    whose name contains *slug* (AnythingLLM names tables after the workspace
    slug — presumably ``<slug>.lance``; confirm against the storage layout).

    Returns:
        tuple[bool, str]: ``(True, "<size>KB, <n> files")`` when a matching
        table directory exists, else ``(False, "no .lance directory found")``
        — the same message covers both a missing storage root and no match.
    """
    lancedb_path = ANYTHINGLLM_STORAGE / "lancedb"
    if not lancedb_path.exists():
        return False, "no .lance directory found"
    for d in lancedb_path.iterdir():
        if d.is_dir() and slug in d.name and d.name.endswith(".lance"):
            # Walk the tree once and collect file sizes, instead of two
            # separate rglob passes (one for size, one for count).
            sizes = [f.stat().st_size for f in d.rglob("*") if f.is_file()]
            return True, f"{sum(sizes) / 1024:.0f}KB, {len(sizes)} files"
    return False, "no .lance directory found"
def verify_workspace_docs_api(config, slug):
    """Check workspace document count via the AnythingLLM API.

    Returns:
        int | None: number of documents attached to the workspace, ``0``
        when the payload has no usable document list, or ``None`` when the
        API request itself failed (after api_request's own retries).
    """
    result = api_request(config, "get", f"/workspace/{slug}", timeout=15, retries=3)
    if not result:
        return None
    ws = result.get("workspace", [{}])
    # Some endpoints wrap the workspace object in a one-element list.
    if isinstance(ws, list):
        ws = ws[0] if ws else {}
    if not isinstance(ws, dict):
        # e.g. "workspace": null — count as zero docs instead of raising
        # AttributeError on ws.get below.
        return 0
    docs = ws.get("documents", [])
    return len(docs) if isinstance(docs, list) else 0
def get_lancedb_size(slug):
    """Get current LanceDB size for a workspace slug.

    Returns the total byte count of all files under the first ``*.lance``
    table directory whose name contains *slug*, or 0 when the storage root
    or a matching table is absent.
    """
    root = ANYTHINGLLM_STORAGE / "lancedb"
    if not root.exists():
        return 0
    for entry in root.iterdir():
        is_table = entry.is_dir() and entry.name.endswith(".lance")
        if is_table and slug in entry.name:
            total = 0
            for item in entry.rglob("*"):
                if item.is_file():
                    total += item.stat().st_size
            return total
    return 0
# ──────────────────────────────────────────────────────────
# PDF DETECTION & OCR
# ──────────────────────────────────────────────────────────
@@ -616,6 +660,7 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
persona_ok = 0
persona_fail = 0
consecutive_fails = 0
lance_size_before = get_lancedb_size(slug)
for bs in range(0, len(new_docs), embed_batch):
batch = new_docs[bs:bs + embed_batch]
@@ -624,52 +669,61 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
log.debug(f" {codename} batch {batch_num}/{total_batches} ({len(batch)} docs)")
# Each embed call gets 5 retries (Olla may route to instance without model)
result = api_request(config, "post", f"/workspace/{slug}/update-embeddings",
json={"adds": batch, "deletes": []},
timeout=300, retries=3)
timeout=300, retries=5)
if not result:
persona_fail += len(batch)
consecutive_fails += 1
log.error(f"{codename} batch {batch_num}/{total_batches}: API FAILED")
if consecutive_fails >= 3:
log.error(f"{codename}: 3 consecutive failures, skipping remaining")
log.error(f"{codename} batch {batch_num}/{total_batches}: "
f"FAILED after 5 retries")
if consecutive_fails >= 5:
log.error(f"{codename}: 5 consecutive failures, skipping")
break
time.sleep(delay)
continue
# Check for embedding errors in the response
ws_data = result.get("workspace", {})
embed_error = result.get("error")
if embed_error:
persona_fail += len(batch)
consecutive_fails += 1
log.error(f"{codename} batch {batch_num}/{total_batches}: {embed_error}")
if "not found" in str(embed_error).lower():
log.error(f" ✗ ABORTING: Embedding model error — fix config and retry")
save_progress(progress)
return
if consecutive_fails >= 3:
log.error(f"{codename}: 3 consecutive failures, skipping remaining")
break
time.sleep(delay)
continue
# Verify first batch actually created vectors (one-time per persona)
# Verify first batch: check LanceDB physically grew + vectors searchable
if batch_num == 1:
time.sleep(2)
vec_ok, vec_msg = verify_workspace_vectors(config, slug)
if not vec_ok:
log.error(f"{codename}: first batch embed returned 200 but "
f"vectors NOT searchable: {vec_msg}")
if "not found" in vec_msg.lower() or "failed to embed" in vec_msg.lower():
log.error(f"ABORTING: Embedding model broken")
save_progress(progress)
return
# might be "no sources" for unrelated query — continue but warn
log.warning(f"{codename}: continuing despite verification warning")
time.sleep(3)
lance_size_after = get_lancedb_size(slug)
lance_grew = lance_size_after > lance_size_before
lance_ok, lance_msg = verify_lancedb_physical(slug)
if not lance_ok or not lance_grew:
log.error(f"{codename}: API returned 200 but LanceDB did NOT grow "
f"(before={lance_size_before}, after={lance_size_after})")
# Try query verification as second check
vec_ok, vec_msg = verify_workspace_vectors(config, slug)
if not vec_ok:
log.error(f"{codename}: query verification also failed: {vec_msg}")
if "not found" in vec_msg.lower() or "failed to embed" in vec_msg.lower():
log.error(f" ✗ ABORTING: Embedding model broken — "
f"vectors NOT being created")
save_progress(progress)
return
else:
log.warning(f"{codename}: LanceDB didn't grow but query works — "
f"continuing cautiously")
else:
log.info(f"{codename}: first batch verified — vectors searchable")
log.info(f"{codename}: first batch VERIFIED — "
f"LanceDB {lance_msg}, grew {lance_size_after - lance_size_before} bytes")
lance_size_before = lance_size_after
# Periodic LanceDB growth check (every 10 batches)
elif batch_num % 10 == 0:
lance_size_now = get_lancedb_size(slug)
if lance_size_now <= lance_size_before:
log.warning(f"{codename} batch {batch_num}: LanceDB NOT growing "
f"({lance_size_before} → {lance_size_now})")
consecutive_fails += 1
else:
growth = lance_size_now - lance_size_before
log.debug(f" {codename} batch {batch_num}: LanceDB +{growth} bytes")
lance_size_before = lance_size_now
consecutive_fails = 0
progress.setdefault("workspace_docs", {}).setdefault(codename, []).extend(batch)
persona_ok += len(batch)
@@ -677,12 +731,34 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
log.info(f"{codename} batch {batch_num}/{total_batches}: "
f"{len(batch)} embedded ({persona_ok}/{len(new_docs)})")
# Save after every batch
save_progress(progress)
if bs + embed_batch < len(new_docs):
time.sleep(delay)
# Final triple verification for this persona
if persona_ok > 0:
lance_ok, lance_msg = verify_lancedb_physical(slug)
ws_doc_count = verify_workspace_docs_api(config, slug)
vec_ok, vec_msg = verify_workspace_vectors(config, slug, test_query="bilgi")
checks = []
if lance_ok:
checks.append(f"LanceDB={lance_msg}")
else:
checks.append(f"LanceDB=FAIL({lance_msg})")
if ws_doc_count is not None:
checks.append(f"WorkspaceDocs={ws_doc_count}")
if vec_ok:
checks.append(f"Search={vec_msg}")
else:
checks.append(f"Search=FAIL({vec_msg})")
all_ok = lance_ok and vec_ok
icon = "✓" if all_ok else "⚠"
level = "info" if all_ok else "warning"
getattr(log, level)(f" {icon} {codename} FINAL: {' | '.join(checks)}")
total_embedded += persona_ok
total_failed += persona_fail