- pkg/engine/sources/source.go: Source interface using pkg/types.Chunk
- pkg/engine/sources/file.go: FileSource with overlapping chunk reads
- pkg/engine/filter.go: KeywordFilter using Aho-Corasick pre-filter
- pkg/engine/detector.go: Detect with regex matching + Shannon entropy check
- pkg/engine/engine.go: Engine.Scan orchestrating 3-stage pipeline with ants pool
- pkg/engine/scanner_test.go: filled in test stubs with pipeline integration tests
- testdata/samples: fixed Anthropic key lengths to match the {93,} regex pattern
96 lines
2.4 KiB
Go
96 lines
2.4 KiB
Go
// Package engine implements the core scanning pipeline.
|
|
// Three stages: Aho-Corasick keyword pre-filter, regex + entropy detector workers, results channel.
|
|
package engine
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"sync"
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
"github.com/salvacybersec/keyhunter/pkg/engine/sources"
|
|
"github.com/salvacybersec/keyhunter/pkg/providers"
|
|
"github.com/salvacybersec/keyhunter/pkg/types"
|
|
)
|
|
|
|
// ScanConfig holds the knobs that control a single Scan call.
type ScanConfig struct {
	// Workers is the number of detector goroutines. A zero or negative
	// value falls back to runtime.NumCPU() * 8.
	Workers int

	// Verify opts in to active verification of findings (Phase 5).
	Verify bool

	// Unmask includes the full key in Finding.KeyValue.
	Unmask bool
}
|
|
|
|
// Engine orchestrates the three-stage scanning pipeline.
|
|
type Engine struct {
|
|
registry *providers.Registry
|
|
}
|
|
|
|
// NewEngine creates an Engine backed by the given provider registry.
|
|
func NewEngine(registry *providers.Registry) *Engine {
|
|
return &Engine{registry: registry}
|
|
}
|
|
|
|
// Scan runs the three-stage pipeline against src and returns a channel of Findings.
|
|
// The channel is closed when all chunks have been processed.
|
|
// The caller must drain the channel fully or cancel ctx to avoid goroutine leaks.
|
|
func (e *Engine) Scan(ctx context.Context, src sources.Source, cfg ScanConfig) (<-chan Finding, error) {
|
|
workers := cfg.Workers
|
|
if workers <= 0 {
|
|
workers = runtime.NumCPU() * 8
|
|
}
|
|
|
|
chunksChan := make(chan types.Chunk, 1000)
|
|
detectableChan := make(chan types.Chunk, 500)
|
|
resultsChan := make(chan Finding, 100)
|
|
|
|
// Stage 1: source -> chunksChan
|
|
go func() {
|
|
defer close(chunksChan)
|
|
_ = src.Chunks(ctx, chunksChan)
|
|
}()
|
|
|
|
// Stage 2: keyword pre-filter -> detectableChan
|
|
go func() {
|
|
defer close(detectableChan)
|
|
KeywordFilter(e.registry.AC(), chunksChan, detectableChan)
|
|
}()
|
|
|
|
// Stage 3: detector workers -> resultsChan
|
|
pool, err := ants.NewPool(workers)
|
|
if err != nil {
|
|
close(resultsChan)
|
|
return nil, err
|
|
}
|
|
providerList := e.registry.List()
|
|
|
|
var wg sync.WaitGroup
|
|
var mu sync.Mutex
|
|
|
|
go func() {
|
|
defer func() {
|
|
wg.Wait()
|
|
close(resultsChan)
|
|
pool.Release()
|
|
}()
|
|
|
|
for chunk := range detectableChan {
|
|
c := chunk // capture loop variable
|
|
wg.Add(1)
|
|
_ = pool.Submit(func() {
|
|
defer wg.Done()
|
|
found := Detect(c, providerList)
|
|
mu.Lock()
|
|
for _, f := range found {
|
|
select {
|
|
case resultsChan <- f:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
mu.Unlock()
|
|
})
|
|
}
|
|
}()
|
|
|
|
return resultsChan, nil
|
|
}
|