Add robust embedding verification — no silent failures

- Pre-flight: test embedding model with 3 retries (120s timeout for cold start)
- First-batch verify: after batch 1, query workspace to confirm vectors searchable
- Abort on model errors: "not found" or "failed to embed" stops immediately
- Consecutive failure guard: 3 fails in a row → skip persona, continue others
- Response error check: API 200 but embed error in body → caught and logged
- Never record progress for failed embeds

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
salvacybersec
2026-04-07 01:26:41 +03:00
parent fd29c0efb6
commit 3d4654f454

151
setup.py
View File

@@ -204,6 +204,77 @@ def get_existing_workspaces(config):
return {}
# ──────────────────────────────────────────────────────────
# EMBEDDING HEALTH CHECKS
# ──────────────────────────────────────────────────────────
def verify_embedding_model(config):
    """Test that the configured embedding model actually works via Olla/Ollama.

    Reads EMBEDDING_BASE_PATH and EMBEDDING_MODEL_PREF from the AnythingLLM
    .env file, then POSTs a short health-check input to the `/api/embed`
    endpoint with up to 3 attempts (long timeout: the first call may have to
    load the model into GPU memory).

    Returns:
        bool: True when the server returns a non-empty ``embeddings`` list,
        False when config is missing or all attempts fail.
    """
    embedding_path = ""
    model = ""
    # Read from AnythingLLM .env
    env_file = ANYTHINGLLM_STORAGE / ".env"
    if env_file.exists():
        for line in env_file.read_text().splitlines():
            if line.startswith("EMBEDDING_BASE_PATH="):
                embedding_path = line.split("=", 1)[1].strip("'\"")
            elif line.startswith("EMBEDDING_MODEL_PREF="):
                model = line.split("=", 1)[1].strip("'\"")
    if not embedding_path or not model:
        log.warning(f"Cannot determine embedding config (path={embedding_path}, model={model})")
        return False
    log.info(f"Testing embedding model: {model} via {embedding_path}")
    for attempt in range(3):
        try:
            log.debug(f" Embed test attempt {attempt + 1}/3")
            resp = requests.post(
                f"{embedding_path}/api/embed",
                json={"model": model, "input": "embedding health check test"},
                timeout=120,  # first call may need to load model into GPU
            )
            # May raise ValueError on a non-JSON body; caught by the broad
            # except below and treated as a failed attempt.
            data = resp.json()
            if "embeddings" in data and len(data["embeddings"]) > 0:
                dims = len(data["embeddings"][0])
                log.info(f" ✓ Embedding model OK: {model} ({dims}d)")
                return True
            elif "error" in data:
                log.warning(f" Attempt {attempt + 1}: {data['error']}")
                if attempt < 2:
                    time.sleep(5)
            else:
                # Fix: previously a payload with neither "embeddings" nor
                # "error" retried immediately with no log and no back-off.
                log.warning(f" Attempt {attempt + 1}: unexpected response "
                            f"(HTTP {resp.status_code}): {data}")
                if attempt < 2:
                    time.sleep(5)
        except requests.exceptions.Timeout:
            log.warning(f" Attempt {attempt + 1}: timeout (model loading?)")
            if attempt < 2:
                time.sleep(10)
        except Exception as e:
            log.warning(f" Attempt {attempt + 1}: {e}")
            if attempt < 2:
                time.sleep(5)
    # Plain string: the original used an f-string with no placeholders.
    log.error(" ✗ Embedding model FAILED after 3 attempts")
    return False
def verify_workspace_vectors(config, slug, test_query="test"):
    """Verify a workspace actually has working vectors by doing a test query.

    Sends a query-mode chat to the workspace and inspects the response.

    Returns:
        tuple[bool, str]: (ok, detail) — ok is True only when the chat
        response lists at least one source document.
    """
    result = api_request(config, "post", f"/workspace/{slug}/chat",
                         json={"message": test_query, "mode": "query"},
                         timeout=30, retries=1)
    # Guard clauses: transport failure, then an API-level error payload.
    if not result:
        return False, "API request failed"
    err = result.get("error")
    if err:
        return False, err
    sources = result.get("sources", [])
    if not sources:
        # No sources could mean no relevant docs or embedding failure
        return False, "no sources returned"
    return True, f"{len(sources)} sources found"
