Closes 2 verification gaps:
1. --sources=github,gitlab flag filters registered sources before sweep
2. Findings persisted to SQLite via storage.SaveFinding after dedup

Also adds Engine.Get() method for source lookup by name.
114 lines
2.6 KiB
Go
114 lines
2.6 KiB
Go
package recon
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
)
|
|
|
|
// Engine orchestrates parallel fanout across registered ReconSource implementations.
|
|
//
|
|
// Sources are registered once at startup (typically by phase-specific init()
|
|
// functions). SweepAll runs every enabled source in its own goroutine via an
|
|
// ants worker pool, aggregates their findings, and returns the combined slice.
|
|
// Deduplication is intentionally NOT performed here — callers use
|
|
// pkg/recon.Dedup (plan 09-03) on the returned slice.
|
|
type Engine struct {
|
|
mu sync.RWMutex
|
|
sources map[string]ReconSource
|
|
}
|
|
|
|
// NewEngine returns an Engine with an empty source registry.
|
|
func NewEngine() *Engine {
|
|
return &Engine{sources: make(map[string]ReconSource)}
|
|
}
|
|
|
|
// Register adds a source, keyed by s.Name(). Registering the same name twice
|
|
// replaces the previous instance (idempotent for identical sources).
|
|
func (e *Engine) Register(s ReconSource) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.sources[s.Name()] = s
|
|
}
|
|
|
|
// Get returns a registered source by name and true, or nil and false.
|
|
func (e *Engine) Get(name string) (ReconSource, bool) {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
s, ok := e.sources[name]
|
|
return s, ok
|
|
}
|
|
|
|
// List returns registered source names in sorted order.
|
|
func (e *Engine) List() []string {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
names := make([]string, 0, len(e.sources))
|
|
for n := range e.sources {
|
|
names = append(names, n)
|
|
}
|
|
sort.Strings(names)
|
|
return names
|
|
}
|
|
|
|
// SweepAll fans out to every enabled source in parallel via an ants pool and
|
|
// returns the aggregated findings. It respects ctx cancellation at the
|
|
// collection loop. Deduplication is performed by callers.
|
|
func (e *Engine) SweepAll(ctx context.Context, cfg Config) ([]Finding, error) {
|
|
e.mu.RLock()
|
|
active := make([]ReconSource, 0, len(e.sources))
|
|
for _, s := range e.sources {
|
|
if s.Enabled(cfg) {
|
|
active = append(active, s)
|
|
}
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
if len(active) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
pool, err := ants.NewPool(len(active))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer pool.Release()
|
|
|
|
out := make(chan Finding, 256)
|
|
var wg sync.WaitGroup
|
|
for _, s := range active {
|
|
s := s
|
|
wg.Add(1)
|
|
if err := pool.Submit(func() {
|
|
defer wg.Done()
|
|
_ = s.Sweep(ctx, cfg.Query, out)
|
|
}); err != nil {
|
|
wg.Done()
|
|
}
|
|
}
|
|
go func() {
|
|
wg.Wait()
|
|
close(out)
|
|
}()
|
|
|
|
var all []Finding
|
|
for {
|
|
select {
|
|
case f, ok := <-out:
|
|
if !ok {
|
|
return all, nil
|
|
}
|
|
all = append(all, f)
|
|
case <-ctx.Done():
|
|
// Drain remaining so goroutines don't block on send.
|
|
go func() {
|
|
for range out {
|
|
}
|
|
}()
|
|
return all, ctx.Err()
|
|
}
|
|
}
|
|
}
|