package recon

import (
	"context"
	"sort"
	"sync"

	"github.com/panjf2000/ants/v2"
)

// Engine coordinates a parallel fanout over the ReconSource implementations
// registered with it.
//
// Sources are registered once at startup (typically by phase-specific init()
// functions). SweepAll runs each enabled source in its own goroutine through
// an ants worker pool, gathers the findings they emit, and returns them as a
// single slice. Deduplication is deliberately NOT done here — callers apply
// pkg/recon.Dedup (plan 09-03) to the returned slice.
type Engine struct {
	// mu guards sources.
	mu sync.RWMutex
	// sources maps each source's Name() to the registered instance.
	sources map[string]ReconSource
}

// NewEngine builds an Engine whose source registry starts out empty.
func NewEngine() *Engine {
	eng := new(Engine)
	eng.sources = make(map[string]ReconSource)
	return eng
}

// Register stores s in the registry under s.Name(). A later registration with
// the same name overwrites the earlier one, so re-registering an identical
// source is idempotent.
func (e *Engine) Register(s ReconSource) {
	e.mu.Lock()
	defer e.mu.Unlock()

	key := s.Name()
	e.sources[key] = s
}

// Get looks up a source by name. It returns the source and true when the name
// is registered, or nil and false otherwise.
func (e *Engine) Get(name string) (ReconSource, bool) {
	e.mu.RLock()
	src, found := e.sources[name]
	e.mu.RUnlock()
	return src, found
}

// List returns the names of all registered sources, sorted ascending.
func (e *Engine) List() []string {
	e.mu.RLock()
	defer e.mu.RUnlock()

	sorted := make([]string, len(e.sources))
	i := 0
	for key := range e.sources {
		sorted[i] = key
		i++
	}
	sort.Strings(sorted)
	return sorted
}

// SweepAll fans out to every enabled source in parallel via an ants pool and
// returns the aggregated findings. It respects ctx cancellation at the
// collection loop. Deduplication is performed by callers.
func (e *Engine) SweepAll(ctx context.Context, cfg Config) ([]Finding, error) { e.mu.RLock() active := make([]ReconSource, 0, len(e.sources)) for _, s := range e.sources { if s.Enabled(cfg) { active = append(active, s) } } e.mu.RUnlock() if len(active) == 0 { return nil, nil } pool, err := ants.NewPool(len(active)) if err != nil { return nil, err } defer pool.Release() out := make(chan Finding, 256) var wg sync.WaitGroup for _, s := range active { s := s wg.Add(1) if err := pool.Submit(func() { defer wg.Done() _ = s.Sweep(ctx, cfg.Query, out) }); err != nil { wg.Done() } } go func() { wg.Wait() close(out) }() var all []Finding for { select { case f, ok := <-out: if !ok { return all, nil } all = append(all, f) case <-ctx.Done(): // Drain remaining so goroutines don't block on send. go func() { for range out { } }() return all, ctx.Err() } } }