merge: phase 17 wave 3 CLI wiring

This commit is contained in:
salvacybersec
2026-04-06 17:48:25 +03:00
11 changed files with 958 additions and 199 deletions

View File

@@ -1,179 +1,170 @@
// Package scheduler implements cron-based recurring scan scheduling for KeyHunter.
// It uses gocron v2 for job management and delegates scan execution to the engine.
package scheduler
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/go-co-op/gocron/v2"
"github.com/salvacybersec/keyhunter/pkg/engine"
"github.com/salvacybersec/keyhunter/pkg/storage"
)
// Config holds the dependencies for a Scheduler.
type Config struct {
// DB is the storage backend for persisting jobs and subscribers.
DB *storage.DB
// ScanFunc executes a scan command and returns the finding count.
ScanFunc func(ctx context.Context, scanCommand string) (int, error)
// OnComplete is called after a job finishes. May be nil.
OnComplete func(result JobResult)
}
// Scheduler wraps gocron with SQLite persistence for scheduled scan jobs.
// Scheduler manages recurring scan jobs backed by the database.
type Scheduler struct {
cfg Config
sched gocron.Scheduler
jobs map[string]gocron.Job
mu sync.Mutex
cron gocron.Scheduler
engine *engine.Engine
db *storage.DB
encKey []byte
mu sync.Mutex
jobs map[int64]gocron.Job // DB job ID -> gocron job
// OnFindings is called when a scheduled scan produces findings.
// The caller can wire this to Telegram notifications.
OnFindings func(jobName string, findings []engine.Finding)
}
// New creates a new Scheduler with the given configuration.
func New(cfg Config) (*Scheduler, error) {
s, err := gocron.NewScheduler()
// Deps bundles the dependencies for creating a Scheduler.
type Deps struct {
Engine *engine.Engine
DB *storage.DB
EncKey []byte
}
// New creates a new Scheduler. Call Start() to begin processing jobs.
func New(deps Deps) (*Scheduler, error) {
cron, err := gocron.NewScheduler()
if err != nil {
return nil, fmt.Errorf("creating gocron scheduler: %w", err)
}
return &Scheduler{
cfg: cfg,
sched: s,
jobs: make(map[string]gocron.Job),
cron: cron,
engine: deps.Engine,
db: deps.DB,
encKey: deps.EncKey,
jobs: make(map[int64]gocron.Job),
}, nil
}
// Start loads all enabled jobs from the database and begins scheduling.
func (s *Scheduler) Start(ctx context.Context) error {
jobs, err := s.cfg.DB.ListScheduledJobs()
// LoadAndStart loads all enabled jobs from the database, registers them
// with gocron, and starts the scheduler.
func (s *Scheduler) LoadAndStart() error {
jobs, err := s.db.ListEnabledScheduledJobs()
if err != nil {
return fmt.Errorf("loading scheduled jobs: %w", err)
}
for _, sj := range jobs {
if !sj.Enabled {
continue
}
if err := s.registerJob(ctx, sj); err != nil {
return fmt.Errorf("registering job %q: %w", sj.Name, err)
for _, j := range jobs {
if err := s.registerJob(j); err != nil {
log.Printf("scheduler: failed to register job %d (%s): %v", j.ID, j.Name, err)
}
}
s.sched.Start()
s.cron.Start()
return nil
}
// Stop shuts down the gocron scheduler.
// Stop shuts down the scheduler gracefully.
func (s *Scheduler) Stop() error {
return s.sched.Shutdown()
return s.cron.Shutdown()
}
// AddJob creates a new scheduled job, persists it, and registers it with gocron.
func (s *Scheduler) AddJob(name, cronExpr, scanCommand string, notifyTelegram bool) error {
sj := storage.ScheduledJob{
Name: name,
CronExpr: cronExpr,
ScanCommand: scanCommand,
NotifyTelegram: notifyTelegram,
Enabled: true,
}
if _, err := s.cfg.DB.SaveScheduledJob(sj); err != nil {
return fmt.Errorf("saving job %q: %w", name, err)
}
return s.registerJob(context.Background(), sj)
// AddJob registers a new job from a storage.ScheduledJob and adds it to the
// running scheduler.
func (s *Scheduler) AddJob(job storage.ScheduledJob) error {
return s.registerJob(job)
}
// RemoveJob removes a job from gocron and deletes it from the database.
func (s *Scheduler) RemoveJob(name string) error {
s.mu.Lock()
j, ok := s.jobs[name]
if ok {
delete(s.jobs, name)
}
s.mu.Unlock()
if ok {
if err := s.sched.RemoveJob(j.ID()); err != nil {
return fmt.Errorf("removing gocron job %q: %w", name, err)
}
}
if _, err := s.cfg.DB.DeleteScheduledJob(name); err != nil {
return fmt.Errorf("deleting job %q from DB: %w", name, err)
}
return nil
}
// ListJobs returns all scheduled jobs from the database.
func (s *Scheduler) ListJobs() ([]storage.ScheduledJob, error) {
return s.cfg.DB.ListScheduledJobs()
}
// RunJob manually triggers a job by name. Looks up the job in the DB,
// runs ScanFunc, updates last_run, and calls OnComplete.
func (s *Scheduler) RunJob(ctx context.Context, name string) (JobResult, error) {
sj, err := s.cfg.DB.GetScheduledJob(name)
if err != nil {
return JobResult{}, fmt.Errorf("getting job %q: %w", name, err)
}
start := time.Now()
findings, scanErr := s.cfg.ScanFunc(ctx, sj.ScanCommand)
dur := time.Since(start)
result := JobResult{
JobName: name,
FindingCount: findings,
Duration: dur,
Error: scanErr,
}
// Update last_run regardless of error
now := time.Now().UTC()
_ = s.cfg.DB.UpdateJobLastRun(name, now, nil)
if s.cfg.OnComplete != nil {
s.cfg.OnComplete(result)
}
return result, nil
}
// JobCount returns the number of registered gocron jobs.
func (s *Scheduler) JobCount() int {
// RemoveJob removes a job from the running scheduler by its DB ID.
func (s *Scheduler) RemoveJob(id int64) {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.jobs)
if j, ok := s.jobs[id]; ok {
s.cron.RemoveJob(j.ID())
delete(s.jobs, id)
}
}
// registerJob creates a gocron cron job and stores the handle.
func (s *Scheduler) registerJob(ctx context.Context, sj storage.ScheduledJob) error {
jobName := sj.Name
scanCmd := sj.ScanCommand
notify := sj.NotifyTelegram
// RunNow executes a job immediately (outside of its cron schedule).
func (s *Scheduler) RunNow(job storage.ScheduledJob) ([]engine.Finding, error) {
return s.executeScan(job)
}
j, err := s.sched.NewJob(
gocron.CronJob(sj.CronExpr, false),
// registerJob adds a single scheduled job to gocron.
func (s *Scheduler) registerJob(job storage.ScheduledJob) error {
jobCopy := job // capture for closure
cronJob, err := s.cron.NewJob(
gocron.CronJob(job.CronExpr, false),
gocron.NewTask(func() {
start := time.Now()
findings, scanErr := s.cfg.ScanFunc(ctx, scanCmd)
dur := time.Since(start)
now := time.Now().UTC()
_ = s.cfg.DB.UpdateJobLastRun(jobName, now, nil)
if notify && s.cfg.OnComplete != nil {
s.cfg.OnComplete(JobResult{
JobName: jobName,
FindingCount: findings,
Duration: dur,
Error: scanErr,
})
findings, err := s.executeScan(jobCopy)
if err != nil {
log.Printf("scheduler: job %d (%s) failed: %v", jobCopy.ID, jobCopy.Name, err)
return
}
// Update last run time in DB.
if err := s.db.UpdateJobLastRun(jobCopy.ID, time.Now()); err != nil {
log.Printf("scheduler: failed to update last_run for job %d: %v", jobCopy.ID, err)
}
if len(findings) > 0 && jobCopy.Notify && s.OnFindings != nil {
s.OnFindings(jobCopy.Name, findings)
}
}),
)
if err != nil {
return fmt.Errorf("creating gocron job: %w", err)
return fmt.Errorf("registering cron job %q (%s): %w", job.Name, job.CronExpr, err)
}
s.mu.Lock()
s.jobs[sj.Name] = j
s.jobs[job.ID] = cronJob
s.mu.Unlock()
return nil
}
// executeScan runs a scan against the job's configured path and persists findings.
func (s *Scheduler) executeScan(job storage.ScheduledJob) ([]engine.Finding, error) {
src, err := selectSchedulerSource(job.ScanPath)
if err != nil {
return nil, fmt.Errorf("selecting source for %q: %w", job.ScanPath, err)
}
cfg := engine.ScanConfig{
Workers: 0, // auto
Verify: false,
Unmask: false,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
ch, err := s.engine.Scan(ctx, src, cfg)
if err != nil {
return nil, fmt.Errorf("starting scan: %w", err)
}
var findings []engine.Finding
for f := range ch {
findings = append(findings, f)
}
// Persist findings.
for _, f := range findings {
sf := storage.Finding{
ProviderName: f.ProviderName,
KeyValue: f.KeyValue,
KeyMasked: f.KeyMasked,
Confidence: f.Confidence,
SourcePath: f.Source,
SourceType: f.SourceType,
LineNumber: f.LineNumber,
}
if _, err := s.db.SaveFinding(sf, s.encKey); err != nil {
log.Printf("scheduler: failed to save finding: %v", err)
}
}
return findings, nil
}

21
pkg/scheduler/source.go Normal file
View File

@@ -0,0 +1,21 @@
package scheduler
import (
"fmt"
"os"
"github.com/salvacybersec/keyhunter/pkg/engine/sources"
)
// selectSchedulerSource returns the appropriate Source for a scheduled scan path.
// Only file and directory paths are supported (same as bot scans).
func selectSchedulerSource(path string) (sources.Source, error) {
info, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("stat %q: %w", path, err)
}
if info.IsDir() {
return sources.NewDirSource(path), nil
}
return sources.NewFileSource(path), nil
}