Files
keyhunter/pkg/recon/engine.go
salvacybersec 851b2432b8 feat(09-01): add Engine with parallel fanout and ExampleSource
- Engine.Register/List/SweepAll with ants pool fanout
- ExampleSource emits two deterministic findings (SourceType=recon:example)
- Tests cover Register/List idempotency, SweepAll aggregation, empty-registry,
  and Enabled() filtering
2026-04-06 00:42:51 +03:00

106 lines
2.4 KiB
Go

package recon
import (
"context"
"sort"
"sync"
"github.com/panjf2000/ants/v2"
)
// Engine orchestrates parallel fanout across registered ReconSource implementations.
//
// Sources are registered once at startup (typically by phase-specific init()
// functions). SweepAll runs every enabled source in its own goroutine via an
// ants worker pool, aggregates their findings, and returns the combined slice.
// Deduplication is intentionally NOT performed here — callers use
// pkg/recon.Dedup (plan 09-03) on the returned slice.
type Engine struct {
mu sync.RWMutex
sources map[string]ReconSource
}
// NewEngine returns an Engine with an empty source registry.
func NewEngine() *Engine {
return &Engine{sources: make(map[string]ReconSource)}
}
// Register adds a source, keyed by s.Name(). Registering the same name twice
// replaces the previous instance (idempotent for identical sources).
func (e *Engine) Register(s ReconSource) {
e.mu.Lock()
defer e.mu.Unlock()
e.sources[s.Name()] = s
}
// List returns registered source names in sorted order.
func (e *Engine) List() []string {
e.mu.RLock()
defer e.mu.RUnlock()
names := make([]string, 0, len(e.sources))
for n := range e.sources {
names = append(names, n)
}
sort.Strings(names)
return names
}
// SweepAll fans out to every enabled source in parallel via an ants pool and
// returns the aggregated findings. It respects ctx cancellation at the
// collection loop. Deduplication is performed by callers.
func (e *Engine) SweepAll(ctx context.Context, cfg Config) ([]Finding, error) {
e.mu.RLock()
active := make([]ReconSource, 0, len(e.sources))
for _, s := range e.sources {
if s.Enabled(cfg) {
active = append(active, s)
}
}
e.mu.RUnlock()
if len(active) == 0 {
return nil, nil
}
pool, err := ants.NewPool(len(active))
if err != nil {
return nil, err
}
defer pool.Release()
out := make(chan Finding, 256)
var wg sync.WaitGroup
for _, s := range active {
s := s
wg.Add(1)
if err := pool.Submit(func() {
defer wg.Done()
_ = s.Sweep(ctx, cfg.Query, out)
}); err != nil {
wg.Done()
}
}
go func() {
wg.Wait()
close(out)
}()
var all []Finding
for {
select {
case f, ok := <-out:
if !ok {
return all, nil
}
all = append(all, f)
case <-ctx.Done():
// Drain remaining so goroutines don't block on send.
go func() {
for range out {
}
}()
return all, ctx.Err()
}
}
}