Files
anything-llm-rag/setup.py
salvacybersec 9e9b75e0b3 Initial commit: AnythingLLM persona RAG integration
28 persona workspace with document upload, OCR pipeline, and vector embedding
assignment via AnythingLLM API. Supports 5 clusters (intel, cyber, military,
humanities, engineering) with batch processing and resume capability.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:07:44 +03:00

681 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
AnythingLLM × Persona Library Integration Setup
Three-phase pipeline:
Phase A: Upload all text-based files
Phase B: OCR all scanned PDFs in-place
Phase C: Upload newly OCR'd files + assign to workspaces
Usage:
python3 setup.py --storage-setup # Symlink direct-uploads/ to HDD
python3 setup.py --create-workspaces # Create workspaces + load persona prompts
python3 setup.py --upload-documents # Full pipeline: upload → OCR → upload → assign
python3 setup.py --persona frodo # Single persona
python3 setup.py --cluster intel # Intel cluster (frodo,echo,ghost,oracle,wraith,scribe,polyglot)
python3 setup.py --cluster cyber # Cyber cluster
python3 setup.py --cluster military # Military cluster
python3 setup.py --cluster humanities # Humanities cluster
python3 setup.py --cluster engineering # Engineering cluster
python3 setup.py --priority 1 # Only priority 1 (core) folders
python3 setup.py --dry-run # Preview
python3 setup.py --status # Show state
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
import yaml
try:
import requests
except ImportError:
print("pip install requests pyyaml")
sys.exit(1)
# Paths resolved relative to this script so it works from any CWD.
CONFIG_PATH = Path(__file__).parent / "config.yaml"
PROGRESS_PATH = Path(__file__).parent / "upload_progress.json"
# AnythingLLM desktop app's local storage root (target of --storage-setup symlinking).
ANYTHINGLLM_STORAGE = Path.home() / ".config/anythingllm-desktop/storage"
# Extensions to skip during upload; populated from config.yaml by load_config().
SKIP_EXT = set()
# Persona groupings for the --cluster CLI filter.
CLUSTERS = {
    "intel": ["frodo", "echo", "ghost", "oracle", "wraith", "scribe", "polyglot"],
    "cyber": ["neo", "bastion", "sentinel", "specter", "phantom", "cipher", "vortex"],
    "military": ["marshal", "centurion", "corsair", "warden", "medic"],
    "humanities": ["chronos", "tribune", "arbiter", "ledger", "sage", "herald", "scholar", "gambit"],
    "engineering": ["forge", "architect"],
}
def load_config():
    """Parse config.yaml and refresh the module-level SKIP_EXT set."""
    global SKIP_EXT
    with open(CONFIG_PATH) as handle:
        configuration = yaml.safe_load(handle)
    SKIP_EXT = set(configuration["processing"]["skip_extensions"])
    return configuration
def load_progress():
    """Load resume state from disk, or return a fresh empty state dict."""
    if not PROGRESS_PATH.exists():
        return {"uploaded_files": {}, "workspace_docs": {}, "ocr_done": [], "ocr_failed": []}
    with open(PROGRESS_PATH) as handle:
        return json.load(handle)
def save_progress(progress):
    """Persist resume state as indented JSON (non-ASCII kept readable)."""
    with open(PROGRESS_PATH, "w") as handle:
        handle.write(json.dumps(progress, indent=2, ensure_ascii=False))
# ──────────────────────────────────────────────────────────
# API
# ──────────────────────────────────────────────────────────
def api_request(config, method, endpoint, **kwargs):
    """Fire one authenticated AnythingLLM API call.

    `method` is a requests function name ("get"/"post"/...). Returns the
    parsed JSON body, or None (with a console message) on a non-2xx status.
    """
    settings = config["anythingllm"]
    headers = {"Authorization": f"Bearer {settings['api_key']}"}
    if "json" in kwargs:
        headers["Content-Type"] = "application/json"
    caller = getattr(requests, method)
    resp = caller(f"{settings['base_url']}{endpoint}", headers=headers, **kwargs)
    if resp.status_code in (200, 201):
        return resp.json()
    print(f" API error {resp.status_code}: {resp.text[:300]}")
    return None
