# Changelog:
# - Reduce embed batch to 5 — AnythingLLM hangs on batches >10
# - Fix check_script_running() to properly detect setup.py process
#   (was returning false because pgrep matched monitor.py too)
# Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
#!/usr/bin/env python3
"""
AnythingLLM × Persona Library Integration Setup

Three-phase pipeline:
Phase A: Upload all text-based files
Phase B: OCR all scanned PDFs in-place
Phase C: Upload newly OCR'd files + assign to workspaces

Usage:
python3 setup.py --storage-setup      # Symlink direct-uploads/ to HDD
python3 setup.py --create-workspaces  # Create workspaces + load persona prompts
python3 setup.py --upload-documents   # Full pipeline: upload → OCR → upload → assign
python3 setup.py --reassign           # Re-assign existing docs to workspaces (no scan/upload)
python3 setup.py --reassign --reset   # Reset assignment tracking + re-assign all
python3 setup.py --persona frodo      # Single persona
python3 setup.py --cluster intel      # Intel cluster (frodo,echo,ghost,oracle,wraith,scribe,polyglot)
python3 setup.py --cluster cyber      # Cyber cluster
python3 setup.py --cluster military   # Military cluster
python3 setup.py --cluster humanities # Humanities cluster
python3 setup.py --cluster engineering # Engineering cluster
python3 setup.py --priority 1         # Only priority 1 (core) folders
python3 setup.py --dry-run            # Preview
python3 setup.py --status             # Show state
"""
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import yaml
|
||
|
||
try:
|
||
import requests
|
||
except ImportError:
|
||
print("pip install requests pyyaml")
|
||
sys.exit(1)
|
||
|
||
|
||
# Companion files live next to this script.
CONFIG_PATH = Path(__file__).parent / "config.yaml"
# Resumable pipeline state (uploaded files, OCR results, workspace assignments).
PROGRESS_PATH = Path(__file__).parent / "upload_progress.json"
LOG_PATH = Path(__file__).parent / "setup.log"
# AnythingLLM desktop app storage root.
ANYTHINGLLM_STORAGE = Path.home() / ".config/anythingllm-desktop/storage"
# Extensions to skip during folder scans; populated from config.yaml by load_config().
SKIP_EXT = set()
# ──────────────────────────────────────────────────────────
|
||
# LOGGING
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
log = logging.getLogger("anythingllm")
|
||
|
||
|
||
def setup_logging(verbose=False):
    """Configure the module logger: DEBUG file log + INFO/DEBUG console log.

    The file handler always records at DEBUG; the console handler records
    DEBUG only when --verbose is given, INFO otherwise.

    Note: logging's StreamHandler (and FileHandler) flush the stream after
    every emitted record, so output is already safe for piped/background
    execution. The previous os.dup/os.fdopen re-wrap of each handler stream
    was redundant, silently discarded the file handler's UTF-8 encoding
    (os.fdopen uses the locale default), and leaked the original stream
    objects — it has been removed.
    """
    log.setLevel(logging.DEBUG)
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")

    # File handler — always debug level
    fh = logging.FileHandler(LOG_PATH, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(fmt)
    log.addHandler(fh)

    # Console handler — info or debug based on --verbose
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG if verbose else logging.INFO)
    ch.setFormatter(fmt)
    log.addHandler(ch)
def log_print(msg, level="info"):
    """Emit *msg* through the module logger at the named level.

    Backward-compat shim for call sites originally written against print().
    """
    emit = getattr(log, level)
    emit(msg)
# Persona codenames grouped by cluster; used by the --cluster CLI filter.
CLUSTERS = {
    "intel": ["frodo", "echo", "ghost", "oracle", "wraith", "scribe", "polyglot"],
    "cyber": ["neo", "bastion", "sentinel", "specter", "phantom", "cipher", "vortex"],
    "military": ["marshal", "centurion", "corsair", "warden", "medic"],
    "humanities": ["chronos", "tribune", "arbiter", "ledger", "sage", "herald", "scholar", "gambit"],
    "engineering": ["forge", "architect"],
}
def load_config():
    """Parse config.yaml, refresh the module-wide SKIP_EXT set, return the config dict."""
    global SKIP_EXT
    cfg = yaml.safe_load(CONFIG_PATH.read_text())
    SKIP_EXT = set(cfg["processing"]["skip_extensions"])
    return cfg
def load_progress():
    """Return persisted pipeline state from disk, or a fresh empty state dict."""
    if not PROGRESS_PATH.exists():
        return {"uploaded_files": {}, "workspace_docs": {}, "ocr_done": [], "ocr_failed": []}
    with open(PROGRESS_PATH) as fh:
        return json.load(fh)
