diff --git a/pkg/recon/engine.go b/pkg/recon/engine.go new file mode 100644 index 0000000..67fb42c --- /dev/null +++ b/pkg/recon/engine.go @@ -0,0 +1,105 @@ +package recon + +import ( + "context" + "sort" + "sync" + + "github.com/panjf2000/ants/v2" +) + +// Engine orchestrates parallel fanout across registered ReconSource implementations. +// +// Sources are registered once at startup (typically by phase-specific init() +// functions). SweepAll runs every enabled source in its own goroutine via an +// ants worker pool, aggregates their findings, and returns the combined slice. +// Deduplication is intentionally NOT performed here — callers use +// pkg/recon.Dedup (plan 09-03) on the returned slice. +type Engine struct { + mu sync.RWMutex + sources map[string]ReconSource +} + +// NewEngine returns an Engine with an empty source registry. +func NewEngine() *Engine { + return &Engine{sources: make(map[string]ReconSource)} +} + +// Register adds a source, keyed by s.Name(). Registering the same name twice +// replaces the previous instance (idempotent for identical sources). +func (e *Engine) Register(s ReconSource) { + e.mu.Lock() + defer e.mu.Unlock() + e.sources[s.Name()] = s +} + +// List returns registered source names in sorted order. +func (e *Engine) List() []string { + e.mu.RLock() + defer e.mu.RUnlock() + names := make([]string, 0, len(e.sources)) + for n := range e.sources { + names = append(names, n) + } + sort.Strings(names) + return names +} + +// SweepAll fans out to every enabled source in parallel via an ants pool and +// returns the aggregated findings. It respects ctx cancellation at the +// collection loop. Deduplication is performed by callers. +func (e *Engine) SweepAll(ctx context.Context, cfg Config) ([]Finding, error) { + e.mu.RLock() + active := make([]ReconSource, 0, len(e.sources)) + for _, s := range e.sources { + if s.Enabled(cfg) { + active = append(active, s) + } + } + e.mu.RUnlock() + + if len(active) == 0 { + return nil, nil + } + + pool, err := ants.NewPool(len(active)) + if err != nil { + return nil, err + } + defer pool.Release() + + out := make(chan Finding, 256) + var wg sync.WaitGroup + for _, s := range active { + s := s + wg.Add(1) + if err := pool.Submit(func() { + defer wg.Done() + _ = s.Sweep(ctx, cfg.Query, out) + }); err != nil { + wg.Done() + } + } + go func() { + wg.Wait() + close(out) + }() + + var all []Finding + for { + select { + case f, ok := <-out: + if !ok { + return all, nil + } + all = append(all, f) + case <-ctx.Done(): + // Drain remaining so goroutines don't block on send. + go func() { + for range out { + } + }() + return all, ctx.Err() + } + } +} diff --git a/pkg/recon/engine_test.go b/pkg/recon/engine_test.go new file mode 100644 index 0000000..4eb4ad5 --- /dev/null +++ b/pkg/recon/engine_test.go @@ -0,0 +1,57 @@ +package recon + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRegisterList(t *testing.T) { + e := NewEngine() + require.Empty(t, e.List()) + + e.Register(ExampleSource{}) + require.Equal(t, []string{"example"}, e.List()) + + // Re-registering the same name is idempotent. + e.Register(ExampleSource{}) + require.Equal(t, []string{"example"}, e.List()) +} + +func TestSweepAll(t *testing.T) { + e := NewEngine() + e.Register(ExampleSource{}) + + findings, err := e.SweepAll(context.Background(), Config{Query: "ignored"}) + require.NoError(t, err) + require.Len(t, findings, 2, "ExampleSource should emit exactly 2 deterministic findings") + + for _, f := range findings { + require.Equal(t, "recon:example", f.SourceType) + require.NotEmpty(t, f.ProviderName) + require.NotEmpty(t, f.KeyMasked) + require.NotEmpty(t, f.Source) + } +} + +func TestSweepAll_NoSources(t *testing.T) { + e := NewEngine() + findings, err := e.SweepAll(context.Background(), Config{}) + require.NoError(t, err) + require.Nil(t, findings) +} + +// disabledSource tests that Enabled() filtering works. +type disabledSource struct{ ExampleSource } + +func (disabledSource) Name() string { return "disabled" } +func (disabledSource) Enabled(Config) bool { return false } + +func TestSweepAll_FiltersDisabled(t *testing.T) { + e := NewEngine() + e.Register(disabledSource{}) + findings, err := e.SweepAll(context.Background(), Config{}) + require.NoError(t, err) + require.Empty(t, findings) +} diff --git a/pkg/recon/example.go b/pkg/recon/example.go new file mode 100644 index 0000000..7c5940b --- /dev/null +++ b/pkg/recon/example.go @@ -0,0 +1,61 @@ +package recon + +import ( + "context" + "time" + + "golang.org/x/time/rate" +) + +// ExampleSource is a deterministic stub ReconSource used to prove the Engine +// fanout pipeline end-to-end. Phases 10-16 replace this with real sources +// (Shodan, GitHub, Pastebin, ...). +type ExampleSource struct{} + +// Name returns the stable identifier used for SourceType routing. +func (ExampleSource) Name() string { return "example" } + +// RateLimit returns a conservative 10 req/s (only matters once wired to a +// LimiterRegistry in plan 09-02; ExampleSource does no real network I/O). +func (ExampleSource) RateLimit() rate.Limit { return rate.Limit(10) } + +// Burst returns the token bucket burst size. +func (ExampleSource) Burst() int { return 1 } + +// RespectsRobots reports false: this stub performs no HTTP fetches. +func (ExampleSource) RespectsRobots() bool { return false } + +// Enabled always returns true so the stub participates in SweepAll by default. +func (ExampleSource) Enabled(_ Config) bool { return true } + +// Sweep emits two deterministic fake Findings tagged with SourceType +// "recon:example". The query argument is intentionally ignored. +func (ExampleSource) Sweep(ctx context.Context, _ string, out chan<- Finding) error { + now := time.Now() + fakes := []Finding{ + { + ProviderName: "openai", + KeyMasked: "sk-examp...AAAA", + Confidence: "low", + Source: "https://example.invalid/a", + SourceType: "recon:example", + DetectedAt: now, + }, + { + ProviderName: "anthropic", + KeyMasked: "sk-ant-e...BBBB", + Confidence: "low", + Source: "https://example.invalid/b", + SourceType: "recon:example", + DetectedAt: now, + }, + } + for _, f := range fakes { + select { + case out <- f: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +}