diff --git a/setup.py b/setup.py index 80cdf61..52f4ce6 100644 --- a/setup.py +++ b/setup.py @@ -116,7 +116,7 @@ def save_progress(progress): # API # ────────────────────────────────────────────────────────── -def api_request(config, method, endpoint, timeout=120, retries=3, **kwargs): +def api_request(config, method, endpoint, timeout=120, retries=5, **kwargs): url = f"{config['anythingllm']['base_url']}{endpoint}" headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"} if "json" in kwargs: @@ -124,27 +124,40 @@ def api_request(config, method, endpoint, timeout=120, retries=3, **kwargs): for attempt in range(retries): try: - log.debug(f"API {method.upper()} {endpoint} (attempt {attempt+1})") + log.debug(f"API {method.upper()} {endpoint} (attempt {attempt+1}/{retries})") resp = getattr(requests, method)(url, headers=headers, timeout=timeout, **kwargs) if resp.status_code not in (200, 201): - log.error(f"API {resp.status_code}: {resp.text[:300]}") + log.warning(f"API {resp.status_code} on attempt {attempt+1}: {resp.text[:200]}") if attempt < retries - 1: - time.sleep(3) + time.sleep(3 + attempt * 2) continue return None - log.debug(f"API {method.upper()} {endpoint} → {resp.status_code}") - return resp.json() + data = resp.json() + # Check for embedded error in response body (API returns 200 but embed failed) + if data.get("error"): + err = data["error"] + log.warning(f"API 200 but body error on attempt {attempt+1}: {err}") + if attempt < retries - 1: + time.sleep(3 + attempt * 2) + continue + return None + log.debug(f"API {method.upper()} {endpoint} → OK") + return data except requests.exceptions.Timeout: - log.warning(f"API timeout ({timeout}s) on {endpoint} (attempt {attempt+1}/{retries})") + log.warning(f"API timeout ({timeout}s) attempt {attempt+1}/{retries}") if attempt < retries - 1: - time.sleep(5) + time.sleep(5 + attempt * 3) except requests.exceptions.ConnectionError as e: - log.error(f"API connection error: {e}") + log.warning(f"API connection error attempt {attempt+1}/{retries}: {e}") if attempt < retries - 1: - time.sleep(5) + time.sleep(5 + attempt * 3) except Exception as e: log.error(f"API unexpected error: {e}") + if attempt < retries - 1: + time.sleep(3) + continue return None + log.error(f"API {method.upper()} {endpoint} FAILED after {retries} attempts") return None @@ -260,9 +273,9 @@ def verify_workspace_vectors(config, slug, test_query="test"): """Verify a workspace actually has working vectors by doing a test query.""" result = api_request(config, "post", f"/workspace/{slug}/chat", json={"message": test_query, "mode": "query"}, - timeout=30, retries=1) + timeout=60, retries=5) if not result: - return False, "API request failed" + return False, "API request failed after 5 retries" if result.get("error"): return False, result["error"] @@ -271,10 +284,41 @@ def verify_workspace_vectors(config, slug, test_query="test"): if sources: return True, f"{len(sources)} sources found" else: - # No sources could mean no relevant docs or embedding failure return False, "no sources returned" +def verify_lancedb_physical(slug): + """Check that LanceDB actually has data files for this workspace.""" + lancedb_path = ANYTHINGLLM_STORAGE / "lancedb" + for d in lancedb_path.iterdir() if lancedb_path.exists() else []: + if d.is_dir() and slug in d.name and d.name.endswith(".lance"): + size = sum(f.stat().st_size for f in d.rglob("*") if f.is_file()) + file_count = sum(1 for f in d.rglob("*") if f.is_file()) + return True, f"{size / 1024:.0f}KB, {file_count} files" + return False, "no .lance directory found" + + +def verify_workspace_docs_api(config, slug): + """Check workspace document count via AnythingLLM API.""" + result = api_request(config, "get", f"/workspace/{slug}", timeout=15, retries=3) + if not result: + return None + ws = result.get("workspace", [{}]) + if isinstance(ws, list): + ws = ws[0] if ws else {} + docs = ws.get("documents", []) + return len(docs) if isinstance(docs, list) else 0 + + +def get_lancedb_size(slug): + """Get current LanceDB size for a workspace slug.""" + lancedb_path = ANYTHINGLLM_STORAGE / "lancedb" + for d in lancedb_path.iterdir() if lancedb_path.exists() else []: + if d.is_dir() and slug in d.name and d.name.endswith(".lance"): + return sum(f.stat().st_size for f in d.rglob("*") if f.is_file()) + return 0 + + # ────────────────────────────────────────────────────────── # PDF DETECTION & OCR # ────────────────────────────────────────────────────────── @@ -616,6 +660,7 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay): persona_ok = 0 persona_fail = 0 consecutive_fails = 0 + lance_size_before = get_lancedb_size(slug) for bs in range(0, len(new_docs), embed_batch): batch = new_docs[bs:bs + embed_batch] @@ -624,52 +669,61 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay): log.debug(f" {codename} batch {batch_num}/{total_batches} ({len(batch)} docs)") + # Each embed call gets 5 retries (Olla may route to instance without model) result = api_request(config, "post", f"/workspace/{slug}/update-embeddings", json={"adds": batch, "deletes": []}, - timeout=300, retries=3) + timeout=300, retries=5) if not result: persona_fail += len(batch) consecutive_fails += 1 - log.error(f" ✗ {codename} batch {batch_num}/{total_batches}: API FAILED") - if consecutive_fails >= 3: - log.error(f" ✗ {codename}: 3 consecutive failures, skipping remaining") + log.error(f" ✗ {codename} batch {batch_num}/{total_batches}: " + f"FAILED after 5 retries") + if consecutive_fails >= 5: + log.error(f" ✗ {codename}: 5 consecutive failures, skipping") break time.sleep(delay) continue - # Check for embedding errors in the response - ws_data = result.get("workspace", {}) - embed_error = result.get("error") - if embed_error: - persona_fail += len(batch) - consecutive_fails += 1 - log.error(f" ✗ {codename} batch {batch_num}/{total_batches}: {embed_error}") - if "not found" in str(embed_error).lower(): - log.error(f" ✗ ABORTING: Embedding model error — fix config and retry") - save_progress(progress) - return - if consecutive_fails >= 3: - log.error(f" ✗ {codename}: 3 consecutive failures, skipping remaining") - break - time.sleep(delay) - continue - - # Verify first batch actually created vectors (one-time per persona) + # Verify first batch: check LanceDB physically grew + vectors searchable if batch_num == 1: - time.sleep(2) - vec_ok, vec_msg = verify_workspace_vectors(config, slug) - if not vec_ok: - log.error(f" ✗ {codename}: first batch embed returned 200 but " - f"vectors NOT searchable: {vec_msg}") - if "not found" in vec_msg.lower() or "failed to embed" in vec_msg.lower(): - log.error(f" ✗ ABORTING: Embedding model broken") - save_progress(progress) - return - # might be "no sources" for unrelated query — continue but warn - log.warning(f" ⚠ {codename}: continuing despite verification warning") + time.sleep(3) + lance_size_after = get_lancedb_size(slug) + lance_grew = lance_size_after > lance_size_before + lance_ok, lance_msg = verify_lancedb_physical(slug) + + if not lance_ok or not lance_grew: + log.error(f" ✗ {codename}: API returned 200 but LanceDB did NOT grow " + f"(before={lance_size_before}, after={lance_size_after})") + # Try query verification as second check + vec_ok, vec_msg = verify_workspace_vectors(config, slug) + if not vec_ok: + log.error(f" ✗ {codename}: query verification also failed: {vec_msg}") + if "not found" in vec_msg.lower() or "failed to embed" in vec_msg.lower(): + log.error(f" ✗ ABORTING: Embedding model broken — " + f"vectors NOT being created") + save_progress(progress) + return + else: + log.warning(f" ⚠ {codename}: LanceDB didn't grow but query works — " + f"continuing cautiously") else: - log.info(f" ✓ {codename}: first batch verified — vectors searchable") + log.info(f" ✓ {codename}: first batch VERIFIED — " + f"LanceDB {lance_msg}, grew {lance_size_after - lance_size_before} bytes") + lance_size_before = lance_size_after + + # Periodic LanceDB growth check (every 10 batches) + elif batch_num % 10 == 0: + lance_size_now = get_lancedb_size(slug) + if lance_size_now <= lance_size_before: + log.warning(f" ⚠ {codename} batch {batch_num}: LanceDB NOT growing " + f"({lance_size_before} → {lance_size_now})") + consecutive_fails += 1 + else: + growth = lance_size_now - lance_size_before + log.debug(f" {codename} batch {batch_num}: LanceDB +{growth} bytes") + lance_size_before = lance_size_now + consecutive_fails = 0 progress.setdefault("workspace_docs", {}).setdefault(codename, []).extend(batch) persona_ok += len(batch) @@ -677,12 +731,34 @@ def assign_to_workspaces(config, persona_folders, progress, batch_size, delay): log.info(f" ✓ {codename} batch {batch_num}/{total_batches}: " f"{len(batch)} embedded ({persona_ok}/{len(new_docs)})") - # Save after every batch save_progress(progress) if bs + embed_batch < len(new_docs): time.sleep(delay) + # Final triple verification for this persona + if persona_ok > 0: + lance_ok, lance_msg = verify_lancedb_physical(slug) + ws_doc_count = verify_workspace_docs_api(config, slug) + vec_ok, vec_msg = verify_workspace_vectors(config, slug, test_query="bilgi") + + checks = [] + if lance_ok: + checks.append(f"LanceDB={lance_msg}") + else: + checks.append(f"LanceDB=FAIL({lance_msg})") + if ws_doc_count is not None: + checks.append(f"WorkspaceDocs={ws_doc_count}") + if vec_ok: + checks.append(f"Search={vec_msg}") + else: + checks.append(f"Search=FAIL({vec_msg})") + + all_ok = lance_ok and vec_ok + icon = "✓" if all_ok else "⚠" + level = "info" if all_ok else "warning" + getattr(log, level)(f" {icon} {codename} FINAL: {' | '.join(checks)}") + total_embedded += persona_ok total_failed += persona_fail