- Engine.Register/List/SweepAll with ants pool fanout - ExampleSource emits two deterministic findings (SourceType=recon:example) - Tests cover Register/List idempotency, SweepAll aggregation, empty-registry, and Enabled() filtering
106 lines
2.4 KiB
Go
106 lines
2.4 KiB
Go
package recon
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
)
|
|
|
|
// Engine orchestrates parallel fanout across registered ReconSource implementations.
|
|
//
|
|
// Sources are registered once at startup (typically by phase-specific init()
|
|
// functions). SweepAll runs every enabled source in its own goroutine via an
|
|
// ants worker pool, aggregates their findings, and returns the combined slice.
|
|
// Deduplication is intentionally NOT performed here — callers use
|
|
// pkg/recon.Dedup (plan 09-03) on the returned slice.
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
sources map[string]ReconSource
|
|
}
|
|
|
|
// NewEngine returns an Engine with an empty source registry.
|
|
func NewEngine() *Engine {
|
|
return &Engine{sources: make(map[string]ReconSource)}
|
|
}
|
|
|
|
// Register adds a source, keyed by s.Name(). Registering the same name twice
|
|
// replaces the previous instance (idempotent for identical sources).
|
|
func (e *Engine) Register(s ReconSource) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.sources[s.Name()] = s
|
|
}
|
|
|
|
// List returns registered source names in sorted order.
|
|
func (e *Engine) List() []string {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
names := make([]string, 0, len(e.sources))
|
|
for n := range e.sources {
|
|
names = append(names, n)
|
|
}
|
|
sort.Strings(names)
|
|
return names
|
|
}
|
|
|
|
// SweepAll fans out to every enabled source in parallel via an ants pool and
|
|
// returns the aggregated findings. It respects ctx cancellation at the
|
|
// collection loop. Deduplication is performed by callers.
|
|
func (e *Engine) SweepAll(ctx context.Context, cfg Config) ([]Finding, error) {
|
|
e.mu.RLock()
|
|
active := make([]ReconSource, 0, len(e.sources))
|
|
for _, s := range e.sources {
|
|
if s.Enabled(cfg) {
|
|
active = append(active, s)
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
if len(active) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
pool, err := ants.NewPool(len(active))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer pool.Release()
|
|
|
|
out := make(chan Finding, 256)
|
|
var wg sync.WaitGroup
|
|
for _, s := range active {
|
|
s := s
|
|
wg.Add(1)
|
|
if err := pool.Submit(func() {
|
|
defer wg.Done()
|
|
_ = s.Sweep(ctx, cfg.Query, out)
|
|
}); err != nil {
|
|
wg.Done()
|
|
}
|
|
}
|
|
go func() {
|
|
wg.Wait()
|
|
close(out)
|
|
}()
|
|
|
|
var all []Finding
|
|
for {
|
|
select {
|
|
case f, ok := <-out:
|
|
if !ok {
|
|
return all, nil
|
|
}
|
|
all = append(all, f)
|
|
case <-ctx.Done():
|
|
// Drain remaining so goroutines don't block on send.
|
|
go func() {
|
|
for range out {
|
|
}
|
|
}()
|
|
return all, ctx.Err()
|
|
}
|
|
}
|
|
}
|