#!/usr/bin/env python3
"""
AnythingLLM × Persona Library Integration Setup

Three-phase pipeline:
  Phase A: Upload all text-based files
  Phase B: OCR all scanned PDFs in-place
  Phase C: Upload newly OCR'd files + assign to workspaces

Usage:
  python3 setup.py --storage-setup        # Symlink direct-uploads/ to HDD
  python3 setup.py --create-workspaces    # Create workspaces + load persona prompts
  python3 setup.py --upload-documents     # Full pipeline: upload → OCR → upload → assign
  python3 setup.py --reassign             # Re-assign existing docs to workspaces (no scan/upload)
  python3 setup.py --reassign --reset     # Reset assignment tracking + re-assign all
  python3 setup.py --persona frodo        # Single persona
  python3 setup.py --cluster intel        # Intel cluster (frodo, echo, ghost, oracle, wraith, scribe, polyglot)
  python3 setup.py --cluster cyber        # Cyber cluster
  python3 setup.py --cluster military     # Military cluster
  python3 setup.py --cluster humanities   # Humanities cluster
  python3 setup.py --cluster engineering  # Engineering cluster
  python3 setup.py --priority 1           # Only priority 1 (core) folders
  python3 setup.py --dry-run              # Preview
  python3 setup.py --status               # Show state
"""

import argparse
import json
import logging
import os
import shutil
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path

import yaml

try:
    import requests
except ImportError:
    print("pip install requests pyyaml")
    sys.exit(1)

CONFIG_PATH = Path(__file__).parent / "config.yaml"
PROGRESS_PATH = Path(__file__).parent / "upload_progress.json"
LOG_PATH = Path(__file__).parent / "setup.log"
ANYTHINGLLM_STORAGE = Path.home() / ".config/anythingllm-desktop/storage"
SKIP_EXT = set()  # populated from config.yaml in load_config()

# ──────────────────────────────────────────────────────────
# LOGGING
# ──────────────────────────────────────────────────────────

log = logging.getLogger("anythingllm")


def setup_logging(verbose=False):
    log.setLevel(logging.DEBUG)
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
    # File handler — always debug level
    fh = logging.FileHandler(LOG_PATH, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(fmt)
    log.addHandler(fh)
    # Console handler — info or debug based on --verbose
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG if verbose else logging.INFO)
    ch.setFormatter(fmt)
    log.addHandler(ch)
    # Reopen both streams line-buffered so every record is flushed promptly
    # (critical for background/piped execution)
    for h in log.handlers:
        if hasattr(h, "stream"):
            h.stream = os.fdopen(os.dup(h.stream.fileno()), "w", buffering=1)


def log_print(msg, level="info"):
    """Log and print (backward compat for functions that still use print)."""
    getattr(log, level)(msg)


CLUSTERS = {
    "intel": ["frodo", "echo", "ghost", "oracle", "wraith", "scribe", "polyglot"],
    "cyber": ["neo", "bastion", "sentinel", "specter", "phantom", "cipher", "vortex"],
    "military": ["marshal", "centurion", "corsair", "warden", "medic"],
    "humanities": ["chronos", "tribune", "arbiter", "ledger", "sage", "herald", "scholar", "gambit"],
    "engineering": ["forge", "architect"],
}


def load_config():
    with open(CONFIG_PATH) as f:
        cfg = yaml.safe_load(f)
    global SKIP_EXT
    SKIP_EXT = set(cfg["processing"]["skip_extensions"])
    return cfg


def load_progress():
    if PROGRESS_PATH.exists():
        with open(PROGRESS_PATH) as f:
            return json.load(f)
    return {"uploaded_files": {}, "workspace_docs": {}, "ocr_done": [], "ocr_failed": []}


def save_progress(progress):
    with open(PROGRESS_PATH, "w") as f:
        json.dump(progress, f, indent=2, ensure_ascii=False)
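
# For reference, config.yaml is expected to look roughly like the sketch below.
# All values are illustrative placeholders; only the key names are what the
# code actually reads:
#
#   anythingllm:
#     base_url: "http://localhost:3001/api/v1"   # assumed local desktop API
#     api_key: "..."
#   storage:
#     hdd_storage: "/mnt/hdd/anythingllm"
#     book_library: "/mnt/hdd/library"
#     personas_dir: "/mnt/hdd/personas"
#   processing:
#     batch_size: 20
#     delay_between_batches: 10
#     skip_extensions: [".jpg", ".png", ".mp3", ".zip"]
#   workspaces:
#     frodo:
#       name: "FRODO (OSINT Analyst)"
#       persona_file: "frodo/general.md"
#       folders:
#         - path: "intel/osint"
#           priority: 1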
# ──────────────────────────────────────────────────────────
# API
# ──────────────────────────────────────────────────────────

def api_request(config, method, endpoint, timeout=120, retries=5, **kwargs):
    url = f"{config['anythingllm']['base_url']}{endpoint}"
    headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"}
    if "json" in kwargs:
        headers["Content-Type"] = "application/json"
    for attempt in range(retries):
        try:
            log.debug(f"API {method.upper()} {endpoint} (attempt {attempt+1}/{retries})")
            resp = getattr(requests, method)(url, headers=headers, timeout=timeout, **kwargs)
            if resp.status_code not in (200, 201):
                log.warning(f"API {resp.status_code} on attempt {attempt+1}: {resp.text[:200]}")
                if attempt < retries - 1:
                    time.sleep(3 + attempt * 2)
                    continue
                return None
            data = resp.json()
            # Check for embedded error in response body (API returns 200 but embed failed)
            if data.get("error"):
                err = data["error"]
                log.warning(f"API 200 but body error on attempt {attempt+1}: {err}")
                if attempt < retries - 1:
                    time.sleep(3 + attempt * 2)
                    continue
                return None
            log.debug(f"API {method.upper()} {endpoint} → OK")
            return data
        except requests.exceptions.Timeout:
            log.warning(f"API timeout ({timeout}s) attempt {attempt+1}/{retries}")
            if attempt < retries - 1:
                time.sleep(5 + attempt * 3)
        except requests.exceptions.ConnectionError as e:
            log.warning(f"API connection error attempt {attempt+1}/{retries}: {e}")
            if attempt < retries - 1:
                time.sleep(5 + attempt * 3)
        except Exception as e:
            log.error(f"API unexpected error: {e}")
            if attempt < retries - 1:
                time.sleep(3)
                continue
            return None
    log.error(f"API {method.upper()} {endpoint} FAILED after {retries} attempts")
    return None


def api_upload(config, file_path, folder_name=None):
    endpoint = f"/document/upload/{folder_name}" if folder_name else "/document/upload"
    url = f"{config['anythingllm']['base_url']}{endpoint}"
    headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"}
    size_mb = file_path.stat().st_size / (1024 * 1024)
    timeout = max(120, int(120 + (size_mb / 10) * 30))
    try:
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f, "application/octet-stream")}
            resp = requests.post(url, headers=headers, files=files, timeout=timeout)
        if resp.status_code not in (200, 201):
            return None, resp.text[:200]
        data = resp.json()
        if data.get("success") and data.get("documents"):
            return data["documents"], None
        return None, data.get("error", "Unknown error")
    except requests.exceptions.Timeout:
        return None, f"timeout ({timeout}s)"
    except Exception as e:
        return None, str(e)


def check_api(config):
    try:
        return api_request(config, "get", "/auth") is not None
    except Exception:
        return False


def check_collector_alive():
    try:
        return requests.get("http://127.0.0.1:8888", timeout=3).status_code == 200
    except Exception:
        return False


def wait_for_collector(max_wait=90):
    print("    ⏳ waiting for collector...", end="", flush=True)
    for i in range(max_wait):
        if check_collector_alive():
            print(" ✓")
            return True
        time.sleep(1)
        if i % 10 == 9:
            print(".", end="", flush=True)
    print(" ✗ timeout")
    return False


def get_existing_workspaces(config):
    result = api_request(config, "get", "/workspaces")
    if result and "workspaces" in result:
        return {ws["name"]: ws for ws in result["workspaces"]}
    return {}
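
# Endpoints used in this script (all relative to anythingllm.base_url, with
# Bearer-token auth):
#   GET  /auth                                → API key check (check_api)
#   GET  /workspaces                          → list workspaces
#   GET  /workspace/{slug}                    → workspace details / doc count
#   POST /workspace/new                       → create workspace
#   POST /workspace/{slug}/update             → set system prompt
#   POST /workspace/{slug}/update-embeddings  → add/remove embedded docs
#   POST /workspace/{slug}/chat               → query-mode test chat
#   POST /document/upload[/{folder}]          → upload a file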
# ──────────────────────────────────────────────────────────
# EMBEDDING HEALTH CHECKS
# ──────────────────────────────────────────────────────────

def verify_embedding_model(config):
    """Test that the configured embedding model actually works via Olla/Ollama."""
    embedding_path = ""
    model = ""
    # Read from AnythingLLM .env
    env_file = ANYTHINGLLM_STORAGE / ".env"
    if env_file.exists():
        for line in env_file.read_text().splitlines():
            if line.startswith("EMBEDDING_BASE_PATH="):
                embedding_path = line.split("=", 1)[1].strip("'\"")
            elif line.startswith("EMBEDDING_MODEL_PREF="):
                model = line.split("=", 1)[1].strip("'\"")
    if not embedding_path or not model:
        log.warning(f"Cannot determine embedding config (path={embedding_path}, model={model})")
        return False
    log.info(f"Testing embedding model: {model} via {embedding_path}")
    for attempt in range(3):
        try:
            log.debug(f"  Embed test attempt {attempt + 1}/3")
            resp = requests.post(
                f"{embedding_path}/api/embed",
                json={"model": model, "input": "embedding health check test"},
                timeout=120,  # first call may need to load model into GPU
            )
            data = resp.json()
            if "embeddings" in data and len(data["embeddings"]) > 0:
                dims = len(data["embeddings"][0])
                log.info(f"  ✓ Embedding model OK: {model} ({dims}d)")
                return True
            elif "error" in data:
                log.warning(f"  Attempt {attempt + 1}: {data['error']}")
                if attempt < 2:
                    time.sleep(5)
        except requests.exceptions.Timeout:
            log.warning(f"  Attempt {attempt + 1}: timeout (model loading?)")
            if attempt < 2:
                time.sleep(10)
        except Exception as e:
            log.warning(f"  Attempt {attempt + 1}: {e}")
            if attempt < 2:
                time.sleep(5)
    log.error("  ✗ Embedding model FAILED after 3 attempts")
    return False


def verify_workspace_vectors(config, slug, test_query="test"):
    """Verify a workspace actually has working vectors by doing a test query."""
    result = api_request(config, "post", f"/workspace/{slug}/chat",
                         json={"message": test_query, "mode": "query"},
                         timeout=60, retries=5)
    if not result:
        return False, "API request failed after 5 retries"
    if result.get("error"):
        return False, result["error"]
    sources = result.get("sources", [])
    if sources:
        return True, f"{len(sources)} sources found"
    return False, "no sources returned"


def verify_lancedb_physical(slug):
    """Check that LanceDB actually has data files for this workspace."""
    lancedb_path = ANYTHINGLLM_STORAGE / "lancedb"
    if not lancedb_path.exists():
        return False, "no .lance directory found"
    for d in lancedb_path.iterdir():
        if d.is_dir() and slug in d.name and d.name.endswith(".lance"):
            size = sum(f.stat().st_size for f in d.rglob("*") if f.is_file())
            file_count = sum(1 for f in d.rglob("*") if f.is_file())
            return True, f"{size / 1024:.0f}KB, {file_count} files"
    return False, "no .lance directory found"


def verify_workspace_docs_api(config, slug):
    """Check workspace document count via AnythingLLM API."""
    result = api_request(config, "get", f"/workspace/{slug}", timeout=15, retries=3)
    if not result:
        return None
    ws = result.get("workspace", [{}])
    if isinstance(ws, list):
        ws = ws[0] if ws else {}
    docs = ws.get("documents", [])
    return len(docs) if isinstance(docs, list) else 0


def get_lancedb_size(slug):
    """Get current LanceDB size for a workspace slug."""
    lancedb_path = ANYTHINGLLM_STORAGE / "lancedb"
    if not lancedb_path.exists():
        return 0
    for d in lancedb_path.iterdir():
        if d.is_dir() and slug in d.name and d.name.endswith(".lance"):
            return sum(f.stat().st_size for f in d.rglob("*") if f.is_file())
    return 0
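
# Manual equivalent of verify_embedding_model(), assuming an Ollama-compatible
# server; the host and model name below are illustrative:
#
#   curl -s http://localhost:11434/api/embed \
#        -d '{"model": "nomic-embed-text", "input": "embedding health check test"}'
#
# A healthy response carries a non-empty "embeddings" array; an "error" key
# means the model is missing or failed to load.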
# ──────────────────────────────────────────────────────────
# PDF DETECTION & OCR
# ──────────────────────────────────────────────────────────

def is_scanned_pdf(file_path):
    """Fast scan detection via pdffonts (~0.04s vs pdftotext ~2s).

    No fonts embedded = scanned/image-only PDF."""
    if file_path.suffix.lower() != ".pdf":
        return False
    try:
        proc = subprocess.Popen(
            ["pdffonts", "-l", "3", str(file_path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            stdout, _ = proc.communicate(timeout=3)
            # pdffonts prints a two-line header; any additional line is a font
            # entry, which means the first pages contain real text.
            lines = [ln for ln in stdout.decode(errors="ignore").strip().split("\n") if ln.strip()]
            return len(lines) <= 2
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            return False  # assume text if slow
    except Exception:
        return False


def ocr_pdf(file_path, language="tur+eng", dpi=200):
    """OCR a scanned PDF in-place. Returns True on success."""
    import tempfile

    tmp_fd, tmp_path = tempfile.mkstemp(suffix=".pdf", dir=file_path.parent)
    os.close(tmp_fd)
    tmp_path = Path(tmp_path)
    try:
        result = subprocess.run(
            ["ocrmypdf", "--skip-text", "--rotate-pages", "--deskew",
             "--jobs", "4", "--image-dpi", str(dpi), "-l", language,
             "--output-type", "pdf", "--quiet",
             str(file_path), str(tmp_path)],
            capture_output=True, text=True, timeout=600,
        )
        if result.returncode in (0, 6) and tmp_path.exists() and tmp_path.stat().st_size > 0:
            tmp_path.replace(file_path)
            return True
        tmp_path.unlink(missing_ok=True)
        return False
    except Exception:
        tmp_path.unlink(missing_ok=True)
        return False
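
# Flags used in the ocrmypdf call above:
#   --skip-text     skip pages that already have a text layer
#   --rotate-pages  auto-rotate pages based on detected orientation
#   --deskew        straighten skewed scans before OCR
#   --image-dpi     DPI to assume for images without resolution metadata
# Exit code 6 indicates ocrmypdf found existing text/OCR; the file is then
# already searchable, so it is treated as success here.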
# ──────────────────────────────────────────────────────────
# STEP 1: Storage offload
# ──────────────────────────────────────────────────────────

def storage_setup(config, dry_run=False):
    hdd_base = Path(config["storage"]["hdd_storage"])
    src = ANYTHINGLLM_STORAGE / "direct-uploads"
    dst = hdd_base / "direct-uploads"
    print("═══ Storage: direct-uploads/ → HDD ═══\n")
    if src.is_symlink():
        print(f"  ✓ already symlinked → {src.resolve()}")
        return
    if dry_run:
        print(f"  → would move {src} → {dst} and symlink back\n")
        return
    dst.mkdir(parents=True, exist_ok=True)
    if src.exists():
        shutil.copytree(str(src), str(dst), dirs_exist_ok=True)
        shutil.rmtree(src)
    src.symlink_to(dst)
    print("  ✓ done\n")


# ──────────────────────────────────────────────────────────
# STEP 2: Create workspaces + load prompts
# ──────────────────────────────────────────────────────────

def extract_system_prompt(config, persona_file):
    personas_dir = Path(config["storage"]["personas_dir"])
    fp = personas_dir / persona_file
    if not fp.exists():
        alt = personas_dir / persona_file.replace("/general.md", "") / "_meta.yaml"
        fp = alt if alt.exists() else None
    if not fp:
        return None
    content = fp.read_text(encoding="utf-8")
    if fp.suffix == ".yaml":
        meta = yaml.safe_load(content)
        return meta.get("system_prompt", meta.get("description", ""))
    parts = content.split("---")
    if len(parts) >= 3:
        try:
            fm = yaml.safe_load(parts[1]) or {}  # guard: empty frontmatter parses to None
        except yaml.YAMLError:
            fm = {}
        tone = fm.get("tone", "")
        body = "---".join(parts[2:]).strip()
        return f"Tone: {tone}\n\n{body}" if tone else body
    return content
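
# extract_system_prompt() expects persona markdown with YAML frontmatter (or a
# _meta.yaml with a system_prompt/description key as fallback). An illustrative
# frodo/general.md:
#
#   ---
#   tone: terse, analytical
#   ---
#   You are FRODO, an OSINT analyst. ...
#
# which becomes "Tone: terse, analytical" plus the body as the workspace prompt.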
def create_workspaces(config, persona_list=None, dry_run=False):
    print("═══ Creating Workspaces ═══\n")
    if not config["anythingllm"]["api_key"]:
        print("  ✗ API key not set!")
        return
    existing = get_existing_workspaces(config)
    created = skipped = 0
    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        name = ws_config["name"]
        persona_file = ws_config.get("persona_file", "")
        system_prompt = extract_system_prompt(config, persona_file) if persona_file else ""
        if name in existing:
            # Update prompt if workspace exists
            slug = existing[name].get("slug", "?")
            if system_prompt and not dry_run:
                api_request(config, "post", f"/workspace/{slug}/update",
                            json={"openAiPrompt": system_prompt})
            print(f"  ✓ {codename} (prompt: {len(system_prompt or '')} chars)")
            skipped += 1
            continue
        print(f"  → {codename}: creating '{name}'")
        if not dry_run:
            result = api_request(config, "post", "/workspace/new", json={"name": name})
            if result:
                slug = result.get("workspace", {}).get("slug", "?")
                if system_prompt:
                    api_request(config, "post", f"/workspace/{slug}/update",
                                json={"openAiPrompt": system_prompt})
                print(f"    ✓ created + prompt ({len(system_prompt)} chars)")
                created += 1
            else:
                print("    ✗ failed")
    print(f"\n  Created: {created}, Updated: {skipped}\n")


# ──────────────────────────────────────────────────────────
# STEP 3: Three-phase upload pipeline
# ──────────────────────────────────────────────────────────

def collect_all_files(config, book_library, persona_list=None, priority_filter=None, max_size_mb=100):
    """Scan all folders and classify files as text-based or scanned."""
    text_files = {}       # folder_name → [paths]
    scanned_files = {}    # folder_name → [paths]
    persona_folders = {}  # codename → [folder_names]
    folder_to_path = {}   # folder_name → source path
    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        persona_folders[codename] = []
        for entry in ws_config.get("folders", []):
            folder_path = entry["path"]
            priority = entry.get("priority", 2)
            if priority_filter and priority > priority_filter:
                continue
            folder_name = folder_path.replace("/", "_")
            persona_folders[codename].append(folder_name)
            if folder_name in text_files:
                continue  # already scanned
            src = book_library / folder_path
            folder_to_path[folder_name] = src
            text_files[folder_name] = []
            scanned_files[folder_name] = []
            if not src.exists():
                print(f"  ⚠ {folder_path} not found")
                continue
            # Use find with -printf for speed on HDD (one external process
            # instead of a per-file stat() call from Python)
            try:
                find_result = subprocess.run(
                    ["find", str(src), "-type", "f", "-not", "-empty",
                     "-printf", "%s %p\n"],
                    capture_output=True, text=True, timeout=120,
                )
                all_files = []
                max_bytes = (max_size_mb or 9999) * 1024 * 1024
                for line in find_result.stdout.strip().split("\n"):
                    if not line:
                        continue
                    parts = line.split(" ", 1)
                    if len(parts) != 2:
                        continue
                    size, path = int(parts[0]), Path(parts[1])
                    if path.suffix.lower() not in SKIP_EXT and size <= max_bytes:
                        all_files.append(path)
                all_files.sort()
            except Exception:
                all_files = sorted(
                    f for f in src.rglob("*")
                    if f.is_file()
                    and f.suffix.lower() not in SKIP_EXT
                    and f.stat().st_size > 0
                    and (not max_size_mb or f.stat().st_size / (1024 * 1024) <= max_size_mb)
                )
            print(f"  {folder_name}: {len(all_files)} files found", flush=True)
            # Classify every file (pdffonts is fast: ~0.04s per file)
            for i, f in enumerate(all_files):
                if is_scanned_pdf(f):
                    scanned_files[folder_name].append(f)
                else:
                    text_files[folder_name].append(f)
                if (i + 1) % 500 == 0:
                    print(f"    {folder_name}: {i+1}/{len(all_files)} classified...", flush=True)
            t = len(text_files[folder_name])
            s = len(scanned_files[folder_name])
            print(f"  {folder_name}: {t} text, {s} scanned")
    return text_files, scanned_files, persona_folders, folder_to_path


def upload_file_batch(config, folder_name, files, progress, batch_size, delay):
    """Upload a list of files to a folder.

    Returns (uploaded, failed) counts."""
    uploaded = failed = 0
    new_files = [f for f in files if str(f) not in progress["uploaded_files"]]
    if not new_files:
        return 0, 0
    print(f"  → {folder_name}: {len(new_files)} files")
    for i, fp in enumerate(new_files):
        if uploaded > 0 and uploaded % batch_size == 0:
            print(f"    ⏸ batch pause ({delay}s)...")
            time.sleep(delay)
        size_mb = fp.stat().st_size / (1024 * 1024)
        print(f"    [{i+1}/{len(new_files)}] {fp.name} ({size_mb:.1f}MB)", end="", flush=True)
        if not check_collector_alive():
            print("  ⚠ collector down")
            if not wait_for_collector():
                print("  ✗ stopping — restart AnythingLLM and --resume")
                save_progress(progress)
                return uploaded, failed
        docs, error = None, None
        for attempt in range(3):
            docs, error = api_upload(config, fp, folder_name=folder_name)
            if docs:
                break
            if error and "not online" in str(error):
                print(" ↻", end="", flush=True)
                time.sleep(5)
                if not wait_for_collector():
                    save_progress(progress)
                    return uploaded, failed
            elif attempt < 2:
                time.sleep(2)
                print(" ↻", end="", flush=True)
        if docs:
            progress["uploaded_files"][str(fp)] = {
                "location": docs[0].get("location", ""),
                "folder": folder_name,
                "name": docs[0].get("name", ""),
            }
            uploaded += 1
            print(" ✓")
        else:
            failed += 1
            progress.setdefault("failed_files", {})[str(fp)] = {
                "folder": folder_name,
                "error": str(error)[:200],
            }
            print(f" ✗ {error}")
        if uploaded % 10 == 0:
            save_progress(progress)
    save_progress(progress)
    return uploaded, failed
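
# upload_progress.json ends up shaped like this (illustrative paths; the
# "location" values are whatever the upload endpoint returns):
#
#   {
#     "uploaded_files": {
#       "/mnt/hdd/library/intel/osint/report.pdf": {
#         "location": "custom-documents/report.pdf-ab12.json",
#         "folder": "intel_osint",
#         "name": "report.pdf-ab12.json"
#       }
#     },
#     "workspace_docs": {"frodo": ["custom-documents/report.pdf-ab12.json"]},
#     "ocr_done": ["/mnt/hdd/library/intel/osint/scan.pdf"],
#     "ocr_failed": [],
#     "failed_files": {}
#   }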
def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
    """Phase C2: assign uploaded docs to persona workspaces."""
    log.info("── Assigning to workspaces ──")
    # Pre-flight: verify embedding model works
    if not verify_embedding_model(config):
        log.error("ABORTING: Embedding model is not working. Fix model config and retry.")
        return
    existing_ws = get_existing_workspaces(config)
    if not existing_ws:
        log.error("Could not fetch workspaces from API")
        return
    total_personas = len(persona_folders)
    total_embedded = 0
    total_failed = 0
    verified_workspaces = 0
    failed_verification = []
    for idx, (codename, folders) in enumerate(sorted(persona_folders.items()), 1):
        ws_name = config["workspaces"][codename]["name"]
        ws_info = existing_ws.get(ws_name)
        if not ws_info:
            log.warning(f"[{idx}/{total_personas}] {codename}: workspace '{ws_name}' not found, skipping")
            continue
        slug = ws_info["slug"]
        doc_locs = []
        for fn in folders:
            folder_docs = 0
            for fpath, info in progress["uploaded_files"].items():
                if info.get("folder") == fn and info.get("location"):
                    doc_locs.append(info["location"])
                    folder_docs += 1
            if folder_docs > 0:
                log.debug(f"  {codename}/{fn}: {folder_docs} docs")
        already = set(progress.get("workspace_docs", {}).get(codename, []))
        new_docs = [loc for loc in doc_locs if loc not in already]
        if not new_docs:
            if doc_locs:
                log.info(f"[{idx}/{total_personas}] ✓ {codename}: {len(doc_locs)} docs already assigned")
            else:
                log.info(f"[{idx}/{total_personas}] ○ {codename}: no uploaded docs found")
            continue
        log.info(f"[{idx}/{total_personas}] → {codename} ({slug}): {len(new_docs)} docs to embed")
        # Use small batches for embedding — AnythingLLM hangs on large batches
        embed_batch = min(batch_size, 5)
        persona_ok = 0
        persona_fail = 0
        consecutive_fails = 0
        lance_size_before = get_lancedb_size(slug)
        for bs in range(0, len(new_docs), embed_batch):
            batch = new_docs[bs:bs + embed_batch]
            batch_num = bs // embed_batch + 1
            total_batches = (len(new_docs) + embed_batch - 1) // embed_batch
            log.debug(f"  {codename} batch {batch_num}/{total_batches} ({len(batch)} docs)")
            # Each embed call gets 5 retries (Olla may route to an instance
            # without the model)
            result = api_request(config, "post", f"/workspace/{slug}/update-embeddings",
                                 json={"adds": batch, "deletes": []},
                                 timeout=300, retries=5)
            if not result:
                persona_fail += len(batch)
                consecutive_fails += 1
                log.error(f"  ✗ {codename} batch {batch_num}/{total_batches}: "
                          f"FAILED after 5 retries")
                if consecutive_fails >= 5:
                    log.error(f"  ✗ {codename}: 5 consecutive failures, skipping")
                    break
                time.sleep(delay)
                continue
            # Verify first batch: check LanceDB physically grew + vectors searchable
            if batch_num == 1:
                time.sleep(3)
                lance_size_after = get_lancedb_size(slug)
                lance_grew = lance_size_after > lance_size_before
                lance_ok, lance_msg = verify_lancedb_physical(slug)
                if not lance_ok or not lance_grew:
                    log.error(f"  ✗ {codename}: API returned 200 but LanceDB did NOT grow "
                              f"(before={lance_size_before}, after={lance_size_after})")
                    # Try query verification as second check
                    vec_ok, vec_msg = verify_workspace_vectors(config, slug)
                    if not vec_ok:
                        log.error(f"  ✗ {codename}: query verification also failed: {vec_msg}")
                        if "not found" in vec_msg.lower() or "failed to embed" in vec_msg.lower():
                            log.error(f"  ✗ ABORTING: Embedding model broken — "
                                      f"vectors NOT being created")
                            save_progress(progress)
                            return
                    else:
                        log.warning(f"  ⚠ {codename}: LanceDB didn't grow but query works — "
                                    f"continuing cautiously")
                else:
                    log.info(f"  ✓ {codename}: first batch VERIFIED — "
                             f"LanceDB {lance_msg}, grew {lance_size_after - lance_size_before} bytes")
                lance_size_before = lance_size_after
                consecutive_fails = 0
            # Periodic LanceDB growth check (every 10 batches). A stalled check
            # keeps the failure streak alive; plain successes reset it.
            elif batch_num % 10 == 0:
                lance_size_now = get_lancedb_size(slug)
                if lance_size_now <= lance_size_before:
                    log.warning(f"  ⚠ {codename} batch {batch_num}: LanceDB NOT growing "
                                f"({lance_size_before} → {lance_size_now})")
                    consecutive_fails += 1
                else:
                    growth = lance_size_now - lance_size_before
                    log.debug(f"  {codename} batch {batch_num}: LanceDB +{growth} bytes")
                    lance_size_before = lance_size_now
                    consecutive_fails = 0
            else:
                consecutive_fails = 0
            progress.setdefault("workspace_docs", {}).setdefault(codename, []).extend(batch)
            persona_ok += len(batch)
            log.info(f"  ✓ {codename} batch {batch_num}/{total_batches}: "
                     f"{len(batch)} embedded ({persona_ok}/{len(new_docs)})")
            save_progress(progress)
            if bs + embed_batch < len(new_docs):
                time.sleep(delay)
        # Final triple verification for this persona
        if persona_ok > 0:
            lance_ok, lance_msg = verify_lancedb_physical(slug)
            ws_doc_count = verify_workspace_docs_api(config, slug)
            vec_ok, vec_msg = verify_workspace_vectors(config, slug, test_query="bilgi")
            checks = []
            if lance_ok:
                checks.append(f"LanceDB={lance_msg}")
            else:
                checks.append(f"LanceDB=FAIL({lance_msg})")
            if ws_doc_count is not None:
                checks.append(f"WorkspaceDocs={ws_doc_count}")
            if vec_ok:
                checks.append(f"Search={vec_msg}")
            else:
                checks.append(f"Search=FAIL({vec_msg})")
            all_ok = lance_ok and vec_ok
            icon = "✓" if all_ok else "⚠"
            level = "info" if all_ok else "warning"
            getattr(log, level)(f"  {icon} {codename} FINAL: {' | '.join(checks)}")
        total_embedded += persona_ok
        total_failed += persona_fail
        if persona_ok > 0:
            verified_workspaces += 1
            log.info(f"  {codename} done: {persona_ok} ok, {persona_fail} failed ✓")
        elif persona_fail > 0:
            failed_verification.append(codename)
            log.error(f"  {codename} done: {persona_ok} ok, {persona_fail} FAILED ✗")
        else:
            log.info(f"  {codename} done: no docs")
    log.info("── Assignment complete ──")
    log.info(f"  Embedded: {total_embedded}, Failed: {total_failed}")
    log.info(f"  Verified: {verified_workspaces}/{total_personas}")
    if failed_verification:
        log.error(f"  FAILED verification: {', '.join(failed_verification)}")
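
# Manual spot-checks when a workspace fails verification (illustrative; set
# $BASE_URL and $API_KEY from config.yaml, storage path per ANYTHINGLLM_STORAGE):
#
#   curl -s -H "Authorization: Bearer $API_KEY" "$BASE_URL/workspace/frodo" \
#        | python3 -m json.tool
#   du -sh ~/.config/anythingllm-desktop/storage/lancedb/*.lance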
def upload_documents(config, persona_list=None, priority_filter=None, dry_run=False,
                     resume=False, max_size_mb=100):
    """Three-phase pipeline: text upload → OCR → OCR upload → assign."""
    print("═══ Upload Pipeline ═══\n")
    if not check_api(config):
        print("  ✗ AnythingLLM API not reachable.")
        return
    book_library = Path(config["storage"]["book_library"])
    batch_size = config["processing"]["batch_size"]
    delay = config["processing"]["delay_between_batches"]
    progress = load_progress() if resume else {
        "uploaded_files": {},
        "workspace_docs": {},
        "ocr_done": [],
        "ocr_failed": [],
    }
    # Scan & classify
    print("  Scanning folders...\n")
    text_files, scanned_files, persona_folders, _ = collect_all_files(
        config, book_library, persona_list, priority_filter, max_size_mb,
    )
    total_text = sum(len(v) for v in text_files.values())
    total_scanned = sum(len(v) for v in scanned_files.values())
    already = len(progress["uploaded_files"])
    print(f"\n  Text-based files: {total_text}")
    print(f"  Scanned PDFs: {total_scanned}")
    print(f"  Already uploaded: {already}")
    print(f"  Personas: {len(persona_folders)}\n")
    if dry_run:
        for fn in sorted(set(list(text_files.keys()) + list(scanned_files.keys()))):
            t = len([f for f in text_files.get(fn, []) if str(f) not in progress["uploaded_files"]])
            s = len([f for f in scanned_files.get(fn, []) if str(f) not in progress["ocr_done"]])
            if t or s:
                print(f"    {fn}: {t} text, {s} scanned")
        print("\n  Personas:")
        for c, flds in sorted(persona_folders.items()):
            print(f"    {c}: {', '.join(flds)}")
        return
    # ── Phase A: Upload text-based files ──
    print("══ Phase A: Upload text-based files ══\n")
    total_up = total_fail = 0
    for fn, files in sorted(text_files.items()):
        up, fail = upload_file_batch(config, fn, files, progress, batch_size, delay)
        total_up += up
        total_fail += fail
    print(f"\n  Phase A done: {total_up} uploaded, {total_fail} failed\n")
    # ── Phase B: OCR scanned PDFs ──
    total_scanned_remaining = sum(
        1 for files in scanned_files.values()
        for f in files if str(f) not in progress.get("ocr_done", [])
    )
    if total_scanned_remaining > 0:
        print(f"══ Phase B: OCR {total_scanned_remaining} scanned PDFs ══\n")
        ocr_ok = ocr_fail = 0
        for fn, files in sorted(scanned_files.items()):
            pending = [f for f in files if str(f) not in progress.get("ocr_done", [])]
            if not pending:
                continue
            print(f"  → {fn}: {len(pending)} PDFs")
            for i, pdf in enumerate(pending):
                size_mb = pdf.stat().st_size / (1024 * 1024)
                print(f"    [{i+1}/{len(pending)}] {pdf.name} ({size_mb:.1f}MB)", end="", flush=True)
                if ocr_pdf(pdf):
                    progress.setdefault("ocr_done", []).append(str(pdf))
                    ocr_ok += 1
                    print(" ✓")
                else:
                    progress.setdefault("ocr_failed", []).append(str(pdf))
                    ocr_fail += 1
                    print(" ✗")
                if (ocr_ok + ocr_fail) % 5 == 0:
                    save_progress(progress)
        save_progress(progress)
        print(f"\n  Phase B done: {ocr_ok} OCR'd, {ocr_fail} failed\n")
    # ── Phase C: Upload OCR'd files ──
    ocr_to_upload = {fn: [f for f in files if str(f) in progress.get("ocr_done", [])]
                     for fn, files in scanned_files.items()}
    total_ocr_upload = sum(
        1 for files in ocr_to_upload.values()
        for f in files if str(f) not in progress["uploaded_files"]
    )
    if total_ocr_upload > 0:
        print(f"══ Phase C: Upload {total_ocr_upload} OCR'd files ══\n")
        total_up2 = total_fail2 = 0
        for fn, files in sorted(ocr_to_upload.items()):
            if not files:
                continue
            up, fail = upload_file_batch(config, fn, files, progress, batch_size, delay)
            total_up2 += up
            total_fail2 += fail
        print(f"\n  Phase C done: {total_up2} uploaded, {total_fail2} failed\n")
    # ── Assign to workspaces ──
    assign_to_workspaces(config, persona_folders, progress, batch_size, delay)
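
# Note on resume semantics: without --resume, upload_documents() starts from an
# empty progress dict, so previously uploaded files are uploaded again; --resume
# (implied by --all in main()) reuses upload_progress.json and skips everything
# recorded there.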
# ──────────────────────────────────────────────────────────
# STATUS
# ──────────────────────────────────────────────────────────

def show_status(config):
    print("═══ Integration Status ═══\n")
    # Storage
    du = ANYTHINGLLM_STORAGE / "direct-uploads"
    if du.is_symlink():
        print(f"  ✓ direct-uploads/ → {du.resolve()} (HDD)")
    elif du.exists():
        print("  ⚠ direct-uploads/ on SSD — run --storage-setup")
    for d in ["documents", "lancedb", "vector-cache"]:
        p = ANYTHINGLLM_STORAGE / d
        if p.exists():
            try:
                sz = sum(f.stat().st_size for f in p.rglob("*") if f.is_file()) / (1024**2)
                print(f"  ● {d}/ ({sz:.0f} MB)")
            except Exception:
                print(f"  ● {d}/")
    api_ok = check_api(config) if config["anythingllm"]["api_key"] else False
    collector_ok = check_collector_alive()
    print(f"\n  API: {'✓' if api_ok else '✗'}   Collector: {'✓' if collector_ok else '✗'}")
    progress = load_progress()
    uploaded = len(progress.get("uploaded_files", {}))
    ocr_done = len(progress.get("ocr_done", []))
    assigned = len(progress.get("workspace_docs", {}))
    if uploaded or ocr_done:
        print(f"\n  Uploaded: {uploaded}   OCR'd: {ocr_done}   Personas assigned: {assigned}")
        folders = {}
        for info in progress.get("uploaded_files", {}).values():
            f = info.get("folder", "?")
            folders[f] = folders.get(f, 0) + 1
        for f, c in sorted(folders.items(), key=lambda x: -x[1])[:20]:
            print(f"    {c:4d} {f}")
    print()


# ──────────────────────────────────────────────────────────
# MAIN
# ──────────────────────────────────────────────────────────

def reassign_workspaces(config, persona_list=None, reset=False, dry_run=False):
    """Re-assign already-uploaded docs to workspaces without scanning/uploading.

    Skips the slow folder scan — uses upload_progress.json directly."""
    log.info("═══ Re-assign Workspaces ═══")
    if not check_api(config):
        log.error("AnythingLLM API not reachable")
        return
    progress = load_progress()
    batch_size = config["processing"]["batch_size"]
    delay = config["processing"]["delay_between_batches"]
    if reset:
        if persona_list:
            for p in persona_list:
                progress.get("workspace_docs", {}).pop(p, None)
            log.info(f"Reset assignments for: {', '.join(persona_list)}")
        else:
            progress["workspace_docs"] = {}
            log.info("Reset all workspace assignments")
        save_progress(progress)
    # Build persona_folders from config (no disk scan needed)
    persona_folders = {}
    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        persona_folders[codename] = [
            entry["path"].replace("/", "_")
            for entry in ws_config.get("folders", [])
        ]
    uploaded = len(progress.get("uploaded_files", {}))
    log.info(f"Uploaded files in progress: {uploaded}")
    log.info(f"Personas to assign: {len(persona_folders)}")
    if dry_run:
        existing_ws = get_existing_workspaces(config)
        for codename, folders in sorted(persona_folders.items()):
            ws_name = config["workspaces"][codename]["name"]
            slug = existing_ws.get(ws_name, {}).get("slug", "?")
            doc_count = 0
            for fn in folders:
                for info in progress.get("uploaded_files", {}).values():
                    if info.get("folder") == fn and info.get("location"):
                        doc_count += 1
            already = len(progress.get("workspace_docs", {}).get(codename, []))
            log.info(f"  {codename} ({slug}): {doc_count} docs, {already} already assigned")
        return
    assign_to_workspaces(config, persona_folders, progress, batch_size, delay)
    log.info("Re-assign complete.")
def resolve_persona_list(args, config):
    """Resolve --persona / --cluster to a list of codenames."""
    if args.persona:
        return [args.persona]
    if args.cluster:
        cl = CLUSTERS.get(args.cluster)
        if not cl:
            print(f"Unknown cluster: {args.cluster}")
            print(f"Available: {', '.join(CLUSTERS.keys())}")
            sys.exit(1)
        return cl
    return None  # all


def main():
    parser = argparse.ArgumentParser(description="AnythingLLM × Persona Integration")
    parser.add_argument("--storage-setup", action="store_true")
    parser.add_argument("--create-workspaces", action="store_true")
    parser.add_argument("--upload-documents", action="store_true")
    parser.add_argument("--all", action="store_true", help="Run all steps")
    parser.add_argument("--reassign", action="store_true",
                        help="Re-assign uploaded docs to workspaces (no scan/upload)")
    parser.add_argument("--reset", action="store_true",
                        help="Reset assignment tracking before reassign")
    parser.add_argument("--status", action="store_true")
    parser.add_argument("--persona", type=str, help="Single persona filter")
    parser.add_argument("--cluster", type=str,
                        help="Cluster filter: intel, cyber, military, humanities, engineering")
    parser.add_argument("--priority", type=int, help="Max priority (1=core)")
    parser.add_argument("--max-size", type=int, default=100, help="Max file MB (default: 100)")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--resume", action="store_true")
    parser.add_argument("--verbose", "-v", action="store_true", help="Debug-level console output")
    args = parser.parse_args()

    setup_logging(verbose=args.verbose)
    log.info(f"AnythingLLM Integration started — args: {vars(args)}")
    config = load_config()

    if not any([args.storage_setup, args.create_workspaces, args.upload_documents,
                args.reassign, args.all, args.status]):
        parser.print_help()
        return

    persona_list = resolve_persona_list(args, config)

    if args.status:
        show_status(config)
        return
    if args.storage_setup or args.all:
        storage_setup(config, dry_run=args.dry_run)
    if args.create_workspaces or args.all:
        create_workspaces(config, persona_list=persona_list, dry_run=args.dry_run)
    if args.reassign:
        reassign_workspaces(config, persona_list=persona_list, reset=args.reset,
                            dry_run=args.dry_run)
    if args.upload_documents or args.all:
        upload_documents(config, persona_list=persona_list, priority_filter=args.priority,
                         dry_run=args.dry_run, resume=args.resume or args.all,
                         max_size_mb=args.max_size)


if __name__ == "__main__":
    main()