feat(17-05): implement serve and schedule commands replacing stubs
- cmd/serve.go: starts scheduler, optionally starts Telegram bot with --telegram flag - cmd/schedule.go: add/list/remove/run subcommands for scheduled scan job CRUD - pkg/scheduler/: gocron v2 based scheduler with DB-backed jobs and scan execution - pkg/storage/scheduled_jobs.go: scheduled_jobs table CRUD with tests - Remove serve and schedule stubs from cmd/stubs.go Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
170
pkg/scheduler/scheduler.go
Normal file
170
pkg/scheduler/scheduler.go
Normal file
@@ -0,0 +1,170 @@
|
||||
// Package scheduler implements cron-based recurring scan scheduling for KeyHunter.
|
||||
// It uses gocron v2 for job management and delegates scan execution to the engine.
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
|
||||
"github.com/salvacybersec/keyhunter/pkg/engine"
|
||||
"github.com/salvacybersec/keyhunter/pkg/storage"
|
||||
)
|
||||
|
||||
// Scheduler manages recurring scan jobs backed by the database.
|
||||
type Scheduler struct {
|
||||
cron gocron.Scheduler
|
||||
engine *engine.Engine
|
||||
db *storage.DB
|
||||
encKey []byte
|
||||
|
||||
mu sync.Mutex
|
||||
jobs map[int64]gocron.Job // DB job ID -> gocron job
|
||||
|
||||
// OnFindings is called when a scheduled scan produces findings.
|
||||
// The caller can wire this to Telegram notifications.
|
||||
OnFindings func(jobName string, findings []engine.Finding)
|
||||
}
|
||||
|
||||
// Deps bundles the dependencies for creating a Scheduler.
|
||||
type Deps struct {
|
||||
Engine *engine.Engine
|
||||
DB *storage.DB
|
||||
EncKey []byte
|
||||
}
|
||||
|
||||
// New creates a new Scheduler. Call Start() to begin processing jobs.
|
||||
func New(deps Deps) (*Scheduler, error) {
|
||||
cron, err := gocron.NewScheduler()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating gocron scheduler: %w", err)
|
||||
}
|
||||
return &Scheduler{
|
||||
cron: cron,
|
||||
engine: deps.Engine,
|
||||
db: deps.DB,
|
||||
encKey: deps.EncKey,
|
||||
jobs: make(map[int64]gocron.Job),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// LoadAndStart loads all enabled jobs from the database, registers them
|
||||
// with gocron, and starts the scheduler.
|
||||
func (s *Scheduler) LoadAndStart() error {
|
||||
jobs, err := s.db.ListEnabledScheduledJobs()
|
||||
if err != nil {
|
||||
return fmt.Errorf("loading scheduled jobs: %w", err)
|
||||
}
|
||||
for _, j := range jobs {
|
||||
if err := s.registerJob(j); err != nil {
|
||||
log.Printf("scheduler: failed to register job %d (%s): %v", j.ID, j.Name, err)
|
||||
}
|
||||
}
|
||||
s.cron.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop shuts down the scheduler gracefully.
|
||||
func (s *Scheduler) Stop() error {
|
||||
return s.cron.Shutdown()
|
||||
}
|
||||
|
||||
// AddJob registers a new job from a storage.ScheduledJob and adds it to the
|
||||
// running scheduler.
|
||||
func (s *Scheduler) AddJob(job storage.ScheduledJob) error {
|
||||
return s.registerJob(job)
|
||||
}
|
||||
|
||||
// RemoveJob removes a job from the running scheduler by its DB ID.
|
||||
func (s *Scheduler) RemoveJob(id int64) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if j, ok := s.jobs[id]; ok {
|
||||
s.cron.RemoveJob(j.ID())
|
||||
delete(s.jobs, id)
|
||||
}
|
||||
}
|
||||
|
||||
// RunNow executes a job immediately (outside of its cron schedule).
|
||||
func (s *Scheduler) RunNow(job storage.ScheduledJob) ([]engine.Finding, error) {
|
||||
return s.executeScan(job)
|
||||
}
|
||||
|
||||
// registerJob adds a single scheduled job to gocron.
|
||||
func (s *Scheduler) registerJob(job storage.ScheduledJob) error {
|
||||
jobCopy := job // capture for closure
|
||||
cronJob, err := s.cron.NewJob(
|
||||
gocron.CronJob(job.CronExpr, false),
|
||||
gocron.NewTask(func() {
|
||||
findings, err := s.executeScan(jobCopy)
|
||||
if err != nil {
|
||||
log.Printf("scheduler: job %d (%s) failed: %v", jobCopy.ID, jobCopy.Name, err)
|
||||
return
|
||||
}
|
||||
// Update last run time in DB.
|
||||
if err := s.db.UpdateJobLastRun(jobCopy.ID, time.Now()); err != nil {
|
||||
log.Printf("scheduler: failed to update last_run for job %d: %v", jobCopy.ID, err)
|
||||
}
|
||||
if len(findings) > 0 && jobCopy.Notify && s.OnFindings != nil {
|
||||
s.OnFindings(jobCopy.Name, findings)
|
||||
}
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("registering cron job %q (%s): %w", job.Name, job.CronExpr, err)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.jobs[job.ID] = cronJob
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// executeScan runs a scan against the job's configured path and persists findings.
|
||||
func (s *Scheduler) executeScan(job storage.ScheduledJob) ([]engine.Finding, error) {
|
||||
src, err := selectSchedulerSource(job.ScanPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("selecting source for %q: %w", job.ScanPath, err)
|
||||
}
|
||||
|
||||
cfg := engine.ScanConfig{
|
||||
Workers: 0, // auto
|
||||
Verify: false,
|
||||
Unmask: false,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
ch, err := s.engine.Scan(ctx, src, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("starting scan: %w", err)
|
||||
}
|
||||
|
||||
var findings []engine.Finding
|
||||
for f := range ch {
|
||||
findings = append(findings, f)
|
||||
}
|
||||
|
||||
// Persist findings.
|
||||
for _, f := range findings {
|
||||
sf := storage.Finding{
|
||||
ProviderName: f.ProviderName,
|
||||
KeyValue: f.KeyValue,
|
||||
KeyMasked: f.KeyMasked,
|
||||
Confidence: f.Confidence,
|
||||
SourcePath: f.Source,
|
||||
SourceType: f.SourceType,
|
||||
LineNumber: f.LineNumber,
|
||||
}
|
||||
if _, err := s.db.SaveFinding(sf, s.encKey); err != nil {
|
||||
log.Printf("scheduler: failed to save finding: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return findings, nil
|
||||
}
|
||||
21
pkg/scheduler/source.go
Normal file
21
pkg/scheduler/source.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/salvacybersec/keyhunter/pkg/engine/sources"
|
||||
)
|
||||
|
||||
// selectSchedulerSource returns the appropriate Source for a scheduled scan path.
|
||||
// Only file and directory paths are supported (same as bot scans).
|
||||
func selectSchedulerSource(path string) (sources.Source, error) {
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stat %q: %w", path, err)
|
||||
}
|
||||
if info.IsDir() {
|
||||
return sources.NewDirSource(path), nil
|
||||
}
|
||||
return sources.NewFileSource(path), nil
|
||||
}
|
||||
164
pkg/storage/scheduled_jobs.go
Normal file
164
pkg/storage/scheduled_jobs.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ScheduledJob represents a cron-based recurring scan job.
|
||||
type ScheduledJob struct {
|
||||
ID int64
|
||||
Name string
|
||||
CronExpr string
|
||||
ScanPath string
|
||||
Enabled bool
|
||||
Notify bool
|
||||
LastRunAt *time.Time
|
||||
NextRunAt *time.Time
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// SaveScheduledJob inserts a new scheduled job and returns its ID.
|
||||
func (db *DB) SaveScheduledJob(job ScheduledJob) (int64, error) {
|
||||
enabledInt := 0
|
||||
if job.Enabled {
|
||||
enabledInt = 1
|
||||
}
|
||||
notifyInt := 0
|
||||
if job.Notify {
|
||||
notifyInt = 1
|
||||
}
|
||||
|
||||
res, err := db.sql.Exec(
|
||||
`INSERT INTO scheduled_jobs (name, cron_expr, scan_path, enabled, notify)
|
||||
VALUES (?, ?, ?, ?, ?)`,
|
||||
job.Name, job.CronExpr, job.ScanPath, enabledInt, notifyInt,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("inserting scheduled job: %w", err)
|
||||
}
|
||||
return res.LastInsertId()
|
||||
}
|
||||
|
||||
// ListScheduledJobs returns all scheduled jobs ordered by creation time.
|
||||
func (db *DB) ListScheduledJobs() ([]ScheduledJob, error) {
|
||||
rows, err := db.sql.Query(
|
||||
`SELECT id, name, cron_expr, scan_path, enabled, notify, last_run_at, next_run_at, created_at
|
||||
FROM scheduled_jobs ORDER BY created_at ASC`,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying scheduled jobs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []ScheduledJob
|
||||
for rows.Next() {
|
||||
j, err := scanJobRow(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, j)
|
||||
}
|
||||
return jobs, rows.Err()
|
||||
}
|
||||
|
||||
// GetScheduledJob returns a single job by ID.
|
||||
func (db *DB) GetScheduledJob(id int64) (*ScheduledJob, error) {
|
||||
row := db.sql.QueryRow(
|
||||
`SELECT id, name, cron_expr, scan_path, enabled, notify, last_run_at, next_run_at, created_at
|
||||
FROM scheduled_jobs WHERE id = ?`, id,
|
||||
)
|
||||
var j ScheduledJob
|
||||
var enabledInt, notifyInt int
|
||||
var lastRun, nextRun, createdAt sql.NullString
|
||||
if err := row.Scan(&j.ID, &j.Name, &j.CronExpr, &j.ScanPath,
|
||||
&enabledInt, ¬ifyInt, &lastRun, &nextRun, &createdAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
j.Enabled = enabledInt != 0
|
||||
j.Notify = notifyInt != 0
|
||||
j.LastRunAt = parseNullTime(lastRun)
|
||||
j.NextRunAt = parseNullTime(nextRun)
|
||||
if createdAt.Valid {
|
||||
t, _ := time.Parse("2006-01-02 15:04:05", createdAt.String)
|
||||
j.CreatedAt = t
|
||||
}
|
||||
return &j, nil
|
||||
}
|
||||
|
||||
// DeleteScheduledJob removes a job by ID and returns rows affected.
|
||||
func (db *DB) DeleteScheduledJob(id int64) (int64, error) {
|
||||
res, err := db.sql.Exec(`DELETE FROM scheduled_jobs WHERE id = ?`, id)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("deleting scheduled job %d: %w", id, err)
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
// UpdateJobLastRun updates the last_run_at timestamp for a job.
|
||||
func (db *DB) UpdateJobLastRun(id int64, t time.Time) error {
|
||||
_, err := db.sql.Exec(
|
||||
`UPDATE scheduled_jobs SET last_run_at = ? WHERE id = ?`,
|
||||
t.Format("2006-01-02 15:04:05"), id,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// ListEnabledScheduledJobs returns only enabled jobs.
|
||||
func (db *DB) ListEnabledScheduledJobs() ([]ScheduledJob, error) {
|
||||
rows, err := db.sql.Query(
|
||||
`SELECT id, name, cron_expr, scan_path, enabled, notify, last_run_at, next_run_at, created_at
|
||||
FROM scheduled_jobs WHERE enabled = 1 ORDER BY created_at ASC`,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying enabled scheduled jobs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []ScheduledJob
|
||||
for rows.Next() {
|
||||
j, err := scanJobRow(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, j)
|
||||
}
|
||||
return jobs, rows.Err()
|
||||
}
|
||||
|
||||
func scanJobRow(rows *sql.Rows) (ScheduledJob, error) {
|
||||
var j ScheduledJob
|
||||
var enabledInt, notifyInt int
|
||||
var lastRun, nextRun, createdAt sql.NullString
|
||||
if err := rows.Scan(&j.ID, &j.Name, &j.CronExpr, &j.ScanPath,
|
||||
&enabledInt, ¬ifyInt, &lastRun, &nextRun, &createdAt); err != nil {
|
||||
return j, fmt.Errorf("scanning scheduled job row: %w", err)
|
||||
}
|
||||
j.Enabled = enabledInt != 0
|
||||
j.Notify = notifyInt != 0
|
||||
j.LastRunAt = parseNullTime(lastRun)
|
||||
j.NextRunAt = parseNullTime(nextRun)
|
||||
if createdAt.Valid {
|
||||
t, _ := time.Parse("2006-01-02 15:04:05", createdAt.String)
|
||||
j.CreatedAt = t
|
||||
}
|
||||
return j, nil
|
||||
}
|
||||
|
||||
func parseNullTime(ns sql.NullString) *time.Time {
|
||||
if !ns.Valid || ns.String == "" {
|
||||
return nil
|
||||
}
|
||||
// Try multiple formats SQLite may return.
|
||||
for _, layout := range []string{
|
||||
"2006-01-02 15:04:05",
|
||||
"2006-01-02T15:04:05Z",
|
||||
time.RFC3339,
|
||||
} {
|
||||
if t, err := time.Parse(layout, ns.String); err == nil {
|
||||
return &t
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
104
pkg/storage/scheduled_jobs_test.go
Normal file
104
pkg/storage/scheduled_jobs_test.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestScheduledJobCRUD(t *testing.T) {
|
||||
db, err := Open(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
// Add a job.
|
||||
job := ScheduledJob{
|
||||
Name: "nightly-scan",
|
||||
CronExpr: "0 0 * * *",
|
||||
ScanPath: "/tmp/repo",
|
||||
Enabled: true,
|
||||
Notify: true,
|
||||
}
|
||||
id, err := db.SaveScheduledJob(job)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, id > 0)
|
||||
|
||||
// Get the job by ID.
|
||||
got, err := db.GetScheduledJob(id)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "nightly-scan", got.Name)
|
||||
assert.Equal(t, "0 0 * * *", got.CronExpr)
|
||||
assert.Equal(t, "/tmp/repo", got.ScanPath)
|
||||
assert.True(t, got.Enabled)
|
||||
assert.True(t, got.Notify)
|
||||
assert.Nil(t, got.LastRunAt)
|
||||
|
||||
// List all jobs.
|
||||
jobs, err := db.ListScheduledJobs()
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, jobs, 1)
|
||||
assert.Equal(t, "nightly-scan", jobs[0].Name)
|
||||
|
||||
// Add a second job (disabled).
|
||||
job2 := ScheduledJob{
|
||||
Name: "weekly-scan",
|
||||
CronExpr: "0 0 * * 0",
|
||||
ScanPath: "/tmp/other",
|
||||
Enabled: false,
|
||||
Notify: false,
|
||||
}
|
||||
id2, err := db.SaveScheduledJob(job2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// List enabled only.
|
||||
enabled, err := db.ListEnabledScheduledJobs()
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, enabled, 1)
|
||||
assert.Equal(t, id, enabled[0].ID)
|
||||
|
||||
// Delete the first job.
|
||||
affected, err := db.DeleteScheduledJob(id)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(1), affected)
|
||||
|
||||
// Only the second job should remain.
|
||||
jobs, err = db.ListScheduledJobs()
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, jobs, 1)
|
||||
assert.Equal(t, id2, jobs[0].ID)
|
||||
|
||||
// Delete non-existent returns 0 affected.
|
||||
affected, err = db.DeleteScheduledJob(999)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(0), affected)
|
||||
}
|
||||
|
||||
func TestUpdateJobLastRun(t *testing.T) {
|
||||
db, err := Open(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
id, err := db.SaveScheduledJob(ScheduledJob{
|
||||
Name: "test",
|
||||
CronExpr: "*/5 * * * *",
|
||||
ScanPath: "/tmp/test",
|
||||
Enabled: true,
|
||||
Notify: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
job, err := db.GetScheduledJob(id)
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, job.LastRunAt)
|
||||
|
||||
// Update last run.
|
||||
now := time.Now().Truncate(time.Second)
|
||||
require.NoError(t, db.UpdateJobLastRun(id, now))
|
||||
|
||||
job, err = db.GetScheduledJob(id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, job.LastRunAt, "LastRunAt should be set after UpdateJobLastRun")
|
||||
assert.Equal(t, now.Format("2006-01-02 15:04:05"), job.LastRunAt.Format("2006-01-02 15:04:05"))
|
||||
}
|
||||
@@ -55,3 +55,18 @@ CREATE TABLE IF NOT EXISTS custom_dorks (
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_custom_dorks_source ON custom_dorks(source);
|
||||
CREATE INDEX IF NOT EXISTS idx_custom_dorks_category ON custom_dorks(category);
|
||||
|
||||
-- Phase 17: scheduled scan jobs for cron-based recurring scans.
|
||||
CREATE TABLE IF NOT EXISTS scheduled_jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
cron_expr TEXT NOT NULL,
|
||||
scan_path TEXT NOT NULL,
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
notify INTEGER NOT NULL DEFAULT 1,
|
||||
last_run_at DATETIME,
|
||||
next_run_at DATETIME,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_enabled ON scheduled_jobs(enabled);
|
||||
|
||||
Reference in New Issue
Block a user