def save_progress(progress):
    """Persist pipeline state as pretty-printed, non-ASCII-preserving JSON."""
    serialized = json.dumps(progress, indent=2, ensure_ascii=False)
    with open(PROGRESS_PATH, "w") as fh:
        fh.write(serialized)
# ──────────────────────────────────────────────────────────
|
||
# API
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
def api_request(config, method, endpoint, timeout=120, retries=3, **kwargs):
    """Call the AnythingLLM REST API with retries.

    method: requests function name ("get", "post", ...); endpoint is appended
    to the configured base_url. Returns the parsed JSON body on HTTP 200/201,
    or None after exhausting retries (or on an unexpected error).
    """
    url = f"{config['anythingllm']['base_url']}{endpoint}"
    headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"}
    if "json" in kwargs:
        headers["Content-Type"] = "application/json"

    for attempt in range(retries):
        try:
            log.debug(f"API {method.upper()} {endpoint} (attempt {attempt+1})")
            resp = getattr(requests, method)(url, headers=headers, timeout=timeout, **kwargs)
            if resp.status_code not in (200, 201):
                log.error(f"API {resp.status_code}: {resp.text[:300]}")
                # Non-2xx: brief pause then retry; give up on the last attempt.
                if attempt < retries - 1:
                    time.sleep(3)
                    continue
                return None
            log.debug(f"API {method.upper()} {endpoint} → {resp.status_code}")
            return resp.json()
        except requests.exceptions.Timeout:
            # Timeouts and connection errors are retried after a longer pause.
            log.warning(f"API timeout ({timeout}s) on {endpoint} (attempt {attempt+1}/{retries})")
            if attempt < retries - 1:
                time.sleep(5)
        except requests.exceptions.ConnectionError as e:
            log.error(f"API connection error: {e}")
            if attempt < retries - 1:
                time.sleep(5)
        except Exception as e:
            # Anything else (e.g. invalid JSON body) aborts immediately — no retry.
            log.error(f"API unexpected error: {e}")
            return None
    return None
def api_upload(config, file_path, folder_name=None):
    """Upload one file to AnythingLLM via multipart POST.

    Returns (documents, None) on success, or (None, error_string) on failure.
    The timeout scales with file size: 120s base + 30s per 10MB.
    """
    endpoint = f"/document/upload/{folder_name}" if folder_name else "/document/upload"
    url = f"{config['anythingllm']['base_url']}{endpoint}"
    headers = {"Authorization": f"Bearer {config['anythingllm']['api_key']}"}
    size_mb = file_path.stat().st_size / (1024 * 1024)
    # Larger files get a proportionally larger timeout (floor of 120s).
    timeout = max(120, int(120 + (size_mb / 10) * 30))
    try:
        with open(file_path, "rb") as f:
            files = {"file": (file_path.name, f, "application/octet-stream")}
            resp = requests.post(url, headers=headers, files=files, timeout=timeout)
            if resp.status_code not in (200, 201):
                return None, resp.text[:200]
            data = resp.json()
            # API signals success via both a flag and a non-empty documents list.
            if data.get("success") and data.get("documents"):
                return data["documents"], None
            return None, data.get("error", "Unknown error")
    except requests.exceptions.Timeout:
        return None, f"timeout ({timeout}s)"
    except Exception as e:
        return None, str(e)
def check_api(config):
    """True when the AnythingLLM API answers the /auth probe, False otherwise."""
    try:
        probe = api_request(config, "get", "/auth")
    except Exception:
        return False
    return probe is not None
def check_collector_alive():
    """True when the local document-collector service answers with HTTP 200."""
    try:
        resp = requests.get("http://127.0.0.1:8888", timeout=3)
    except Exception:
        return False
    return resp.status_code == 200
def wait_for_collector(max_wait=90):
    """Poll the collector once per second for up to *max_wait* seconds.

    Prints a progress dot every 10 seconds; returns True as soon as the
    collector responds, False on timeout.
    """
    print(" ⏳ waiting for collector...", end="", flush=True)
    for elapsed in range(1, max_wait + 1):
        if check_collector_alive():
            print(" ✓")
            return True
        time.sleep(1)
        if elapsed % 10 == 0:
            print(".", end="", flush=True)
    print(" ✗ timeout")
    return False
def get_existing_workspaces(config):
    """Return a mapping of workspace name → workspace record ({} on API failure)."""
    payload = api_request(config, "get", "/workspaces")
    if not payload or "workspaces" not in payload:
        return {}
    return {ws["name"]: ws for ws in payload["workspaces"]}
