Add quality_analyzer.py — PDF quality scoring for FOIA/CIA filtering

Scans PDF folders and scores each document (0-100) based on:
- Text content (word count, coherence, OCR garbage detection)
- Font presence (scanned vs text-based)
- File size, page count, filename quality
- Language detection (Arabic/Russian/Turkish/English)

Labels: high (70+), medium (40-69), low (20-39), noise (<20)
Outputs JSON + CSV. Can move noise to Arsiv/noise with --move.

Usage: --scan, --report, --export-csv, --move [--confirm]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
salvacybersec
2026-04-07 23:00:34 +03:00
parent 6c5a828b13
commit 803e8be284
2 changed files with 421 additions and 0 deletions

4
.gitignore vendored
View File

@@ -9,3 +9,7 @@ ocr_output/
__pycache__/
*.pyc
.venv/
quality_analysis.json
quality_analysis.csv
quality_scan.log
upload_run.log

417
quality_analyzer.py Executable file
View File

@@ -0,0 +1,417 @@
#!/usr/bin/env python3
"""
PDF Quality Analyzer — Scan FOIA/CIA folders and classify documents.
Extracts metadata + first 10 pages text for each PDF, scores quality,
outputs JSON for filtering. No files moved — only analysis.
Usage:
python3 quality_analyzer.py --scan # Analyze all
python3 quality_analyzer.py --scan --folder Istihbarat/CIA
python3 quality_analyzer.py --scan --folder FOIA/documents
python3 quality_analyzer.py --report # Show stats from scan
python3 quality_analyzer.py --export-csv # Export to CSV
python3 quality_analyzer.py --classify # Add quality labels
python3 quality_analyzer.py --move # Move noise to Arsiv/noise
"""
import argparse
import csv
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
BOOKS_ROOT = Path("/mnt/storage/Common/Books")
OUTPUT_PATH = Path(__file__).parent / "quality_analysis.json"
CSV_PATH = Path(__file__).parent / "quality_analysis.csv"
def extract_pdf_metadata(pdf_path):
"""Extract metadata from a single PDF."""
result = {
"path": str(pdf_path),
"filename": pdf_path.name,
"folder": str(pdf_path.parent.relative_to(BOOKS_ROOT)),
"size_kb": pdf_path.stat().st_size // 1024,
"pages": 0,
"has_fonts": False,
"font_count": 0,
"text_chars": 0,
"text_words": 0,
"text_lines": 0,
"text_sample": "",
"language": "unknown",
"quality_score": 0,
"quality_label": "",
"issues": [],
}
# Page count via pdfinfo
try:
r = subprocess.run(["pdfinfo", str(pdf_path)], capture_output=True, text=True, timeout=5)
for line in r.stdout.splitlines():
if line.startswith("Pages:"):
result["pages"] = int(line.split(":")[1].strip())
break
except Exception:
result["issues"].append("pdfinfo_failed")
# Font detection via pdffonts
try:
r = subprocess.run(["pdffonts", "-l", "10", str(pdf_path)],
capture_output=True, text=True, timeout=5)
font_lines = [l for l in r.stdout.strip().split("\n") if l.strip()]
result["font_count"] = max(0, len(font_lines) - 2) # subtract header
result["has_fonts"] = result["font_count"] > 0
except Exception:
result["issues"].append("pdffonts_failed")
# Text extraction — first 10 pages
try:
r = subprocess.run(["pdftotext", "-l", "10", str(pdf_path), "-"],
capture_output=True, text=True, timeout=15)
text = r.stdout.strip()
result["text_chars"] = len(text)
result["text_words"] = len(text.split())
result["text_lines"] = len([l for l in text.split("\n") if l.strip()])
# Keep first 2000 chars as sample
result["text_sample"] = text[:2000]
# Language detection (simple heuristic)
if text:
arabic_chars = len(re.findall(r'[\u0600-\u06FF]', text))
cyrillic_chars = len(re.findall(r'[\u0400-\u04FF]', text))
turkish_chars = len(re.findall(r'[şçğüöıİŞÇĞÜÖ]', text))
latin_chars = len(re.findall(r'[a-zA-Z]', text))
total = arabic_chars + cyrillic_chars + turkish_chars + latin_chars
if total > 0:
if arabic_chars / total > 0.3:
result["language"] = "arabic"
elif cyrillic_chars / total > 0.3:
result["language"] = "russian"
elif turkish_chars / max(1, latin_chars) > 0.02:
result["language"] = "turkish"
elif latin_chars > 0:
result["language"] = "english"
except subprocess.TimeoutExpired:
result["issues"].append("pdftotext_timeout")
except Exception:
result["issues"].append("pdftotext_failed")
return result
def score_quality(doc):
"""Score document quality 0-100."""
score = 0
issues = list(doc.get("issues", []))
# Text content quality (0-40 points)
words = doc["text_words"]
if words > 1000:
score += 40
elif words > 500:
score += 30
elif words > 100:
score += 20
elif words > 20:
score += 10
else:
issues.append("very_low_text")
# Has real fonts (0-15 points)
if doc["has_fonts"]:
score += 15
else:
issues.append("no_fonts_scanned")
# File size reasonable (0-10 points)
size_kb = doc["size_kb"]
if 50 < size_kb < 50000:
score += 10
elif size_kb <= 50:
issues.append("too_small")
elif size_kb >= 50000:
score += 5 # large but might be valuable
# Page count (0-10 points)
pages = doc["pages"]
if pages >= 5:
score += 10
elif pages >= 2:
score += 5
else:
issues.append("single_page")
# Filename quality (0-10 points)
fname = doc["filename"]
if len(fname) > 20 and not fname.startswith("00"):
# Has a real name, not just hash
if any(c.isalpha() and c.isascii() for c in fname[:20]):
score += 10
elif any('\u0600' <= c <= '\u06FF' for c in fname):
score += 5 # Arabic name, might be ok
else:
if re.match(r'^[0-9A-Fa-f]{10,}', fname):
issues.append("hash_filename")
# Text coherence — check for OCR garbage (0-15 points)
sample = doc["text_sample"]
if sample:
# Count actual words vs garbage
real_words = len(re.findall(r'\b[a-zA-Z\u0400-\u04FF\u0600-\u06FF]{3,}\b', sample))
total_tokens = len(sample.split())
if total_tokens > 0:
coherence = real_words / total_tokens
if coherence > 0.5:
score += 15
elif coherence > 0.3:
score += 10
elif coherence > 0.1:
score += 5
else:
issues.append("garbled_text")
doc["quality_score"] = min(100, score)
doc["issues"] = issues
return doc
def classify(doc):
"""Assign quality label based on score."""
score = doc["quality_score"]
if score >= 70:
doc["quality_label"] = "high"
elif score >= 40:
doc["quality_label"] = "medium"
elif score >= 20:
doc["quality_label"] = "low"
else:
doc["quality_label"] = "noise"
return doc
def scan_folder(folder_path, existing=None):
"""Scan all PDFs in a folder."""
existing = existing or {}
pdfs = sorted(folder_path.rglob("*.pdf"))
total = len(pdfs)
results = []
skipped = 0
print(f"Scanning {folder_path.relative_to(BOOKS_ROOT)}: {total} PDFs")
for i, pdf in enumerate(pdfs):
key = str(pdf)
if key in existing:
results.append(existing[key])
skipped += 1
continue
if (i + 1) % 100 == 0 or i == 0:
print(f" [{i+1}/{total}] {pdf.name[:50]}...", flush=True)
try:
doc = extract_pdf_metadata(pdf)
doc = score_quality(doc)
doc = classify(doc)
results.append(doc)
except Exception as e:
results.append({
"path": str(pdf),
"filename": pdf.name,
"folder": str(pdf.parent.relative_to(BOOKS_ROOT)),
"size_kb": pdf.stat().st_size // 1024 if pdf.exists() else 0,
"quality_score": 0,
"quality_label": "error",
"issues": [str(e)],
"text_sample": "",
})
print(f" Done: {total - skipped} new, {skipped} cached")
return results
def load_existing():
"""Load existing analysis."""
if OUTPUT_PATH.exists():
with open(OUTPUT_PATH) as f:
data = json.load(f)
return {d["path"]: d for d in data}
return {}
def save_results(results):
"""Save analysis to JSON (without text_sample for size)."""
with open(OUTPUT_PATH, "w") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"Saved {len(results)} docs to {OUTPUT_PATH}")
def print_report(results):
"""Print quality distribution report."""
by_folder = {}
by_label = {"high": 0, "medium": 0, "low": 0, "noise": 0, "error": 0}
by_lang = {}
for doc in results:
folder = doc.get("folder", "?")
label = doc.get("quality_label", "?")
lang = doc.get("language", "?")
by_folder.setdefault(folder, {"high": 0, "medium": 0, "low": 0, "noise": 0, "total": 0})
by_folder[folder][label] = by_folder[folder].get(label, 0) + 1
by_folder[folder]["total"] += 1
by_label[label] = by_label.get(label, 0) + 1
by_lang[lang] = by_lang.get(lang, 0) + 1
print(f"\n{'=' * 70}")
print(f"QUALITY REPORT — {len(results)} documents")
print(f"{'=' * 70}\n")
print("Overall distribution:")
for label in ["high", "medium", "low", "noise", "error"]:
count = by_label.get(label, 0)
pct = count * 100 // max(1, len(results))
bar = "" * (pct // 2)
print(f" {label:<8} {count:>6} ({pct:>2}%) {bar}")
print(f"\nLanguage distribution:")
for lang, count in sorted(by_lang.items(), key=lambda x: -x[1]):
print(f" {lang:<12} {count:>6}")
print(f"\nPer folder:")
print(f" {'Folder':<40} {'Total':>6} {'High':>6} {'Med':>6} {'Low':>6} {'Noise':>6}")
print(f" {'-' * 72}")
for folder, counts in sorted(by_folder.items(), key=lambda x: -x[1]["total"]):
print(f" {folder:<40} {counts['total']:>6} "
f"{counts.get('high', 0):>6} {counts.get('medium', 0):>6} "
f"{counts.get('low', 0):>6} {counts.get('noise', 0):>6}")
def export_csv(results):
"""Export to CSV (without text_sample)."""
fields = ["filename", "folder", "size_kb", "pages", "has_fonts", "text_words",
"language", "quality_score", "quality_label", "issues"]
with open(CSV_PATH, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
writer.writeheader()
for doc in sorted(results, key=lambda x: -x.get("quality_score", 0)):
doc_copy = dict(doc)
doc_copy["issues"] = "; ".join(doc.get("issues", []))
writer.writerow(doc_copy)
print(f"Exported {len(results)} docs to {CSV_PATH}")
def move_classified(results, dry_run=True):
"""Move noise files to Arsiv/noise, high quality to special folder."""
noise_dir = BOOKS_ROOT / "Arsiv" / "noise"
moved = 0
for doc in results:
if doc.get("quality_label") != "noise":
continue
src = Path(doc["path"])
if not src.exists():
continue
dst = noise_dir / doc.get("folder", "") / src.name
if dry_run:
print(f" [DRY] {src.name}{dst.parent}")
else:
dst.parent.mkdir(parents=True, exist_ok=True)
src.rename(dst)
moved += 1
action = "Would move" if dry_run else "Moved"
print(f"\n{action} {moved} noise files")
if dry_run:
print("Run with --move --confirm to actually move files")
def main():
parser = argparse.ArgumentParser(description="PDF Quality Analyzer")
parser.add_argument("--scan", action="store_true", help="Scan and analyze PDFs")
parser.add_argument("--folder", type=str, help="Specific folder under Books/ to scan")
parser.add_argument("--report", action="store_true", help="Show report from existing analysis")
parser.add_argument("--export-csv", action="store_true", help="Export to CSV")
parser.add_argument("--classify", action="store_true", help="Re-classify existing analysis")
parser.add_argument("--move", action="store_true", help="Move noise to Arsiv/noise (dry-run)")
parser.add_argument("--confirm", action="store_true", help="Actually move files (with --move)")
args = parser.parse_args()
if args.scan:
existing = load_existing()
folders = []
if args.folder:
folders = [BOOKS_ROOT / args.folder]
else:
# Default: scan FOIA + CIA (the questionable ones)
folders = [
BOOKS_ROOT / "Istihbarat" / "CIA",
BOOKS_ROOT / "FOIA" / "documents",
BOOKS_ROOT / "Istihbarat" / "FOIA-FBI-Vault",
BOOKS_ROOT / "Istihbarat" / "FOIA-IA-CIA-SogukSavas",
BOOKS_ROOT / "Istihbarat" / "FOIA-IA-FBI",
BOOKS_ROOT / "Istihbarat" / "FOIA-IA-CIA-Kuba-OrtaDogu",
BOOKS_ROOT / "Istihbarat" / "FOIA-IA-WWII",
BOOKS_ROOT / "Istihbarat" / "FOIA-CIA-Turkey",
]
all_results = [v for v in existing.values()
if not any(str(Path(v["path"]).parent).startswith(str(f)) for f in folders)]
for folder in folders:
if folder.exists():
results = scan_folder(folder, existing)
all_results.extend(results)
save_results(all_results)
print_report(all_results)
elif args.report:
if not OUTPUT_PATH.exists():
print("No analysis found. Run --scan first.")
return
with open(OUTPUT_PATH) as f:
results = json.load(f)
print_report(results)
elif args.export_csv:
if not OUTPUT_PATH.exists():
print("No analysis found. Run --scan first.")
return
with open(OUTPUT_PATH) as f:
results = json.load(f)
export_csv(results)
elif args.classify:
if not OUTPUT_PATH.exists():
print("No analysis found. Run --scan first.")
return
with open(OUTPUT_PATH) as f:
results = json.load(f)
results = [classify(score_quality(d)) for d in results]
save_results(results)
print_report(results)
elif args.move:
if not OUTPUT_PATH.exists():
print("No analysis found. Run --scan first.")
return
with open(OUTPUT_PATH) as f:
results = json.load(f)
move_classified(results, dry_run=not args.confirm)
else:
parser.print_help()
if __name__ == "__main__":
main()