def api_upload(config, file_path, folder_name=None):
    """Upload one file to AnythingLLM via multipart POST.

    Returns (documents, None) on success or (None, error_string) on failure;
    never raises.
    """
    suffix = f"/document/upload/{folder_name}" if folder_name else "/document/upload"
    url = f"{config['anythingllm']['base_url']}{suffix}"
    headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"}
    # Scale the timeout with file size: base 120s plus ~30s per 10 MB.
    size_mb = file_path.stat().st_size / (1024 * 1024)
    timeout = max(120, int(120 + (size_mb / 10) * 30))
    try:
        with open(file_path, "rb") as handle:
            payload = {"file": (file_path.name, handle, "application/octet-stream")}
            resp = requests.post(url, headers=headers, files=payload, timeout=timeout)
        if resp.status_code not in (200, 201):
            return None, resp.text[:200]
        data = resp.json()
        if data.get("success") and data.get("documents"):
            return data["documents"], None
        return None, data.get("error", "Unknown error")
    except requests.exceptions.Timeout:
        return None, f"timeout ({timeout}s)"
    except Exception as e:
        return None, str(e)
def check_api(config):
    """True when the AnythingLLM /auth endpoint answers successfully."""
    try:
        result = api_request(config, "get", "/auth")
    except Exception:
        return False
    return result is not None
def check_collector_alive():
    """Probe the local document-collector service on port 8888."""
    try:
        resp = requests.get("http://127.0.0.1:8888", timeout=3)
    except Exception:
        return False
    return resp.status_code == 200
def wait_for_collector(max_wait=90):
    """Poll the collector once per second; True if it responds within max_wait s."""
    print(" ⏳ waiting for collector...", end="", flush=True)
    elapsed = 0
    while elapsed < max_wait:
        if check_collector_alive():
            print("")
            return True
        time.sleep(1)
        elapsed += 1
        # Emit a progress dot for every 10 seconds of waiting.
        if elapsed % 10 == 0:
            print(".", end="", flush=True)
    print(" ✗ timeout")
    return False
def get_existing_workspaces(config):
    """Return {workspace name: workspace dict} from the API, or {} on failure."""
    listing = api_request(config, "get", "/workspaces")
    if not listing or "workspaces" not in listing:
        return {}
    return {entry["name"]: entry for entry in listing["workspaces"]}