# ──────────────────────────────────────────────────────────
# PDF DETECTION & OCR
# ──────────────────────────────────────────────────────────
@@ -493,6 +564,12 @@ def upload_file_batch(config, folder_name, files, progress, batch_size, delay):
def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
"""Phase C2: assign uploaded docs to persona workspaces."""
log.info("── Assigning to workspaces ──")
# Pre-flight: verify embedding model works
if not verify_embedding_model(config):
log.error("ABORTING: Embedding model is not working. Fix model config and retry.")
return
existing_ws = get_existing_workspaces(config)
if not existing_ws:
@@ -502,6 +579,8 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
total_personas = len(persona_folders)
total_embedded = 0
total_failed = 0
verified_workspaces = 0
failed_verification = []
for idx, (codename, folders) in enumerate(sorted(persona_folders.items()), 1):
ws_name = config["workspaces"][codename]["name"]
@@ -536,6 +615,7 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
embed_batch = min(batch_size, 5)
persona_ok = 0
persona_fail = 0
consecutive_fails = 0
for bs in range(0, len(new_docs), embed_batch):
batch = new_docs[bs:bs + embed_batch]
@@ -547,14 +627,55 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
result = api_request(config, "post", f"/workspace/{slug}/update-embeddings",
json={"adds": batch, "deletes": []},
timeout=300, retries=3)
if result:
progress.setdefault("workspace_docs", {}).setdefault(codename, []).extend(batch)
persona_ok += len(batch)
log.info(f"{codename} batch {batch_num}/{total_batches}: "
f"{len(batch)} embedded ({persona_ok}/{len(new_docs)})")
else:
if not result:
persona_fail += len(batch)
log.error(f"{codename} batch {batch_num}/{total_batches}: FAILED")
consecutive_fails += 1
log.error(f"{codename} batch {batch_num}/{total_batches}: API FAILED")
if consecutive_fails >= 3:
log.error(f"{codename}: 3 consecutive failures, skipping remaining")
break
time.sleep(delay)
continue
# Check for embedding errors in the response
ws_data = result.get("workspace", {})
embed_error = result.get("error")
if embed_error:
persona_fail += len(batch)
consecutive_fails += 1
log.error(f"{codename} batch {batch_num}/{total_batches}: {embed_error}")
if "not found" in str(embed_error).lower():
log.error(f" ✗ ABORTING: Embedding model error — fix config and retry")
save_progress(progress)
return
if consecutive_fails >= 3:
log.error(f"{codename}: 3 consecutive failures, skipping remaining")
break
time.sleep(delay)
continue
# Verify first batch actually created vectors (one-time per persona)
if batch_num == 1:
time.sleep(2)
vec_ok, vec_msg = verify_workspace_vectors(config, slug)
if not vec_ok:
log.error(f"{codename}: first batch embed returned 200 but "
f"vectors NOT searchable: {vec_msg}")
if "not found" in vec_msg.lower() or "failed to embed" in vec_msg.lower():
log.error(f" ✗ ABORTING: Embedding model broken")
save_progress(progress)
return
# might be "no sources" for unrelated query — continue but warn
log.warning(f"{codename}: continuing despite verification warning")
else:
log.info(f"{codename}: first batch verified — vectors searchable")
progress.setdefault("workspace_docs", {}).setdefault(codename, []).extend(batch)
persona_ok += len(batch)
consecutive_fails = 0
log.info(f"{codename} batch {batch_num}/{total_batches}: "
f"{len(batch)} embedded ({persona_ok}/{len(new_docs)})")
# Save after every batch
save_progress(progress)
@@ -564,9 +685,21 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
total_embedded += persona_ok
total_failed += persona_fail
log.info(f" {codename} done: {persona_ok} ok, {persona_fail} failed")
log.info(f"── Assignment complete: {total_embedded} embedded, {total_failed} failed ──")
if persona_ok > 0:
verified_workspaces += 1
log.info(f" {codename} done: {persona_ok} ok, {persona_fail} failed ✓")
elif persona_fail > 0:
failed_verification.append(codename)
log.error(f" {codename} done: {persona_ok} ok, {persona_fail} FAILED ✗")
else:
log.info(f" {codename} done: no docs")
log.info(f"── Assignment complete ──")
log.info(f" Embedded: {total_embedded}, Failed: {total_failed}")
log.info(f" Verified: {verified_workspaces}/{total_personas}")
if failed_verification:
log.error(f" FAILED verification: {', '.join(failed_verification)}")
def upload_documents(config, persona_list=None, priority_filter=None,