# ──────────────────────────────────────────────────────────
|
||
# PDF DETECTION & OCR
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
def is_scanned_pdf(file_path):
    """Fast scan detection via pdffonts (~0.04s vs pdftotext ~2s).

    A PDF listing no embedded fonts is treated as scanned/image-only.
    Non-PDF paths, pdffonts timeouts, and any subprocess failure all
    return False (i.e. "assume text-based").
    """
    if file_path.suffix.lower() != ".pdf":
        return False
    try:
        proc = subprocess.Popen(
            ["pdffonts", "-l", "3", str(file_path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            out, _ = proc.communicate(timeout=3)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            return False  # assume text if slow
        font_rows = [row for row in out.decode(errors="ignore").strip().split("\n") if row.strip()]
        # pdffonts prints a two-line header; <= 2 lines means no fonts listed.
        return len(font_rows) <= 2
    except Exception:
        return False
def ocr_pdf(file_path, language="tur+eng", dpi=200):
    """OCR a scanned PDF in-place. Returns True on success."""
    import tempfile

    # Work on a temp file in the same directory so the final replace() is
    # a same-filesystem rename.
    handle, raw_tmp = tempfile.mkstemp(suffix=".pdf", dir=file_path.parent)
    os.close(handle)
    tmp = Path(raw_tmp)
    cmd = [
        "ocrmypdf", "--skip-text", "--rotate-pages", "--deskew",
        "--jobs", "4", "--image-dpi", str(dpi), "-l", language,
        "--output-type", "pdf", "--quiet",
        str(file_path), str(tmp),
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
        # Exit code 6 (page already had text with --skip-text) still yields
        # usable output, so accept it alongside 0.
        if proc.returncode in (0, 6) and tmp.exists() and tmp.stat().st_size > 0:
            tmp.replace(file_path)
            return True
    except Exception:
        pass
    tmp.unlink(missing_ok=True)
    return False
# ──────────────────────────────────────────────────────────
|
||
# STEP 1: Storage offload
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
def storage_setup(config, dry_run=False):
    """Relocate AnythingLLM's direct-uploads/ onto the configured HDD path.

    Copies existing uploads to the HDD, removes the original directory, and
    leaves a symlink in its place. No-op if the symlink already exists.
    With dry_run=True, only prints what would happen.
    """
    hdd_base = Path(config["storage"]["hdd_storage"])
    src = ANYTHINGLLM_STORAGE / "direct-uploads"
    dst = hdd_base / "direct-uploads"

    print("═══ Storage: direct-uploads/ → HDD ═══\n")
    if src.is_symlink():
        # Already migrated — nothing to do.
        print(f" ✓ already symlinked → {src.resolve()}")
        return
    if not dry_run:
        dst.mkdir(parents=True, exist_ok=True)
        if src.exists():
            # Copy first, then delete the original, then symlink in its place.
            shutil.copytree(str(src), str(dst), dirs_exist_ok=True)
            shutil.rmtree(src)
        src.symlink_to(dst)
    print(f" ✓ done\n")
# ──────────────────────────────────────────────────────────
|
||
# STEP 2: Create workspaces + load prompts
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
def extract_system_prompt(config, persona_file):
    """Extract a workspace system prompt from a persona file.

    Looks for *persona_file* under the configured personas_dir; if that
    path is missing, falls back to the persona directory's _meta.yaml.
    Returns None when neither exists.

    For .yaml files the prompt is meta["system_prompt"] (or "description").
    For markdown files with a ----delimited YAML frontmatter, returns the
    body, prefixed with "Tone: ..." when the frontmatter declares a tone.
    Fix: yaml.safe_load returns None for empty documents (and non-dicts for
    scalar documents), which previously crashed the .get() calls — both
    results are now coerced to an empty dict.
    """
    personas_dir = Path(config["storage"]["personas_dir"])
    fp = personas_dir / persona_file
    if not fp.exists():
        # Fallback: <persona dir>/_meta.yaml (strip the /general.md suffix).
        alt = personas_dir / persona_file.replace("/general.md", "") / "_meta.yaml"
        fp = alt if alt.exists() else None
    if not fp:
        return None

    content = fp.read_text(encoding="utf-8")
    if fp.suffix == ".yaml":
        meta = yaml.safe_load(content)
        if not isinstance(meta, dict):
            meta = {}  # empty/scalar YAML document — no usable prompt fields
        return meta.get("system_prompt", meta.get("description", ""))

    # Markdown: split off a YAML frontmatter block delimited by "---".
    parts = content.split("---")
    if len(parts) >= 3:
        try:
            fm = yaml.safe_load(parts[1])
        except yaml.YAMLError:
            fm = {}
        if not isinstance(fm, dict):
            fm = {}  # empty or malformed frontmatter
        tone = fm.get("tone", "")
        # Re-join in case the body itself contains "---" separators.
        body = "---".join(parts[2:]).strip()
        return f"Tone: {tone}\n\n{body}" if tone else body
    return content
def create_workspaces(config, persona_list=None, dry_run=False):
    """Create one AnythingLLM workspace per configured persona.

    Existing workspaces are not recreated — their system prompt is updated
    instead. persona_list (list of codenames) limits which entries of
    config["workspaces"] are processed; dry_run skips all API writes.
    """
    print("═══ Creating Workspaces ═══\n")
    if not config["anythingllm"]["api_key"]:
        print(" ✗ API key not set!")
        return

    existing = get_existing_workspaces(config)
    created = skipped = 0

    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        name = ws_config["name"]
        persona_file = ws_config.get("persona_file", "")
        system_prompt = extract_system_prompt(config, persona_file) if persona_file else ""

        if name in existing:
            # Update prompt if workspace exists
            slug = existing[name].get("slug", "?")
            if system_prompt and not dry_run:
                api_request(config, "post", f"/workspace/{slug}/update",
                            json={"openAiPrompt": system_prompt})
            print(f" ✓ {codename} (prompt: {len(system_prompt or '')} chars)")
            skipped += 1
            continue

        print(f" → {codename}: creating '{name}'")
        if not dry_run:
            result = api_request(config, "post", "/workspace/new", json={"name": name})
            if result:
                slug = result.get("workspace", {}).get("slug", "?")
                # Attach the persona prompt right after creation.
                if system_prompt:
                    api_request(config, "post", f"/workspace/{slug}/update",
                                json={"openAiPrompt": system_prompt})
                print(f" ✓ created + prompt ({len(system_prompt)} chars)")
                created += 1
            else:
                print(f" ✗ failed")

    print(f"\n Created: {created}, Updated: {skipped}\n")
# ──────────────────────────────────────────────────────────
|
||
# STEP 3: Three-phase upload pipeline
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
def collect_all_files(config, book_library, persona_list=None, priority_filter=None, max_size_mb=100):
    """Scan all folders and classify files as text-based or scanned.

    Returns (text_files, scanned_files, persona_folders, folder_to_path):
    - text_files / scanned_files: folder_name → [Path] (folder_name is the
      source path with "/" replaced by "_")
    - persona_folders: codename → [folder_name]
    - folder_to_path: folder_name → source Path
    Folders shared by multiple personas are scanned only once.
    """
    text_files = {}      # folder_name → [paths]
    scanned_files = {}   # folder_name → [paths]
    persona_folders = {} # codename → [folder_names]
    folder_to_path = {}  # folder_name → source path

    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        persona_folders[codename] = []

        for entry in ws_config.get("folders", []):
            folder_path = entry["path"]
            priority = entry.get("priority", 2)
            # priority_filter is a max: skip lower-priority (higher-numbered) folders.
            if priority_filter and priority > priority_filter:
                continue

            folder_name = folder_path.replace("/", "_")
            persona_folders[codename].append(folder_name)

            if folder_name in text_files:
                continue  # already scanned (shared between personas)

            src = book_library / folder_path
            folder_to_path[folder_name] = src
            text_files[folder_name] = []
            scanned_files[folder_name] = []

            if not src.exists():
                print(f" ⚠ {folder_path} not found")
                continue

            # Use find with -printf for speed on HDD (one syscall, no per-file stat)
            try:
                find_result = subprocess.run(
                    ["find", str(src), "-type", "f", "-not", "-empty",
                     "-printf", "%s %p\n"],
                    capture_output=True, text=True, timeout=120,
                )
                all_files = []
                max_bytes = (max_size_mb or 9999) * 1024 * 1024
                for line in find_result.stdout.strip().split("\n"):
                    if not line:
                        continue
                    # Each line is "<size> <path>"; maxsplit=1 keeps spaces in paths.
                    parts = line.split(" ", 1)
                    if len(parts) != 2:
                        continue
                    size, path = int(parts[0]), Path(parts[1])
                    if path.suffix.lower() not in SKIP_EXT and size <= max_bytes:
                        all_files.append(path)
                all_files.sort()
            except Exception:
                # Fallback: slower pure-Python walk (per-file stat calls).
                all_files = sorted(
                    f for f in src.rglob("*")
                    if f.is_file() and f.suffix.lower() not in SKIP_EXT
                    and f.stat().st_size > 0
                    and (not max_size_mb or f.stat().st_size / (1024*1024) <= max_size_mb)
                )
            print(f" {folder_name}: {len(all_files)} files found", flush=True)

            # Classify every file (pdffonts is fast: ~0.04s per file)
            for i, f in enumerate(all_files):
                if is_scanned_pdf(f):
                    scanned_files[folder_name].append(f)
                else:
                    text_files[folder_name].append(f)
                if (i + 1) % 500 == 0:
                    print(f" {folder_name}: {i+1}/{len(all_files)} classified...", flush=True)

            t = len(text_files[folder_name])
            s = len(scanned_files[folder_name])
            print(f" {folder_name}: {t} text, {s} scanned")

    return text_files, scanned_files, persona_folders, folder_to_path
def upload_file_batch(config, folder_name, files, progress, batch_size, delay):
    """Upload a list of files to a folder. Returns (uploaded, failed) counts.

    Files already recorded in progress["uploaded_files"] are skipped.
    Pauses *delay* seconds every *batch_size* successful uploads, retries
    each file up to 3 times, and aborts early (after saving progress) when
    the collector goes down and does not come back.
    """
    uploaded = failed = 0
    new_files = [f for f in files if str(f) not in progress["uploaded_files"]]
    if not new_files:
        return 0, 0

    print(f" → {folder_name}: {len(new_files)} files")

    for i, fp in enumerate(new_files):
        # Throttle: pause between batches of successful uploads.
        if uploaded > 0 and uploaded % batch_size == 0:
            print(f" ⏸ batch pause ({delay}s)...")
            time.sleep(delay)

        size_mb = fp.stat().st_size / (1024 * 1024)
        print(f" [{i+1}/{len(new_files)}] {fp.name} ({size_mb:.1f}MB)", end="", flush=True)

        # Bail out early (progress saved) if the collector is gone for good.
        if not check_collector_alive():
            print(f" ⚠ collector down")
            if not wait_for_collector():
                print(" ✗ stopping — restart AnythingLLM and --resume")
                save_progress(progress)
                return uploaded, failed

        docs, error = None, None
        for attempt in range(3):
            docs, error = api_upload(config, fp, folder_name=folder_name)
            if docs:
                break
            if error and "not online" in str(error):
                # Collector dropped mid-upload — wait for it to come back.
                print(f" ↻", end="", flush=True)
                time.sleep(5)
                if not wait_for_collector():
                    save_progress(progress)
                    return uploaded, failed
            elif attempt < 2:
                time.sleep(2)
                print(f" ↻", end="", flush=True)

        if docs:
            # Record the server-side location so it can be assigned later.
            progress["uploaded_files"][str(fp)] = {
                "location": docs[0].get("location", ""),
                "folder": folder_name,
                "name": docs[0].get("name", ""),
            }
            uploaded += 1
            print(f" ✓")
        else:
            failed += 1
            progress.setdefault("failed_files", {})[str(fp)] = {
                "folder": folder_name, "error": str(error)[:200],
            }
            print(f" ✗ {error}")

        # Checkpoint every 10 successful uploads.
        if uploaded % 10 == 0:
            save_progress(progress)

    save_progress(progress)
    return uploaded, failed
def assign_to_workspaces(config, persona_folders, progress, batch_size, delay):
    """Phase C2: assign uploaded docs to persona workspaces.

    For each persona, collects the uploaded document locations for its
    folders and embeds any not yet recorded in progress["workspace_docs"],
    in batches of at most 5 (AnythingLLM hangs on large embed batches).
    Progress is saved after every batch so a crash loses at most one batch.
    """
    log.info("── Assigning to workspaces ──")
    existing_ws = get_existing_workspaces(config)

    if not existing_ws:
        log.error("Could not fetch workspaces from API")
        return

    total_personas = len(persona_folders)
    total_embedded = 0
    total_failed = 0

    for idx, (codename, folders) in enumerate(sorted(persona_folders.items()), 1):
        ws_name = config["workspaces"][codename]["name"]
        ws_info = existing_ws.get(ws_name)
        if not ws_info:
            log.warning(f"[{idx}/{total_personas}] {codename}: workspace '{ws_name}' not found, skipping")
            continue

        slug = ws_info["slug"]
        # Gather server-side locations of every uploaded doc in this persona's folders.
        doc_locs = []
        for fn in folders:
            folder_docs = 0
            for fpath, info in progress["uploaded_files"].items():
                if info.get("folder") == fn and info.get("location"):
                    doc_locs.append(info["location"])
                    folder_docs += 1
            if folder_docs > 0:
                log.debug(f" {codename}/{fn}: {folder_docs} docs")

        # Skip locations already embedded for this persona.
        already = set(progress.get("workspace_docs", {}).get(codename, []))
        new_docs = [loc for loc in doc_locs if loc not in already]
        if not new_docs:
            if doc_locs:
                log.info(f"[{idx}/{total_personas}] ✓ {codename}: {len(doc_locs)} docs already assigned")
            else:
                log.info(f"[{idx}/{total_personas}] ○ {codename}: no uploaded docs found")
            continue

        log.info(f"[{idx}/{total_personas}] → {codename} ({slug}): {len(new_docs)} docs to embed")

        # Use small batches for embedding — AnythingLLM hangs on large batches
        embed_batch = min(batch_size, 5)
        persona_ok = 0
        persona_fail = 0

        for bs in range(0, len(new_docs), embed_batch):
            batch = new_docs[bs:bs + embed_batch]
            batch_num = bs // embed_batch + 1
            total_batches = (len(new_docs) + embed_batch - 1) // embed_batch

            log.debug(f" {codename} batch {batch_num}/{total_batches} ({len(batch)} docs)")

            result = api_request(config, "post", f"/workspace/{slug}/update-embeddings",
                                 json={"adds": batch, "deletes": []},
                                 timeout=300, retries=3)
            if result:
                progress.setdefault("workspace_docs", {}).setdefault(codename, []).extend(batch)
                persona_ok += len(batch)
                log.info(f" ✓ {codename} batch {batch_num}/{total_batches}: "
                         f"{len(batch)} embedded ({persona_ok}/{len(new_docs)})")
            else:
                persona_fail += len(batch)
                log.error(f" ✗ {codename} batch {batch_num}/{total_batches}: FAILED")

            # Save after every batch
            save_progress(progress)

            # Pause between batches, but not after the final one.
            if bs + embed_batch < len(new_docs):
                time.sleep(delay)

        total_embedded += persona_ok
        total_failed += persona_fail
        log.info(f" {codename} done: {persona_ok} ok, {persona_fail} failed")

    log.info(f"── Assignment complete: {total_embedded} embedded, {total_failed} failed ──")
def upload_documents(config, persona_list=None, priority_filter=None,
                     dry_run=False, resume=False, max_size_mb=100):
    """Three-phase pipeline: text upload → OCR → OCR upload → assign.

    resume=True reloads upload_progress.json so completed work is skipped;
    otherwise the pipeline starts from an empty state. dry_run only prints
    what each phase would process.
    """
    print("═══ Upload Pipeline ═══\n")

    if not check_api(config):
        print(" ✗ AnythingLLM API not reachable.")
        return

    book_library = Path(config["storage"]["book_library"])
    batch_size = config["processing"]["batch_size"]
    delay = config["processing"]["delay_between_batches"]
    progress = load_progress() if resume else {
        "uploaded_files": {}, "workspace_docs": {}, "ocr_done": [], "ocr_failed": [],
    }

    # Scan & classify
    print(" Scanning folders...\n")
    text_files, scanned_files, persona_folders, _ = collect_all_files(
        config, book_library, persona_list, priority_filter, max_size_mb,
    )

    total_text = sum(len(v) for v in text_files.values())
    total_scanned = sum(len(v) for v in scanned_files.values())
    already = len(progress["uploaded_files"])

    print(f"\n Text-based files: {total_text}")
    print(f" Scanned PDFs: {total_scanned}")
    print(f" Already uploaded: {already}")
    print(f" Personas: {len(persona_folders)}\n")

    if dry_run:
        # Preview only: per-folder pending counts, then the persona→folder map.
        for fn in sorted(set(list(text_files.keys()) + list(scanned_files.keys()))):
            t = len([f for f in text_files.get(fn, []) if str(f) not in progress["uploaded_files"]])
            s = len([f for f in scanned_files.get(fn, []) if str(f) not in progress["ocr_done"]])
            if t or s:
                print(f" {fn}: {t} text, {s} scanned")
        print(f"\n Personas:")
        for c, flds in sorted(persona_folders.items()):
            print(f" {c}: {', '.join(flds)}")
        return

    # ── Phase A: Upload text-based files ──
    print("══ Phase A: Upload text-based files ══\n")
    total_up = total_fail = 0
    for fn, files in sorted(text_files.items()):
        up, fail = upload_file_batch(config, fn, files, progress, batch_size, delay)
        total_up += up
        total_fail += fail
    print(f"\n Phase A done: {total_up} uploaded, {total_fail} failed\n")

    # ── Phase B: OCR scanned PDFs ──
    total_scanned_remaining = sum(
        1 for files in scanned_files.values()
        for f in files if str(f) not in progress.get("ocr_done", [])
    )
    if total_scanned_remaining > 0:
        print(f"══ Phase B: OCR {total_scanned_remaining} scanned PDFs ══\n")
        ocr_ok = ocr_fail = 0
        for fn, files in sorted(scanned_files.items()):
            pending = [f for f in files if str(f) not in progress.get("ocr_done", [])]
            if not pending:
                continue
            print(f" → {fn}: {len(pending)} PDFs")
            for i, pdf in enumerate(pending):
                size_mb = pdf.stat().st_size / (1024 * 1024)
                print(f" [{i+1}/{len(pending)}] {pdf.name} ({size_mb:.1f}MB)", end="", flush=True)
                if ocr_pdf(pdf):
                    progress.setdefault("ocr_done", []).append(str(pdf))
                    ocr_ok += 1
                    print(f" ✓")
                else:
                    progress.setdefault("ocr_failed", []).append(str(pdf))
                    ocr_fail += 1
                    print(f" ✗")
                # Checkpoint every 5 OCR attempts (success or failure).
                if (ocr_ok + ocr_fail) % 5 == 0:
                    save_progress(progress)
        save_progress(progress)
        print(f"\n Phase B done: {ocr_ok} OCR'd, {ocr_fail} failed\n")

    # ── Phase C: Upload OCR'd files ──
    ocr_to_upload = {fn: [f for f in files if str(f) in progress.get("ocr_done", [])]
                     for fn, files in scanned_files.items()}
    total_ocr_upload = sum(
        1 for files in ocr_to_upload.values()
        for f in files if str(f) not in progress["uploaded_files"]
    )
    if total_ocr_upload > 0:
        print(f"══ Phase C: Upload {total_ocr_upload} OCR'd files ══\n")
        total_up2 = total_fail2 = 0
        for fn, files in sorted(ocr_to_upload.items()):
            if not files:
                continue
            up, fail = upload_file_batch(config, fn, files, progress, batch_size, delay)
            total_up2 += up
            total_fail2 += fail
        print(f"\n Phase C done: {total_up2} uploaded, {total_fail2} failed\n")

    # ── Assign to workspaces ──
    assign_to_workspaces(config, persona_folders, progress, batch_size, delay)
# ──────────────────────────────────────────────────────────
|
||
# STATUS
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
def show_status(config):
    """Print a human-readable summary of storage layout, service health, and progress."""
    print("═══ Integration Status ═══\n")

    # Storage
    du = ANYTHINGLLM_STORAGE / "direct-uploads"
    if du.is_symlink():
        print(f" ✓ direct-uploads/ → {du.resolve()} (HDD)")
    elif du.exists():
        print(f" ⚠ direct-uploads/ on SSD — run --storage-setup")

    # Size of the main storage subdirectories (best effort).
    for d in ["documents", "lancedb", "vector-cache"]:
        p = ANYTHINGLLM_STORAGE / d
        if p.exists():
            try:
                sz = sum(f.stat().st_size for f in p.rglob("*") if f.is_file()) / (1024**2)
                print(f" ● {d}/ ({sz:.0f} MB)")
            except Exception:
                print(f" ● {d}/")

    # Service health (API probe only attempted when an API key is configured).
    api_ok = check_api(config) if config["anythingllm"]["api_key"] else False
    collector_ok = check_collector_alive()
    print(f"\n API: {'✓' if api_ok else '✗'} Collector: {'✓' if collector_ok else '✗'}")

    # Pipeline progress from upload_progress.json.
    progress = load_progress()
    uploaded = len(progress.get("uploaded_files", {}))
    ocr_done = len(progress.get("ocr_done", []))
    assigned = len(progress.get("workspace_docs", {}))

    if uploaded or ocr_done:
        print(f"\n Uploaded: {uploaded} OCR'd: {ocr_done} Personas assigned: {assigned}")
        # Top-20 folders by uploaded file count.
        folders = {}
        for info in progress.get("uploaded_files", {}).values():
            f = info.get("folder", "?")
            folders[f] = folders.get(f, 0) + 1
        for f, c in sorted(folders.items(), key=lambda x: -x[1])[:20]:
            print(f" {c:4d} {f}")

    print()
# ──────────────────────────────────────────────────────────
|
||
# MAIN
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
def reassign_workspaces(config, persona_list=None, reset=False, dry_run=False):
    """Re-assign already-uploaded docs to workspaces without scanning/uploading.
    Skips the slow folder scan — uses upload_progress.json directly.

    reset=True clears assignment tracking (for the given personas, or all)
    before reassigning; dry_run prints per-persona doc counts and exits.
    """
    log.info("═══ Re-assign Workspaces ═══")

    if not check_api(config):
        log.error("AnythingLLM API not reachable")
        return

    progress = load_progress()
    batch_size = config["processing"]["batch_size"]
    delay = config["processing"]["delay_between_batches"]

    if reset:
        if persona_list:
            # Drop tracking only for the selected personas.
            for p in persona_list:
                progress.get("workspace_docs", {}).pop(p, None)
            log.info(f"Reset assignments for: {', '.join(persona_list)}")
        else:
            progress["workspace_docs"] = {}
            log.info("Reset all workspace assignments")
        save_progress(progress)

    # Build persona_folders from config (no disk scan needed)
    persona_folders = {}
    for codename, ws_config in config["workspaces"].items():
        if persona_list and codename not in persona_list:
            continue
        persona_folders[codename] = [
            entry["path"].replace("/", "_")
            for entry in ws_config.get("folders", [])
        ]

    uploaded = len(progress.get("uploaded_files", {}))
    log.info(f"Uploaded files in progress: {uploaded}")
    log.info(f"Personas to assign: {len(persona_folders)}")

    if dry_run:
        # Preview: per-persona doc counts versus what is already assigned.
        existing_ws = get_existing_workspaces(config)
        for codename, folders in sorted(persona_folders.items()):
            ws_name = config["workspaces"][codename]["name"]
            slug = existing_ws.get(ws_name, {}).get("slug", "?")
            doc_count = 0
            for fn in folders:
                for info in progress.get("uploaded_files", {}).values():
                    if info.get("folder") == fn and info.get("location"):
                        doc_count += 1
            already = len(progress.get("workspace_docs", {}).get(codename, []))
            log.info(f" {codename} ({slug}): {doc_count} docs, {already} already assigned")
        return

    assign_to_workspaces(config, persona_folders, progress, batch_size, delay)
    log.info("Re-assign complete.")
def resolve_persona_list(args, config):
    """Resolve the --persona / --cluster CLI options to a list of codenames.

    Returns None when neither option is given (meaning: all personas);
    exits with an error message for an unknown cluster name.
    """
    if args.persona:
        return [args.persona]
    if not args.cluster:
        return None  # all
    members = CLUSTERS.get(args.cluster)
    if members:
        return members
    print(f"Unknown cluster: {args.cluster}")
    print(f"Available: {', '.join(CLUSTERS.keys())}")
    sys.exit(1)
def main():
    """Parse CLI arguments and dispatch to the selected pipeline steps."""
    parser = argparse.ArgumentParser(description="AnythingLLM × Persona Integration")
    parser.add_argument("--storage-setup", action="store_true")
    parser.add_argument("--create-workspaces", action="store_true")
    parser.add_argument("--upload-documents", action="store_true")
    parser.add_argument("--all", action="store_true", help="Run all steps")
    parser.add_argument("--reassign", action="store_true", help="Re-assign uploaded docs to workspaces (no scan/upload)")
    parser.add_argument("--reset", action="store_true", help="Reset assignment tracking before reassign")
    parser.add_argument("--status", action="store_true")
    parser.add_argument("--persona", type=str, help="Single persona filter")
    parser.add_argument("--cluster", type=str, help="Cluster filter: intel, cyber, military, humanities, engineering")
    parser.add_argument("--priority", type=int, help="Max priority (1=core)")
    parser.add_argument("--max-size", type=int, default=100, help="Max file MB (default: 100)")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--resume", action="store_true")
    parser.add_argument("--verbose", "-v", action="store_true", help="Debug-level console output")

    args = parser.parse_args()
    setup_logging(verbose=args.verbose)
    log.info(f"AnythingLLM Integration started — args: {vars(args)}")
    config = load_config()

    # No action flag given → show help and exit.
    if not any([args.storage_setup, args.create_workspaces,
                args.upload_documents, args.reassign, args.all, args.status]):
        parser.print_help()
        return

    persona_list = resolve_persona_list(args, config)

    # --status is exclusive; every other step can be combined (or via --all).
    if args.status:
        show_status(config)
        return
    if args.storage_setup or args.all:
        storage_setup(config, dry_run=args.dry_run)
    if args.create_workspaces or args.all:
        create_workspaces(config, persona_list=persona_list, dry_run=args.dry_run)
    if args.reassign:
        reassign_workspaces(config, persona_list=persona_list,
                            reset=args.reset, dry_run=args.dry_run)
    if args.upload_documents or args.all:
        # --all implies --resume so completed work is not redone.
        upload_documents(config, persona_list=persona_list,
                         priority_filter=args.priority,
                         dry_run=args.dry_run,
                         resume=args.resume or args.all,
                         max_size_mb=args.max_size)
# Script entry point.
if __name__ == "__main__":
    main()