feat(09-01): add Engine with parallel fanout and ExampleSource
- Engine.Register/List/SweepAll with ants pool fanout - ExampleSource emits two deterministic findings (SourceType=recon:example) - Tests cover Register/List idempotency, SweepAll aggregation, empty-registry, and Enabled() filtering
This commit is contained in:
105
pkg/recon/engine.go
Normal file
105
pkg/recon/engine.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package recon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
||||
// Engine orchestrates parallel fanout across registered ReconSource implementations.
|
||||
//
|
||||
// Sources are registered once at startup (typically by phase-specific init()
|
||||
// functions). SweepAll runs every enabled source in its own goroutine via an
|
||||
// ants worker pool, aggregates their findings, and returns the combined slice.
|
||||
// Deduplication is intentionally NOT performed here — callers use
|
||||
// pkg/recon.Dedup (plan 09-03) on the returned slice.
|
||||
type Engine struct {
|
||||
mu sync.RWMutex
|
||||
sources map[string]ReconSource
|
||||
}
|
||||
|
||||
// NewEngine returns an Engine with an empty source registry.
|
||||
func NewEngine() *Engine {
|
||||
return &Engine{sources: make(map[string]ReconSource)}
|
||||
}
|
||||
|
||||
// Register adds a source, keyed by s.Name(). Registering the same name twice
|
||||
// replaces the previous instance (idempotent for identical sources).
|
||||
func (e *Engine) Register(s ReconSource) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.sources[s.Name()] = s
|
||||
}
|
||||
|
||||
// List returns registered source names in sorted order.
|
||||
func (e *Engine) List() []string {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
names := make([]string, 0, len(e.sources))
|
||||
for n := range e.sources {
|
||||
names = append(names, n)
|
||||
}
|
||||
sort.Strings(names)
|
||||
return names
|
||||
}
|
||||
|
||||
// SweepAll fans out to every enabled source in parallel via an ants pool and
|
||||
// returns the aggregated findings. It respects ctx cancellation at the
|
||||
// collection loop. Deduplication is performed by callers.
|
||||
func (e *Engine) SweepAll(ctx context.Context, cfg Config) ([]Finding, error) {
|
||||
e.mu.RLock()
|
||||
active := make([]ReconSource, 0, len(e.sources))
|
||||
for _, s := range e.sources {
|
||||
if s.Enabled(cfg) {
|
||||
active = append(active, s)
|
||||
}
|
||||
}
|
||||
e.mu.RUnlock()
|
||||
|
||||
if len(active) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pool, err := ants.NewPool(len(active))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer pool.Release()
|
||||
|
||||
out := make(chan Finding, 256)
|
||||
var wg sync.WaitGroup
|
||||
for _, s := range active {
|
||||
s := s
|
||||
wg.Add(1)
|
||||
if err := pool.Submit(func() {
|
||||
defer wg.Done()
|
||||
_ = s.Sweep(ctx, cfg.Query, out)
|
||||
}); err != nil {
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(out)
|
||||
}()
|
||||
|
||||
var all []Finding
|
||||
for {
|
||||
select {
|
||||
case f, ok := <-out:
|
||||
if !ok {
|
||||
return all, nil
|
||||
}
|
||||
all = append(all, f)
|
||||
case <-ctx.Done():
|
||||
// Drain remaining so goroutines don't block on send.
|
||||
go func() {
|
||||
for range out {
|
||||
}
|
||||
}()
|
||||
return all, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user