diff --git a/.gitignore b/.gitignore
index f1cc37c..0afd532 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,7 @@ ocr_output/
 __pycache__/
 *.pyc
 .venv/
+quality_analysis.json
+quality_analysis.csv
+quality_scan.log
+upload_run.log
diff --git a/quality_analyzer.py b/quality_analyzer.py
new file mode 100755
index 0000000..b33b015
--- /dev/null
+++ b/quality_analyzer.py
@@ -0,0 +1,417 @@
+#!/usr/bin/env python3
+"""
+PDF Quality Analyzer — Scan FOIA/CIA folders and classify documents.
+
+Extracts metadata + first 10 pages text for each PDF, scores quality,
+outputs JSON for filtering. No files moved — only analysis.
+
+Usage:
+    python3 quality_analyzer.py --scan          # Analyze all
+    python3 quality_analyzer.py --scan --folder Istihbarat/CIA
+    python3 quality_analyzer.py --scan --folder FOIA/documents
+    python3 quality_analyzer.py --report        # Show stats from scan
+    python3 quality_analyzer.py --export-csv    # Export to CSV
+    python3 quality_analyzer.py --classify      # Add quality labels
+    python3 quality_analyzer.py --move          # Move noise to Arsiv/noise
+"""
+
+import argparse
+import csv
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+BOOKS_ROOT = Path("/mnt/storage/Common/Books")
+OUTPUT_PATH = Path(__file__).parent / "quality_analysis.json"
+CSV_PATH = Path(__file__).parent / "quality_analysis.csv"
+
+
+def extract_pdf_metadata(pdf_path):
+    """Extract metadata from a single PDF."""
+    result = {
+        "path": str(pdf_path),
+        "filename": pdf_path.name,
+        "folder": str(pdf_path.parent.relative_to(BOOKS_ROOT)),
+        "size_kb": pdf_path.stat().st_size // 1024,
+        "pages": 0,
+        "has_fonts": False,
+        "font_count": 0,
+        "text_chars": 0,
+        "text_words": 0,
+        "text_lines": 0,
+        "text_sample": "",
+        "language": "unknown",
+        "quality_score": 0,
+        "quality_label": "",
+        "issues": [],
+    }
+
+    # Page count via pdfinfo
+    try:
+        r = subprocess.run(["pdfinfo", str(pdf_path)], capture_output=True, text=True, timeout=5)
+        for line in r.stdout.splitlines():
+            if line.startswith("Pages:"):
+                result["pages"] = int(line.split(":")[1].strip())
+                break
+    except Exception:
+        result["issues"].append("pdfinfo_failed")
+
+    # Font detection via pdffonts
+    try:
+        r = subprocess.run(["pdffonts", "-l", "10", str(pdf_path)],
+                           capture_output=True, text=True, timeout=5)
+        font_lines = [l for l in r.stdout.strip().split("\n") if l.strip()]
+        result["font_count"] = max(0, len(font_lines) - 2)  # subtract header
+        result["has_fonts"] = result["font_count"] > 0
+    except Exception:
+        result["issues"].append("pdffonts_failed")
+
+    # Text extraction — first 10 pages
+    try:
+        r = subprocess.run(["pdftotext", "-l", "10", str(pdf_path), "-"],
+                           capture_output=True, text=True, timeout=15)
+        text = r.stdout.strip()
+        result["text_chars"] = len(text)
+        result["text_words"] = len(text.split())
+        result["text_lines"] = len([l for l in text.split("\n") if l.strip()])
+        # Keep first 2000 chars as sample
+        result["text_sample"] = text[:2000]
+
+        # Language detection (simple heuristic)
+        if text:
+            arabic_chars = len(re.findall(r'[\u0600-\u06FF]', text))
+            cyrillic_chars = len(re.findall(r'[\u0400-\u04FF]', text))
+            turkish_chars = len(re.findall(r'[şçğüöıİŞÇĞÜÖ]', text))
+            latin_chars = len(re.findall(r'[a-zA-Z]', text))
+
+            total = arabic_chars + cyrillic_chars + turkish_chars + latin_chars
+            if total > 0:
+                if arabic_chars / total > 0.3:
+                    result["language"] = "arabic"
+                elif cyrillic_chars / total > 0.3:
+                    result["language"] = "russian"
+                elif turkish_chars / max(1, latin_chars) > 0.02:
+                    result["language"] = "turkish"
+                elif latin_chars > 0:
+                    result["language"] = "english"
+    except subprocess.TimeoutExpired:
+        result["issues"].append("pdftotext_timeout")
+    except Exception:
+        result["issues"].append("pdftotext_failed")
+
+    return result
+
+
+def score_quality(doc):
+    """Score document quality 0-100."""
+    score = 0
+    issues = list(doc.get("issues", []))
+
+    # Text content quality (0-40 points)
+    words = doc["text_words"]
+    if words > 1000:
+        score += 40
+    elif words > 500:
+        score += 30
+    elif words > 100:
+        score += 20
+    elif words > 20:
+        score += 10
+    else:
+        issues.append("very_low_text")
+
+    # Has real fonts (0-15 points)
+    if doc["has_fonts"]:
+        score += 15
+    else:
+        issues.append("no_fonts_scanned")
+
+    # File size reasonable (0-10 points)
+    size_kb = doc["size_kb"]
+    if 50 < size_kb < 50000:
+        score += 10
+    elif size_kb <= 50:
+        issues.append("too_small")
+    elif size_kb >= 50000:
+        score += 5  # large but might be valuable
+
+    # Page count (0-10 points)
+    pages = doc["pages"]
+    if pages >= 5:
+        score += 10
+    elif pages >= 2:
+        score += 5
+    else:
+        issues.append("single_page")
+
+    # Filename quality (0-10 points)
+    fname = doc["filename"]
+    if len(fname) > 20 and not fname.startswith("00"):
+        # Has a real name, not just hash
+        if any(c.isalpha() and c.isascii() for c in fname[:20]):
+            score += 10
+        elif any('\u0600' <= c <= '\u06FF' for c in fname):
+            score += 5  # Arabic name, might be ok
+    else:
+        if re.match(r'^[0-9A-Fa-f]{10,}', fname):
+            issues.append("hash_filename")
+
+    # Text coherence — check for OCR garbage (0-15 points)
+    sample = doc["text_sample"]
+    if sample:
+        # Count actual words vs garbage
+        real_words = len(re.findall(r'\b[a-zA-Z\u0400-\u04FF\u0600-\u06FF]{3,}\b', sample))
+        total_tokens = len(sample.split())
+        if total_tokens > 0:
+            coherence = real_words / total_tokens
+            if coherence > 0.5:
+                score += 15
+            elif coherence > 0.3:
+                score += 10
+            elif coherence > 0.1:
+                score += 5
+            else:
+                issues.append("garbled_text")
+
+    doc["quality_score"] = min(100, score)
+    doc["issues"] = issues
+    return doc
+
+
+def classify(doc):
+    """Assign quality label based on score."""
+    score = doc["quality_score"]
+    if score >= 70:
+        doc["quality_label"] = "high"
+    elif score >= 40:
+        doc["quality_label"] = "medium"
+    elif score >= 20:
+        doc["quality_label"] = "low"
+    else:
+        doc["quality_label"] = "noise"
+    return doc
+
+
+def scan_folder(folder_path, existing=None):
+    """Scan all PDFs in a folder."""
+    existing = existing or {}
+    pdfs = sorted(folder_path.rglob("*.pdf"))
+    total = len(pdfs)
+    results = []
+    skipped = 0
+
+    print(f"Scanning {folder_path.relative_to(BOOKS_ROOT)}: {total} PDFs")
+
+    for i, pdf in enumerate(pdfs):
+        key = str(pdf)
+        if key in existing:
+            results.append(existing[key])
+            skipped += 1
+            continue
+
+        if (i + 1) % 100 == 0 or i == 0:
+            print(f"  [{i+1}/{total}] {pdf.name[:50]}...", flush=True)
+
+        try:
+            doc = extract_pdf_metadata(pdf)
+            doc = score_quality(doc)
+            doc = classify(doc)
+            results.append(doc)
+        except Exception as e:
+            results.append({
+                "path": str(pdf),
+                "filename": pdf.name,
+                "folder": str(pdf.parent.relative_to(BOOKS_ROOT)),
+                "size_kb": pdf.stat().st_size // 1024 if pdf.exists() else 0,
+                "quality_score": 0,
+                "quality_label": "error",
+                "issues": [str(e)],
+                "text_sample": "",
+            })
+
+    print(f"  Done: {total - skipped} new, {skipped} cached")
+    return results
+
+
+def load_existing():
+    """Load existing analysis."""
+    if OUTPUT_PATH.exists():
+        with open(OUTPUT_PATH) as f:
+            data = json.load(f)
+        return {d["path"]: d for d in data}
+    return {}
+
+
+def save_results(results):
+    """Save analysis to JSON (text_sample kept: --classify re-scores from it)."""
+    with open(OUTPUT_PATH, "w") as f:
+        json.dump(results, f, indent=2, ensure_ascii=False)
+    print(f"Saved {len(results)} docs to {OUTPUT_PATH}")
+
+
+def print_report(results):
+    """Print quality distribution report."""
+    by_folder = {}
+    by_label = {"high": 0, "medium": 0, "low": 0, "noise": 0, "error": 0}
+    by_lang = {}
+
+    for doc in results:
+        folder = doc.get("folder", "?")
+        label = doc.get("quality_label", "?")
+        lang = doc.get("language", "?")
+
+        by_folder.setdefault(folder, {"high": 0, "medium": 0, "low": 0, "noise": 0, "total": 0})
+        by_folder[folder][label] = by_folder[folder].get(label, 0) + 1
+        by_folder[folder]["total"] += 1
+
+        by_label[label] = by_label.get(label, 0) + 1
+        by_lang[lang] = by_lang.get(lang, 0) + 1
+
+    print(f"\n{'=' * 70}")
+    print(f"QUALITY REPORT — {len(results)} documents")
+    print(f"{'=' * 70}\n")
+
+    print("Overall distribution:")
+    for label in ["high", "medium", "low", "noise", "error"]:
+        count = by_label.get(label, 0)
+        pct = count * 100 // max(1, len(results))
+        bar = "█" * (pct // 2)
+        print(f"  {label:<8} {count:>6} ({pct:>2}%) {bar}")
+
+    print(f"\nLanguage distribution:")
+    for lang, count in sorted(by_lang.items(), key=lambda x: -x[1]):
+        print(f"  {lang:<12} {count:>6}")
+
+    print(f"\nPer folder:")
+    print(f"  {'Folder':<40} {'Total':>6} {'High':>6} {'Med':>6} {'Low':>6} {'Noise':>6}")
+    print(f"  {'-' * 72}")
+    for folder, counts in sorted(by_folder.items(), key=lambda x: -x[1]["total"]):
+        print(f"  {folder:<40} {counts['total']:>6} "
+              f"{counts.get('high', 0):>6} {counts.get('medium', 0):>6} "
+              f"{counts.get('low', 0):>6} {counts.get('noise', 0):>6}")
+
+
+def export_csv(results):
+    """Export to CSV (without text_sample)."""
+    fields = ["filename", "folder", "size_kb", "pages", "has_fonts", "text_words",
+              "language", "quality_score", "quality_label", "issues"]
+    with open(CSV_PATH, "w", newline="", encoding="utf-8") as f:
+        writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
+        writer.writeheader()
+        for doc in sorted(results, key=lambda x: -x.get("quality_score", 0)):
+            doc_copy = dict(doc)
+            doc_copy["issues"] = "; ".join(doc.get("issues", []))
+            writer.writerow(doc_copy)
+    print(f"Exported {len(results)} docs to {CSV_PATH}")
+
+
+def move_classified(results, dry_run=True):
+    """Move noise files to Arsiv/noise, high quality to special folder."""
+    noise_dir = BOOKS_ROOT / "Arsiv" / "noise"
+    moved = 0
+
+    for doc in results:
+        if doc.get("quality_label") != "noise":
+            continue
+        src = Path(doc["path"])
+        if not src.exists():
+            continue
+        dst = noise_dir / doc.get("folder", "") / src.name
+        if dry_run:
+            print(f"  [DRY] {src.name} → {dst.parent}")
+        else:
+            dst.parent.mkdir(parents=True, exist_ok=True)
+            src.rename(dst)
+        moved += 1
+
+    action = "Would move" if dry_run else "Moved"
+    print(f"\n{action} {moved} noise files")
+    if dry_run:
+        print("Run with --move --confirm to actually move files")
+
+
+def main():
+    parser = argparse.ArgumentParser(description="PDF Quality Analyzer")
+    parser.add_argument("--scan", action="store_true", help="Scan and analyze PDFs")
+    parser.add_argument("--folder", type=str, help="Specific folder under Books/ to scan")
+    parser.add_argument("--report", action="store_true", help="Show report from existing analysis")
+    parser.add_argument("--export-csv", action="store_true", help="Export to CSV")
+    parser.add_argument("--classify", action="store_true", help="Re-classify existing analysis")
+    parser.add_argument("--move", action="store_true", help="Move noise to Arsiv/noise (dry-run)")
+    parser.add_argument("--confirm", action="store_true", help="Actually move files (with --move)")
+
+    args = parser.parse_args()
+
+    if args.scan:
+        existing = load_existing()
+        folders = []
+        if args.folder:
+            folders = [BOOKS_ROOT / args.folder]
+        else:
+            # Default: scan FOIA + CIA (the questionable ones)
+            folders = [
+                BOOKS_ROOT / "Istihbarat" / "CIA",
+                BOOKS_ROOT / "FOIA" / "documents",
+                BOOKS_ROOT / "Istihbarat" / "FOIA-FBI-Vault",
+                BOOKS_ROOT / "Istihbarat" / "FOIA-IA-CIA-SogukSavas",
+                BOOKS_ROOT / "Istihbarat" / "FOIA-IA-FBI",
+                BOOKS_ROOT / "Istihbarat" / "FOIA-IA-CIA-Kuba-OrtaDogu",
+                BOOKS_ROOT / "Istihbarat" / "FOIA-IA-WWII",
+                BOOKS_ROOT / "Istihbarat" / "FOIA-CIA-Turkey",
+            ]
+
+        all_results = [v for v in existing.values()
+                       if not any(str(Path(v["path"]).parent).startswith(str(f)) for f in folders)]
+
+        for folder in folders:
+            if folder.exists():
+                results = scan_folder(folder, existing)
+                all_results.extend(results)
+
+        save_results(all_results)
+        print_report(all_results)
+
+    elif args.report:
+        if not OUTPUT_PATH.exists():
+            print("No analysis found. Run --scan first.")
+            return
+        with open(OUTPUT_PATH) as f:
+            results = json.load(f)
+        print_report(results)
+
+    elif args.export_csv:
+        if not OUTPUT_PATH.exists():
+            print("No analysis found. Run --scan first.")
+            return
+        with open(OUTPUT_PATH) as f:
+            results = json.load(f)
+        export_csv(results)
+
+    elif args.classify:
+        if not OUTPUT_PATH.exists():
+            print("No analysis found. Run --scan first.")
+            return
+        with open(OUTPUT_PATH) as f:
+            results = json.load(f)
+        results = [classify(score_quality(d)) for d in results]
+        save_results(results)
+        print_report(results)
+
+    elif args.move:
+        if not OUTPUT_PATH.exists():
+            print("No analysis found. Run --scan first.")
+            return
+        with open(OUTPUT_PATH) as f:
+            results = json.load(f)
+        move_classified(results, dry_run=not args.confirm)
+
+    else:
+        parser.print_help()
+
+
+if __name__ == "__main__":
+    main()