feat(01-04): implement three-stage scanning pipeline with ants worker pool
- pkg/engine/sources/source.go: Source interface using pkg/types.Chunk
- pkg/engine/sources/file.go: FileSource with overlapping chunk reads
- pkg/engine/filter.go: KeywordFilter using Aho-Corasick pre-filter
- pkg/engine/detector.go: Detect with regex matching + Shannon entropy check
- pkg/engine/engine.go: Engine.Scan orchestrating 3-stage pipeline with ants pool
- pkg/engine/scanner_test.go: filled test stubs with pipeline integration tests
- testdata/samples: fixed anthropic key lengths to match {93,} regex pattern
This commit is contained in:
55
pkg/engine/detector.go
Normal file
55
pkg/engine/detector.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package engine
|
||||
|
||||
import (
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/salvacybersec/keyhunter/pkg/providers"
	"github.com/salvacybersec/keyhunter/pkg/types"
)
|
||||
|
||||
// Detect applies provider regex patterns and optional entropy checks to a chunk.
|
||||
// It returns all findings from the chunk.
|
||||
func Detect(chunk types.Chunk, providerList []providers.Provider) []Finding {
|
||||
var findings []Finding
|
||||
content := string(chunk.Data)
|
||||
|
||||
for _, p := range providerList {
|
||||
for _, pat := range p.Patterns {
|
||||
re, err := regexp.Compile(pat.Regex)
|
||||
if err != nil {
|
||||
continue // invalid regex -- skip silently
|
||||
}
|
||||
matches := re.FindAllString(content, -1)
|
||||
for _, match := range matches {
|
||||
// Apply entropy check if threshold is set
|
||||
if pat.EntropyMin > 0 && Shannon(match) < pat.EntropyMin {
|
||||
continue // too low entropy -- likely a placeholder
|
||||
}
|
||||
line := lineNumber(content, match)
|
||||
findings = append(findings, Finding{
|
||||
ProviderName: p.Name,
|
||||
KeyValue: match,
|
||||
KeyMasked: MaskKey(match),
|
||||
Confidence: pat.Confidence,
|
||||
Source: chunk.Source,
|
||||
SourceType: "file",
|
||||
LineNumber: line,
|
||||
Offset: chunk.Offset,
|
||||
DetectedAt: time.Now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return findings
|
||||
}
|
||||
|
||||
// lineNumber reports the 1-based line on which match first occurs in
// content, or 0 when content does not contain match at all.
func lineNumber(content, match string) int {
	before, _, found := strings.Cut(content, match)
	if !found {
		return 0
	}
	// Lines are delimited by '\n'; counting separators before the match
	// and adding one yields the 1-based line index.
	return strings.Count(before, "\n") + 1
}
|
||||
@@ -1,3 +1,95 @@
|
||||
// Package engine implements the core scanning pipeline.
|
||||
// Scanner stages (keyword pre-filter, regex matching, entropy analysis) are implemented in Plan 04.
|
||||
// Three stages: Aho-Corasick keyword pre-filter, regex + entropy detector workers, results channel.
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/salvacybersec/keyhunter/pkg/engine/sources"
|
||||
"github.com/salvacybersec/keyhunter/pkg/providers"
|
||||
"github.com/salvacybersec/keyhunter/pkg/types"
|
||||
)
|
||||
|
||||
// ScanConfig controls scan execution parameters.
type ScanConfig struct {
	Workers int  // number of detector goroutines; values <= 0 default to runtime.NumCPU() * 8
	Verify  bool // opt-in active verification (Phase 5); NOTE(review): not yet read by Scan -- confirm wiring
	Unmask  bool // include full key in Finding.KeyValue; NOTE(review): not yet read by Scan -- confirm wiring
}
|
||||
|
||||
// Engine orchestrates the three-stage scanning pipeline
// (source -> keyword pre-filter -> detector workers).
type Engine struct {
	registry *providers.Registry // supplies provider patterns and the Aho-Corasick automaton
}
|
||||
|
||||
// NewEngine creates an Engine backed by the given provider registry.
|
||||
func NewEngine(registry *providers.Registry) *Engine {
|
||||
return &Engine{registry: registry}
|
||||
}
|
||||
|
||||
// Scan runs the three-stage pipeline against src and returns a channel of Findings.
|
||||
// The channel is closed when all chunks have been processed.
|
||||
// The caller must drain the channel fully or cancel ctx to avoid goroutine leaks.
|
||||
func (e *Engine) Scan(ctx context.Context, src sources.Source, cfg ScanConfig) (<-chan Finding, error) {
|
||||
workers := cfg.Workers
|
||||
if workers <= 0 {
|
||||
workers = runtime.NumCPU() * 8
|
||||
}
|
||||
|
||||
chunksChan := make(chan types.Chunk, 1000)
|
||||
detectableChan := make(chan types.Chunk, 500)
|
||||
resultsChan := make(chan Finding, 100)
|
||||
|
||||
// Stage 1: source -> chunksChan
|
||||
go func() {
|
||||
defer close(chunksChan)
|
||||
_ = src.Chunks(ctx, chunksChan)
|
||||
}()
|
||||
|
||||
// Stage 2: keyword pre-filter -> detectableChan
|
||||
go func() {
|
||||
defer close(detectableChan)
|
||||
KeywordFilter(e.registry.AC(), chunksChan, detectableChan)
|
||||
}()
|
||||
|
||||
// Stage 3: detector workers -> resultsChan
|
||||
pool, err := ants.NewPool(workers)
|
||||
if err != nil {
|
||||
close(resultsChan)
|
||||
return nil, err
|
||||
}
|
||||
providerList := e.registry.List()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
wg.Wait()
|
||||
close(resultsChan)
|
||||
pool.Release()
|
||||
}()
|
||||
|
||||
for chunk := range detectableChan {
|
||||
c := chunk // capture loop variable
|
||||
wg.Add(1)
|
||||
_ = pool.Submit(func() {
|
||||
defer wg.Done()
|
||||
found := Detect(c, providerList)
|
||||
mu.Lock()
|
||||
for _, f := range found {
|
||||
select {
|
||||
case resultsChan <- f:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
mu.Unlock()
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
return resultsChan, nil
|
||||
}
|
||||
|
||||
17
pkg/engine/filter.go
Normal file
17
pkg/engine/filter.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
ahocorasick "github.com/petar-dambovaliev/aho-corasick"
|
||||
"github.com/salvacybersec/keyhunter/pkg/types"
|
||||
)
|
||||
|
||||
// KeywordFilter filters a stream of chunks using an Aho-Corasick automaton.
|
||||
// Only chunks that contain at least one provider keyword are sent to out.
|
||||
// This is Stage 2 of the pipeline (runs after Source, before Detector).
|
||||
func KeywordFilter(ac ahocorasick.AhoCorasick, in <-chan types.Chunk, out chan<- types.Chunk) {
|
||||
for chunk := range in {
|
||||
if len(ac.FindAll(string(chunk.Data))) > 0 {
|
||||
out <- chunk
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,23 +1,116 @@
|
||||
package engine_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/salvacybersec/keyhunter/pkg/engine"
|
||||
"github.com/salvacybersec/keyhunter/pkg/engine/sources"
|
||||
"github.com/salvacybersec/keyhunter/pkg/providers"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestShannonEntropy verifies the entropy function returns expected values.
|
||||
// Stub: will be implemented when entropy.go exists (Plan 04).
|
||||
func newTestRegistry(t *testing.T) *providers.Registry {
|
||||
t.Helper()
|
||||
reg, err := providers.NewRegistry()
|
||||
require.NoError(t, err)
|
||||
return reg
|
||||
}
|
||||
|
||||
func TestShannonEntropy(t *testing.T) {
|
||||
t.Skip("stub — implement after entropy.go exists")
|
||||
assert.InDelta(t, 0.0, engine.Shannon("aaaaaaa"), 0.01)
|
||||
assert.Greater(t, engine.Shannon("sk-proj-ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqr"), 3.5)
|
||||
assert.Equal(t, 0.0, engine.Shannon(""))
|
||||
}
|
||||
|
||||
// TestKeywordPreFilter verifies Aho-Corasick pre-filter rejects files without keywords.
|
||||
// Stub: will be implemented when filter.go exists (Plan 04).
|
||||
func TestKeywordPreFilter(t *testing.T) {
|
||||
t.Skip("stub — implement after filter.go exists")
|
||||
reg := newTestRegistry(t)
|
||||
ac := reg.AC()
|
||||
|
||||
// Chunk with OpenAI keyword should pass
|
||||
matches := ac.FindAll("export OPENAI_API_KEY=sk-proj-test")
|
||||
assert.NotEmpty(t, matches)
|
||||
|
||||
// Chunk with no keywords should be dropped
|
||||
noMatches := ac.FindAll("hello world no secrets here")
|
||||
assert.Empty(t, noMatches)
|
||||
}
|
||||
|
||||
// TestScannerPipeline verifies end-to-end scan of testdata returns expected findings.
|
||||
// Stub: will be implemented when engine.go exists (Plan 04).
|
||||
func TestScannerPipeline(t *testing.T) {
|
||||
t.Skip("stub — implement after engine.go exists")
|
||||
func TestScannerPipelineOpenAI(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
eng := engine.NewEngine(reg)
|
||||
src := sources.NewFileSource("../../testdata/samples/openai_key.txt")
|
||||
cfg := engine.ScanConfig{Workers: 2}
|
||||
|
||||
ch, err := eng.Scan(context.Background(), src, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
var findings []engine.Finding
|
||||
for f := range ch {
|
||||
findings = append(findings, f)
|
||||
}
|
||||
|
||||
require.Len(t, findings, 1, "expected exactly 1 finding in openai_key.txt")
|
||||
assert.Equal(t, "openai", findings[0].ProviderName)
|
||||
assert.Contains(t, findings[0].KeyValue, "sk-proj-")
|
||||
}
|
||||
|
||||
func TestScannerPipelineAnthropic(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
eng := engine.NewEngine(reg)
|
||||
src := sources.NewFileSource("../../testdata/samples/anthropic_key.txt")
|
||||
cfg := engine.ScanConfig{Workers: 2}
|
||||
|
||||
ch, err := eng.Scan(context.Background(), src, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
var findings []engine.Finding
|
||||
for f := range ch {
|
||||
findings = append(findings, f)
|
||||
}
|
||||
|
||||
require.Len(t, findings, 1, "expected exactly 1 finding in anthropic_key.txt")
|
||||
assert.Equal(t, "anthropic", findings[0].ProviderName)
|
||||
}
|
||||
|
||||
func TestScannerPipelineNoKeys(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
eng := engine.NewEngine(reg)
|
||||
src := sources.NewFileSource("../../testdata/samples/no_keys.txt")
|
||||
cfg := engine.ScanConfig{Workers: 2}
|
||||
|
||||
ch, err := eng.Scan(context.Background(), src, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
var findings []engine.Finding
|
||||
for f := range ch {
|
||||
findings = append(findings, f)
|
||||
}
|
||||
|
||||
assert.Empty(t, findings, "expected zero findings in no_keys.txt")
|
||||
}
|
||||
|
||||
func TestScannerPipelineMultipleKeys(t *testing.T) {
|
||||
reg := newTestRegistry(t)
|
||||
eng := engine.NewEngine(reg)
|
||||
src := sources.NewFileSource("../../testdata/samples/multiple_keys.txt")
|
||||
cfg := engine.ScanConfig{Workers: 2}
|
||||
|
||||
ch, err := eng.Scan(context.Background(), src, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
var findings []engine.Finding
|
||||
for f := range ch {
|
||||
findings = append(findings, f)
|
||||
}
|
||||
|
||||
assert.GreaterOrEqual(t, len(findings), 2, "expected at least 2 findings in multiple_keys.txt")
|
||||
|
||||
var names []string
|
||||
for _, f := range findings {
|
||||
names = append(names, f.ProviderName)
|
||||
}
|
||||
assert.Contains(t, names, "openai")
|
||||
assert.Contains(t, names, "anthropic")
|
||||
}
|
||||
|
||||
68
pkg/engine/sources/file.go
Normal file
68
pkg/engine/sources/file.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package sources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/salvacybersec/keyhunter/pkg/types"
|
||||
)
|
||||
|
||||
const defaultChunkSize = 4096 // bytes per chunk when FileSource.ChunkSize is unset (<= 0)
const chunkOverlap = 256      // overlap between chunks to avoid splitting keys at boundaries
|
||||
|
||||
// FileSource reads a single file and emits overlapping chunks.
type FileSource struct {
	Path      string // path of the file to scan
	ChunkSize int    // chunk size in bytes; <= 0 falls back to defaultChunkSize
}
|
||||
|
||||
// NewFileSource creates a FileSource for the given path with the default chunk size.
|
||||
func NewFileSource(path string) *FileSource {
|
||||
return &FileSource{Path: path, ChunkSize: defaultChunkSize}
|
||||
}
|
||||
|
||||
// Chunks reads the file in overlapping segments and sends each chunk to out.
|
||||
// Uses os.ReadFile for simplicity in Phase 1. mmap for files > 10MB is implemented
|
||||
// in Phase 4 (Input Sources) alongside all other source adapter enhancements.
|
||||
func (f *FileSource) Chunks(ctx context.Context, out chan<- types.Chunk) error {
|
||||
data, err := os.ReadFile(f.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size := f.ChunkSize
|
||||
if size <= 0 {
|
||||
size = defaultChunkSize
|
||||
}
|
||||
if len(data) <= size {
|
||||
// File fits in one chunk
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case out <- types.Chunk{Data: data, Source: f.Path, Offset: 0}:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Emit overlapping chunks
|
||||
var offset int64
|
||||
for start := 0; start < len(data); start += size - chunkOverlap {
|
||||
end := start + size
|
||||
if end > len(data) {
|
||||
end = len(data)
|
||||
}
|
||||
chunk := types.Chunk{
|
||||
Data: data[start:end],
|
||||
Source: f.Path,
|
||||
Offset: offset,
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case out <- chunk:
|
||||
}
|
||||
offset += int64(end - start)
|
||||
if end == len(data) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
15
pkg/engine/sources/source.go
Normal file
15
pkg/engine/sources/source.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package sources
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/salvacybersec/keyhunter/pkg/types"
|
||||
)
|
||||
|
||||
// Source is the interface all input adapters must implement.
// Chunks writes content segments to the out channel until the source is exhausted or ctx is cancelled.
// Implementations must not close out: the pipeline (Engine.Scan) owns the
// channel and closes it itself after Chunks returns.
// NOTE: Source is defined in the sources sub-package (not pkg/engine) and uses pkg/types.Chunk
// to avoid a circular import: engine -> sources -> engine.
type Source interface {
	Chunks(ctx context.Context, out chan<- types.Chunk) error
}
|
||||
Reference in New Issue
Block a user