# ──────────────────────────────────────────────────────────
# PDF DETECTION & OCR
# ──────────────────────────────────────────────────────────
def is_scanned_pdf(file_path):
    """Fast scan detection via pdffonts (~0.04s vs pdftotext ~2s).

    A PDF with no embedded fonts is assumed to be scanned/image-only.
    Non-PDFs, pdffonts failures, and slow runs are all treated as text.
    """
    if file_path.suffix.lower() != ".pdf":
        return False
    try:
        proc = subprocess.Popen(
            ["pdffonts", "-l", "3", str(file_path)],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
    except Exception:
        return False
    try:
        out, _ = proc.communicate(timeout=3)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
        return False  # assume text if slow
    except Exception:
        return False
    # pdffonts prints a 2-line header; anything beyond that is a font entry.
    font_rows = [row for row in out.decode(errors="ignore").strip().split("\n") if row.strip()]
    return len(font_rows) <= 2
def ocr_pdf(file_path, language="tur+eng", dpi=200):
    """OCR a scanned PDF in-place via ocrmypdf. Returns True on success."""
    import tempfile
    # Stage the output next to the source so replace() stays on one filesystem.
    handle, staged = tempfile.mkstemp(suffix=".pdf", dir=file_path.parent)
    os.close(handle)
    staged = Path(staged)
    command = [
        "ocrmypdf", "--skip-text", "--rotate-pages", "--deskew",
        "--jobs", "4", "--image-dpi", str(dpi), "-l", language,
        "--output-type", "pdf", "--quiet",
        str(file_path), str(staged),
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=600)
        # Exit code 6 is accepted alongside 0 (ocrmypdf's "prior OCR found" code).
        succeeded = (result.returncode in (0, 6)
                     and staged.exists() and staged.stat().st_size > 0)
        if succeeded:
            staged.replace(file_path)
            return True
    except Exception:
        pass
    staged.unlink(missing_ok=True)
    return False
# ──────────────────────────────────────────────────────────
# STEP 1: Storage offload
# ──────────────────────────────────────────────────────────
def storage_setup(config, dry_run=False):
    """Move AnythingLLM's direct-uploads/ onto the HDD and symlink it back.

    Copies any existing uploads to the HDD target, removes the original
    directory, and leaves a symlink in its place. Idempotent: a second run
    detects the symlink and does nothing. With dry_run=True only the planned
    action is printed.
    """
    hdd_base = Path(config["storage"]["hdd_storage"])
    src = ANYTHINGLLM_STORAGE / "direct-uploads"
    dst = hdd_base / "direct-uploads"
    print("═══ Storage: direct-uploads/ → HDD ═══\n")
    if src.is_symlink():
        print(f" ✓ already symlinked → {src.resolve()}")
        return
    if dry_run:
        # FIX: the old code fell through and printed " ✓ done" without acting;
        # a dry run should preview the action instead of claiming completion.
        print(f" would move {src} → {dst} and symlink back\n")
        return
    dst.mkdir(parents=True, exist_ok=True)
    if src.exists():
        shutil.copytree(str(src), str(dst), dirs_exist_ok=True)
        shutil.rmtree(src)
    src.symlink_to(dst)
    print(" ✓ done\n")
# ──────────────────────────────────────────────────────────
# STEP 2: Create workspaces + load prompts
# ──────────────────────────────────────────────────────────
def extract_system_prompt(config, persona_file):
    """Build a workspace system prompt from a persona markdown/yaml file.

    Falls back from <personas_dir>/<persona_file> to the persona folder's
    _meta.yaml. Markdown front matter ("---" fenced) may contribute a tone
    prefix. Returns None when no source file exists.
    """
    base = Path(config["storage"]["personas_dir"])
    source = base / persona_file
    if not source.exists():
        fallback = base / persona_file.replace("/general.md", "") / "_meta.yaml"
        if not fallback.exists():
            return None
        source = fallback
    text = source.read_text(encoding="utf-8")
    if source.suffix == ".yaml":
        meta = yaml.safe_load(text)
        return meta.get("system_prompt", meta.get("description", ""))
    segments = text.split("---")
    if len(segments) < 3:
        # No front-matter fence pair: the whole file is the prompt.
        return text
    try:
        front = yaml.safe_load(segments[1])
    except yaml.YAMLError:
        front = {}
    tone = front.get("tone", "")
    body = "---".join(segments[2:]).strip()
    if tone:
        return f"Tone: {tone}\n\n{body}"
    return body
def create_workspaces(config, persona_list=None, dry_run=False):
    """Create (or update) one AnythingLLM workspace per configured persona.

    Existing workspaces get their system prompt refreshed; missing ones are
    created and then given their prompt. persona_list restricts the run to a
    subset of codenames; dry_run prints actions without calling the API.
    """
    print("═══ Creating Workspaces ═══\n")
    if not config["anythingllm"]["api_key"]:
        print(" ✗ API key not set!")
        return
    existing = get_existing_workspaces(config)
    created = skipped = 0
    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        name = ws_config["name"]
        persona_file = ws_config.get("persona_file", "")
        system_prompt = extract_system_prompt(config, persona_file) if persona_file else ""
        if name in existing:
            # Workspace already exists — just refresh its prompt.
            slug = existing[name].get("slug", "?")
            if system_prompt and not dry_run:
                api_request(config, "post", f"/workspace/{slug}/update",
                            json={"openAiPrompt": system_prompt})
            print(f"{codename} (prompt: {len(system_prompt or '')} chars)")
            skipped += 1
            continue
        print(f"{codename}: creating '{name}'")
        if not dry_run:
            result = api_request(config, "post", "/workspace/new", json={"name": name})
            if result:
                slug = result.get("workspace", {}).get("slug", "?")
                if system_prompt:
                    api_request(config, "post", f"/workspace/{slug}/update",
                                json={"openAiPrompt": system_prompt})
                # FIX: guard with `or ''` — extract_system_prompt may return
                # None, and len(None) raised TypeError here.
                print(f" ✓ created + prompt ({len(system_prompt or '')} chars)")
                created += 1
            else:
                print(" ✗ failed")
    print(f"\n Created: {created}, Updated: {skipped}\n")
# ──────────────────────────────────────────────────────────
# STEP 3: Three-phase upload pipeline
# ──────────────────────────────────────────────────────────
def collect_all_files(config, book_library, persona_list=None, priority_filter=None, max_size_mb=100):
    """Scan all folders and classify files as text-based or scanned.

    Returns a 4-tuple:
      text_files:      folder_name -> [Path] files assumed to have a text layer
      scanned_files:   folder_name -> [Path] image-only PDFs that need OCR
      persona_folders: codename -> [folder_name] folders each persona uses
      folder_to_path:  folder_name -> source directory Path

    Folder names are the config-relative paths with "/" replaced by "_";
    a folder shared by several personas is scanned and classified only once.
    """
    text_files = {}  # folder_name → [paths]
    scanned_files = {}  # folder_name → [paths]
    persona_folders = {}  # codename → [folder_names]
    folder_to_path = {}  # folder_name → source path
    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        persona_folders[codename] = []
        for entry in ws_config.get("folders", []):
            folder_path = entry["path"]
            priority = entry.get("priority", 2)  # default priority: non-core
            if priority_filter and priority > priority_filter:
                continue
            folder_name = folder_path.replace("/", "_")
            persona_folders[codename].append(folder_name)
            if folder_name in text_files:
                continue  # already scanned (folder shared with another persona)
            src = book_library / folder_path
            folder_to_path[folder_name] = src
            text_files[folder_name] = []
            scanned_files[folder_name] = []
            if not src.exists():
                print(f"{folder_path} not found")
                continue
            # Use find with -printf for speed on HDD (one syscall, no per-file stat)
            try:
                find_result = subprocess.run(
                    ["find", str(src), "-type", "f", "-not", "-empty",
                     "-printf", "%s %p\n"],
                    capture_output=True, text=True, timeout=120,
                )
                all_files = []
                # max_size_mb of 0/None effectively disables the size cap.
                max_bytes = (max_size_mb or 9999) * 1024 * 1024
                for line in find_result.stdout.strip().split("\n"):
                    if not line:
                        continue
                    # Each line is "<size> <path>"; maxsplit=1 keeps spaces in paths.
                    parts = line.split(" ", 1)
                    if len(parts) != 2:
                        continue
                    size, path = int(parts[0]), Path(parts[1])
                    if path.suffix.lower() not in SKIP_EXT and size <= max_bytes:
                        all_files.append(path)
                all_files.sort()
            except Exception:
                # Fallback (e.g. `find` unavailable): slower pathlib walk with per-file stat.
                all_files = sorted(
                    f for f in src.rglob("*")
                    if f.is_file() and f.suffix.lower() not in SKIP_EXT
                    and f.stat().st_size > 0
                    and (not max_size_mb or f.stat().st_size / (1024*1024) <= max_size_mb)
                )
            print(f" {folder_name}: {len(all_files)} files found", flush=True)
            # Classify every file (pdffonts is fast: ~0.04s per file)
            for i, f in enumerate(all_files):
                if is_scanned_pdf(f):
                    scanned_files[folder_name].append(f)
                else:
                    text_files[folder_name].append(f)
                if (i + 1) % 500 == 0:
                    print(f" {folder_name}: {i+1}/{len(all_files)} classified...", flush=True)
            t = len(text_files[folder_name])
            s = len(scanned_files[folder_name])
            print(f" {folder_name}: {t} text, {s} scanned")
    return text_files, scanned_files, persona_folders, folder_to_path
def upload_file_batch(config, folder_name, files, progress, batch_size, delay):
    """Upload a list of files to a folder. Returns (uploaded, failed) counts.

    Skips files already recorded in progress["uploaded_files"] (resume
    support), pauses `delay` seconds after every `batch_size` successful
    uploads, retries each file up to 3 times, and checkpoints progress to
    disk as it goes. Returns early with partial counts if the collector
    service goes down and does not recover.
    """
    uploaded = failed = 0
    # Resume support: only files not yet recorded as uploaded.
    new_files = [f for f in files if str(f) not in progress["uploaded_files"]]
    if not new_files:
        return 0, 0
    print(f"{folder_name}: {len(new_files)} files")
    for i, fp in enumerate(new_files):
        # Throttle: rest between batches so the embedder can keep up.
        if uploaded > 0 and uploaded % batch_size == 0:
            print(f" ⏸ batch pause ({delay}s)...")
            time.sleep(delay)
        size_mb = fp.stat().st_size / (1024 * 1024)
        print(f" [{i+1}/{len(new_files)}] {fp.name} ({size_mb:.1f}MB)", end="", flush=True)
        if not check_collector_alive():
            print(f" ⚠ collector down")
            if not wait_for_collector():
                # Collector never came back — save state so --resume can continue.
                print(" ✗ stopping — restart AnythingLLM and --resume")
                save_progress(progress)
                return uploaded, failed
        docs, error = None, None
        for attempt in range(3):  # up to 3 attempts per file
            docs, error = api_upload(config, fp, folder_name=folder_name)
            if docs:
                break
            if error and "not online" in str(error):
                # Collector rejected the upload mid-run — wait for recovery.
                print(f"", end="", flush=True)
                time.sleep(5)
                if not wait_for_collector():
                    save_progress(progress)
                    return uploaded, failed
            elif attempt < 2:
                # Transient failure: brief pause before the next retry.
                time.sleep(2)
                print(f"", end="", flush=True)
        if docs:
            # Record the server-side location so the doc can be assigned later.
            progress["uploaded_files"][str(fp)] = {
                "location": docs[0].get("location", ""),
                "folder": folder_name,
                "name": docs[0].get("name", ""),
            }
            uploaded += 1
            print(f"")
        else:
            failed += 1
            progress.setdefault("failed_files", {})[str(fp)] = {
                "folder": folder_name, "error": str(error)[:200],
            }
            print(f"{error}")
        # Checkpoint every 10 successful uploads.
        if uploaded % 10 == 0:
            save_progress(progress)
    save_progress(progress)
    return uploaded, failed
def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
    """Phase C2: assign uploaded docs to persona workspaces.

    For each persona, gathers the server-side locations of every uploaded
    document in that persona's folders and embeds them into the workspace in
    batches via /update-embeddings, skipping locations already assigned
    (tracked in progress["workspace_docs"] for resume support).
    """
    print("── Assigning to workspaces ──\n")
    existing_ws = get_existing_workspaces(config)
    for codename, folders in sorted(persona_folders.items()):
        ws_name = config["workspaces"][codename]["name"]
        ws_info = existing_ws.get(ws_name)
        if not ws_info:
            # Workspace was never created — nothing to assign to.
            continue
        slug = ws_info["slug"]
        doc_locs = []
        # Collect every uploaded doc location belonging to this persona's folders.
        for fn in folders:
            for fpath, info in progress["uploaded_files"].items():
                if info.get("folder") == fn and info.get("location"):
                    doc_locs.append(info["location"])
        # Skip docs already embedded in this workspace.
        already = set(progress.get("workspace_docs", {}).get(codename, []))
        new_docs = [loc for loc in doc_locs if loc not in already]
        if not new_docs:
            if doc_locs:
                print(f"{codename}: {len(doc_locs)} docs assigned")
            continue
        print(f"{codename} ({slug}): {len(new_docs)} docs")
        for bs in range(0, len(new_docs), batch_size):
            batch = new_docs[bs:bs + batch_size]
            result = api_request(config, "post", f"/workspace/{slug}/update-embeddings",
                                 json={"adds": batch, "deletes": []})
            if result:
                progress.setdefault("workspace_docs", {}).setdefault(codename, []).extend(batch)
                print(f"{len(batch)} docs embedded")
            else:
                print(f" ✗ batch failed")
            if bs + batch_size < len(new_docs):
                # Pause between embedding batches (not after the last one).
                time.sleep(delay)
            save_progress(progress)
    print()
def upload_documents(config, persona_list=None, priority_filter=None,
                     dry_run=False, resume=False, max_size_mb=100):
    """Three-phase pipeline: text upload → OCR → OCR upload → assign.

    Phase A uploads every text-based file, Phase B OCRs scanned PDFs
    in-place, Phase C uploads the newly OCR'd files, and finally all
    uploaded docs are embedded into their persona workspaces. With
    resume=True, previously saved progress is honored so completed work
    is skipped. dry_run prints the pending counts and persona mapping only.
    """
    print("═══ Upload Pipeline ═══\n")
    if not check_api(config):
        print(" ✗ AnythingLLM API not reachable.")
        return
    book_library = Path(config["storage"]["book_library"])
    batch_size = config["processing"]["batch_size"]
    delay = config["processing"]["delay_between_batches"]
    # Fresh state unless resuming from a previous run.
    progress = load_progress() if resume else {
        "uploaded_files": {}, "workspace_docs": {}, "ocr_done": [], "ocr_failed": [],
    }
    # Scan & classify
    print(" Scanning folders...\n")
    text_files, scanned_files, persona_folders, _ = collect_all_files(
        config, book_library, persona_list, priority_filter, max_size_mb,
    )
    total_text = sum(len(v) for v in text_files.values())
    total_scanned = sum(len(v) for v in scanned_files.values())
    already = len(progress["uploaded_files"])
    print(f"\n Text-based files: {total_text}")
    print(f" Scanned PDFs: {total_scanned}")
    print(f" Already uploaded: {already}")
    print(f" Personas: {len(persona_folders)}\n")
    if dry_run:
        # Preview: per-folder pending counts, then the persona→folder mapping.
        for fn in sorted(set(list(text_files.keys()) + list(scanned_files.keys()))):
            t = len([f for f in text_files.get(fn, []) if str(f) not in progress["uploaded_files"]])
            s = len([f for f in scanned_files.get(fn, []) if str(f) not in progress["ocr_done"]])
            if t or s:
                print(f" {fn}: {t} text, {s} scanned")
        print(f"\n Personas:")
        for c, flds in sorted(persona_folders.items()):
            print(f" {c}: {', '.join(flds)}")
        return
    # ── Phase A: Upload text-based files ──
    print("══ Phase A: Upload text-based files ══\n")
    total_up = total_fail = 0
    for fn, files in sorted(text_files.items()):
        up, fail = upload_file_batch(config, fn, files, progress, batch_size, delay)
        total_up += up
        total_fail += fail
    print(f"\n Phase A done: {total_up} uploaded, {total_fail} failed\n")
    # ── Phase B: OCR scanned PDFs ──
    total_scanned_remaining = sum(
        1 for files in scanned_files.values()
        for f in files if str(f) not in progress.get("ocr_done", [])
    )
    if total_scanned_remaining > 0:
        print(f"══ Phase B: OCR {total_scanned_remaining} scanned PDFs ══\n")
        ocr_ok = ocr_fail = 0
        for fn, files in sorted(scanned_files.items()):
            pending = [f for f in files if str(f) not in progress.get("ocr_done", [])]
            if not pending:
                continue
            print(f"{fn}: {len(pending)} PDFs")
            for i, pdf in enumerate(pending):
                size_mb = pdf.stat().st_size / (1024 * 1024)
                print(f" [{i+1}/{len(pending)}] {pdf.name} ({size_mb:.1f}MB)", end="", flush=True)
                if ocr_pdf(pdf):
                    progress.setdefault("ocr_done", []).append(str(pdf))
                    ocr_ok += 1
                    print(f"")
                else:
                    progress.setdefault("ocr_failed", []).append(str(pdf))
                    ocr_fail += 1
                    print(f"")
                # Checkpoint every 5 OCR attempts (OCR is slow; don't lose work).
                if (ocr_ok + ocr_fail) % 5 == 0:
                    save_progress(progress)
        save_progress(progress)
        print(f"\n Phase B done: {ocr_ok} OCR'd, {ocr_fail} failed\n")
    # ── Phase C: Upload OCR'd files ──
    # Only PDFs that were successfully OCR'd are eligible for upload.
    ocr_to_upload = {fn: [f for f in files if str(f) in progress.get("ocr_done", [])]
                     for fn, files in scanned_files.items()}
    total_ocr_upload = sum(
        1 for files in ocr_to_upload.values()
        for f in files if str(f) not in progress["uploaded_files"]
    )
    if total_ocr_upload > 0:
        print(f"══ Phase C: Upload {total_ocr_upload} OCR'd files ══\n")
        total_up2 = total_fail2 = 0
        for fn, files in sorted(ocr_to_upload.items()):
            if not files:
                continue
            up, fail = upload_file_batch(config, fn, files, progress, batch_size, delay)
            total_up2 += up
            total_fail2 += fail
        print(f"\n Phase C done: {total_up2} uploaded, {total_fail2} failed\n")
    # ── Assign to workspaces ──
    assign_to_workspaces(config, persona_folders, progress, batch_size, delay)
# ──────────────────────────────────────────────────────────
# STATUS
# ──────────────────────────────────────────────────────────
def show_status(config):
    """Print a human-readable summary of storage, services, and upload progress."""
    print("═══ Integration Status ═══\n")
    # Storage
    du = ANYTHINGLLM_STORAGE / "direct-uploads"
    if du.is_symlink():
        print(f" ✓ direct-uploads/ → {du.resolve()} (HDD)")
    elif du.exists():
        print(f" ⚠ direct-uploads/ on SSD — run --storage-setup")
    for d in ["documents", "lancedb", "vector-cache"]:
        p = ANYTHINGLLM_STORAGE / d
        if p.exists():
            try:
                sz = sum(f.stat().st_size for f in p.rglob("*") if f.is_file()) / (1024**2)
                print(f"{d}/ ({sz:.0f} MB)")
            except Exception:
                # Size walk can fail (e.g. permissions); still list the directory.
                print(f"{d}/")
    api_ok = check_api(config) if config["anythingllm"]["api_key"] else False
    collector_ok = check_collector_alive()
    # FIX: both conditional branches printed the empty string, so this line
    # showed no status at all; restore the ✓/✗ markers used elsewhere in file.
    print(f"\n API: {'✓' if api_ok else '✗'} Collector: {'✓' if collector_ok else '✗'}")
    progress = load_progress()
    uploaded = len(progress.get("uploaded_files", {}))
    ocr_done = len(progress.get("ocr_done", []))
    assigned = len(progress.get("workspace_docs", {}))
    if uploaded or ocr_done:
        print(f"\n Uploaded: {uploaded} OCR'd: {ocr_done} Personas assigned: {assigned}")
        # Per-folder upload breakdown, top 20 by count.
        folders = {}
        for info in progress.get("uploaded_files", {}).values():
            f = info.get("folder", "?")
            folders[f] = folders.get(f, 0) + 1
        for f, c in sorted(folders.items(), key=lambda x: -x[1])[:20]:
            print(f" {c:4d} {f}")
    print()
# ──────────────────────────────────────────────────────────
# MAIN
# ──────────────────────────────────────────────────────────
def resolve_persona_list(args, config):
    """Resolve --persona / --cluster to a list of codenames (None = all)."""
    if args.persona:
        return [args.persona]
    if not args.cluster:
        return None  # no filter requested → run against all personas
    members = CLUSTERS.get(args.cluster)
    if not members:
        print(f"Unknown cluster: {args.cluster}")
        print(f"Available: {', '.join(CLUSTERS.keys())}")
        sys.exit(1)
    return members
def main():
    """CLI entry point: parse flags and dispatch the requested steps."""
    ap = argparse.ArgumentParser(description="AnythingLLM × Persona Integration")
    ap.add_argument("--storage-setup", action="store_true")
    ap.add_argument("--create-workspaces", action="store_true")
    ap.add_argument("--upload-documents", action="store_true")
    ap.add_argument("--all", action="store_true", help="Run all steps")
    ap.add_argument("--status", action="store_true")
    ap.add_argument("--persona", type=str, help="Single persona filter")
    ap.add_argument("--cluster", type=str, help="Cluster filter: intel, cyber, military, humanities, engineering")
    ap.add_argument("--priority", type=int, help="Max priority (1=core)")
    ap.add_argument("--max-size", type=int, default=100, help="Max file MB (default: 100)")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--resume", action="store_true")
    opts = ap.parse_args()
    config = load_config()
    action_requested = (opts.storage_setup or opts.create_workspaces
                        or opts.upload_documents or opts.all or opts.status)
    if not action_requested:
        ap.print_help()
        return
    persona_list = resolve_persona_list(opts, config)
    if opts.status:
        show_status(config)
        return
    if opts.storage_setup or opts.all:
        storage_setup(config, dry_run=opts.dry_run)
    if opts.create_workspaces or opts.all:
        create_workspaces(config, persona_list=persona_list, dry_run=opts.dry_run)
    if opts.upload_documents or opts.all:
        upload_documents(config, persona_list=persona_list,
                         priority_filter=opts.priority,
                         dry_run=opts.dry_run,
                         # --all implies resume so reruns skip completed work
                         resume=opts.resume or opts.all,
                         max_size_mb=opts.max_size)


if __name__ == "__main__":
    main()