merge: phase 17 wave 1

This commit is contained in:
salvacybersec
2026-04-06 17:29:18 +03:00
10 changed files with 712 additions and 1 deletions

21
pkg/scheduler/jobs.go Normal file
View File

@@ -0,0 +1,21 @@
package scheduler
import "time"
// Job represents a scheduled scan job with its runtime function.
type Job struct {
Name string
CronExpr string
ScanCommand string
NotifyTelegram bool
Enabled bool
RunFunc func(ctx interface{}) (int, error)
}
// JobResult contains the outcome of a scheduled or manually triggered scan job.
type JobResult struct {
JobName string
FindingCount int
Duration time.Duration
Error error
}

179
pkg/scheduler/scheduler.go Normal file
View File

@@ -0,0 +1,179 @@
package scheduler
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-co-op/gocron/v2"
"github.com/salvacybersec/keyhunter/pkg/storage"
)
// Config holds the dependencies for a Scheduler.
type Config struct {
// DB is the storage backend for persisting jobs and subscribers.
DB *storage.DB
// ScanFunc executes a scan command and returns the finding count.
ScanFunc func(ctx context.Context, scanCommand string) (int, error)
// OnComplete is called after a job finishes. May be nil.
OnComplete func(result JobResult)
}
// Scheduler wraps gocron with SQLite persistence for scheduled scan jobs.
type Scheduler struct {
cfg Config
sched gocron.Scheduler
jobs map[string]gocron.Job
mu sync.Mutex
}
// New creates a new Scheduler with the given configuration.
func New(cfg Config) (*Scheduler, error) {
s, err := gocron.NewScheduler()
if err != nil {
return nil, fmt.Errorf("creating gocron scheduler: %w", err)
}
return &Scheduler{
cfg: cfg,
sched: s,
jobs: make(map[string]gocron.Job),
}, nil
}
// Start loads all enabled jobs from the database and begins scheduling.
func (s *Scheduler) Start(ctx context.Context) error {
jobs, err := s.cfg.DB.ListScheduledJobs()
if err != nil {
return fmt.Errorf("loading scheduled jobs: %w", err)
}
for _, sj := range jobs {
if !sj.Enabled {
continue
}
if err := s.registerJob(ctx, sj); err != nil {
return fmt.Errorf("registering job %q: %w", sj.Name, err)
}
}
s.sched.Start()
return nil
}
// Stop shuts down the gocron scheduler.
func (s *Scheduler) Stop() error {
return s.sched.Shutdown()
}
// AddJob creates a new scheduled job, persists it, and registers it with gocron.
func (s *Scheduler) AddJob(name, cronExpr, scanCommand string, notifyTelegram bool) error {
sj := storage.ScheduledJob{
Name: name,
CronExpr: cronExpr,
ScanCommand: scanCommand,
NotifyTelegram: notifyTelegram,
Enabled: true,
}
if _, err := s.cfg.DB.SaveScheduledJob(sj); err != nil {
return fmt.Errorf("saving job %q: %w", name, err)
}
return s.registerJob(context.Background(), sj)
}
// RemoveJob removes a job from gocron and deletes it from the database.
func (s *Scheduler) RemoveJob(name string) error {
s.mu.Lock()
j, ok := s.jobs[name]
if ok {
delete(s.jobs, name)
}
s.mu.Unlock()
if ok {
if err := s.sched.RemoveJob(j.ID()); err != nil {
return fmt.Errorf("removing gocron job %q: %w", name, err)
}
}
if _, err := s.cfg.DB.DeleteScheduledJob(name); err != nil {
return fmt.Errorf("deleting job %q from DB: %w", name, err)
}
return nil
}
// ListJobs returns all scheduled jobs from the database.
func (s *Scheduler) ListJobs() ([]storage.ScheduledJob, error) {
return s.cfg.DB.ListScheduledJobs()
}
// RunJob manually triggers a job by name. Looks up the job in the DB,
// runs ScanFunc, updates last_run, and calls OnComplete.
func (s *Scheduler) RunJob(ctx context.Context, name string) (JobResult, error) {
sj, err := s.cfg.DB.GetScheduledJob(name)
if err != nil {
return JobResult{}, fmt.Errorf("getting job %q: %w", name, err)
}
start := time.Now()
findings, scanErr := s.cfg.ScanFunc(ctx, sj.ScanCommand)
dur := time.Since(start)
result := JobResult{
JobName: name,
FindingCount: findings,
Duration: dur,
Error: scanErr,
}
// Update last_run regardless of error
now := time.Now().UTC()
_ = s.cfg.DB.UpdateJobLastRun(name, now, nil)
if s.cfg.OnComplete != nil {
s.cfg.OnComplete(result)
}
return result, nil
}
// JobCount returns the number of registered gocron jobs.
func (s *Scheduler) JobCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.jobs)
}
// registerJob creates a gocron cron job and stores the handle.
func (s *Scheduler) registerJob(ctx context.Context, sj storage.ScheduledJob) error {
jobName := sj.Name
scanCmd := sj.ScanCommand
notify := sj.NotifyTelegram
j, err := s.sched.NewJob(
gocron.CronJob(sj.CronExpr, false),
gocron.NewTask(func() {
start := time.Now()
findings, scanErr := s.cfg.ScanFunc(ctx, scanCmd)
dur := time.Since(start)
now := time.Now().UTC()
_ = s.cfg.DB.UpdateJobLastRun(jobName, now, nil)
if notify && s.cfg.OnComplete != nil {
s.cfg.OnComplete(JobResult{
JobName: jobName,
FindingCount: findings,
Duration: dur,
Error: scanErr,
})
}
}),
)
if err != nil {
return fmt.Errorf("creating gocron job: %w", err)
}
s.mu.Lock()
s.jobs[sj.Name] = j
s.mu.Unlock()
return nil
}

View File

@@ -0,0 +1,204 @@
package scheduler_test
import (
"context"
"sync"
"testing"
"time"
"github.com/salvacybersec/keyhunter/pkg/scheduler"
"github.com/salvacybersec/keyhunter/pkg/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func openTestDB(t *testing.T) *storage.DB {
t.Helper()
db, err := storage.Open(":memory:")
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
return db
}
func TestStorageRoundTrip(t *testing.T) {
db := openTestDB(t)
id, err := db.SaveScheduledJob(storage.ScheduledJob{
Name: "nightly-scan",
CronExpr: "0 2 * * *",
ScanCommand: "/tmp/repos",
NotifyTelegram: true,
Enabled: true,
})
require.NoError(t, err)
assert.Greater(t, id, int64(0))
jobs, err := db.ListScheduledJobs()
require.NoError(t, err)
require.Len(t, jobs, 1)
assert.Equal(t, "nightly-scan", jobs[0].Name)
assert.Equal(t, "0 2 * * *", jobs[0].CronExpr)
assert.Equal(t, "/tmp/repos", jobs[0].ScanCommand)
assert.True(t, jobs[0].NotifyTelegram)
assert.True(t, jobs[0].Enabled)
got, err := db.GetScheduledJob("nightly-scan")
require.NoError(t, err)
assert.Equal(t, "nightly-scan", got.Name)
now := time.Now().UTC()
next := now.Add(24 * time.Hour)
err = db.UpdateJobLastRun("nightly-scan", now, &next)
require.NoError(t, err)
got2, err := db.GetScheduledJob("nightly-scan")
require.NoError(t, err)
require.NotNil(t, got2.LastRun)
require.NotNil(t, got2.NextRun)
n, err := db.DeleteScheduledJob("nightly-scan")
require.NoError(t, err)
assert.Equal(t, int64(1), n)
jobs, err = db.ListScheduledJobs()
require.NoError(t, err)
assert.Empty(t, jobs)
}
func TestSubscriberRoundTrip(t *testing.T) {
db := openTestDB(t)
err := db.AddSubscriber(12345, "alice")
require.NoError(t, err)
subs, err := db.ListSubscribers()
require.NoError(t, err)
require.Len(t, subs, 1)
assert.Equal(t, int64(12345), subs[0].ChatID)
assert.Equal(t, "alice", subs[0].Username)
ok, err := db.IsSubscribed(12345)
require.NoError(t, err)
assert.True(t, ok)
ok, err = db.IsSubscribed(99999)
require.NoError(t, err)
assert.False(t, ok)
n, err := db.RemoveSubscriber(12345)
require.NoError(t, err)
assert.Equal(t, int64(1), n)
subs, err = db.ListSubscribers()
require.NoError(t, err)
assert.Empty(t, subs)
}
func TestSchedulerStartLoadsJobs(t *testing.T) {
db := openTestDB(t)
_, err := db.SaveScheduledJob(storage.ScheduledJob{
Name: "job-a", CronExpr: "0 * * * *", ScanCommand: "/a", Enabled: true,
})
require.NoError(t, err)
_, err = db.SaveScheduledJob(storage.ScheduledJob{
Name: "job-b", CronExpr: "0 * * * *", ScanCommand: "/b", Enabled: true,
})
require.NoError(t, err)
// Disabled job should not be registered
_, err = db.SaveScheduledJob(storage.ScheduledJob{
Name: "job-c", CronExpr: "0 * * * *", ScanCommand: "/c", Enabled: false,
})
require.NoError(t, err)
s, err := scheduler.New(scheduler.Config{
DB: db,
ScanFunc: func(ctx context.Context, cmd string) (int, error) {
return 0, nil
},
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = s.Start(ctx)
require.NoError(t, err)
defer s.Stop()
assert.Equal(t, 2, s.JobCount())
}
func TestSchedulerAddRemoveJob(t *testing.T) {
db := openTestDB(t)
s, err := scheduler.New(scheduler.Config{
DB: db,
ScanFunc: func(ctx context.Context, cmd string) (int, error) {
return 0, nil
},
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = s.Start(ctx)
require.NoError(t, err)
defer s.Stop()
err = s.AddJob("test-job", "0 * * * *", "/test", true)
require.NoError(t, err)
assert.Equal(t, 1, s.JobCount())
jobs, err := db.ListScheduledJobs()
require.NoError(t, err)
require.Len(t, jobs, 1)
assert.Equal(t, "test-job", jobs[0].Name)
err = s.RemoveJob("test-job")
require.NoError(t, err)
assert.Equal(t, 0, s.JobCount())
jobs, err = db.ListScheduledJobs()
require.NoError(t, err)
assert.Empty(t, jobs)
}
func TestSchedulerRunJob(t *testing.T) {
db := openTestDB(t)
var mu sync.Mutex
var scanCalled string
var completeCalled bool
s, err := scheduler.New(scheduler.Config{
DB: db,
ScanFunc: func(ctx context.Context, cmd string) (int, error) {
mu.Lock()
scanCalled = cmd
mu.Unlock()
return 5, nil
},
OnComplete: func(result scheduler.JobResult) {
mu.Lock()
completeCalled = true
mu.Unlock()
},
})
require.NoError(t, err)
_, err = db.SaveScheduledJob(storage.ScheduledJob{
Name: "manual-run", CronExpr: "0 * * * *", ScanCommand: "/manual", NotifyTelegram: true, Enabled: true,
})
require.NoError(t, err)
result, err := s.RunJob(context.Background(), "manual-run")
require.NoError(t, err)
assert.Equal(t, "manual-run", result.JobName)
assert.Equal(t, 5, result.FindingCount)
mu.Lock()
assert.Equal(t, "/manual", scanCalled)
assert.True(t, completeCalled)
mu.Unlock()
}

View File

@@ -0,0 +1,111 @@
package storage
import (
"database/sql"
"time"
)
// ScheduledJob represents a cron-based scheduled scan job.
type ScheduledJob struct {
ID int64
Name string
CronExpr string
ScanCommand string
NotifyTelegram bool
Enabled bool
LastRun *time.Time
NextRun *time.Time
CreatedAt time.Time
}
// SaveScheduledJob inserts a new scheduled job. Returns the new row ID.
func (db *DB) SaveScheduledJob(j ScheduledJob) (int64, error) {
res, err := db.sql.Exec(
`INSERT INTO scheduled_jobs (name, cron_expr, scan_command, notify_telegram, enabled)
VALUES (?, ?, ?, ?, ?)`,
j.Name, j.CronExpr, j.ScanCommand, j.NotifyTelegram, j.Enabled,
)
if err != nil {
return 0, err
}
return res.LastInsertId()
}
// ListScheduledJobs returns all scheduled jobs.
func (db *DB) ListScheduledJobs() ([]ScheduledJob, error) {
rows, err := db.sql.Query(
`SELECT id, name, cron_expr, scan_command, notify_telegram, enabled, last_run, next_run, created_at
FROM scheduled_jobs ORDER BY id`,
)
if err != nil {
return nil, err
}
defer rows.Close()
var jobs []ScheduledJob
for rows.Next() {
var j ScheduledJob
var lastRun, nextRun sql.NullTime
if err := rows.Scan(&j.ID, &j.Name, &j.CronExpr, &j.ScanCommand,
&j.NotifyTelegram, &j.Enabled, &lastRun, &nextRun, &j.CreatedAt); err != nil {
return nil, err
}
if lastRun.Valid {
j.LastRun = &lastRun.Time
}
if nextRun.Valid {
j.NextRun = &nextRun.Time
}
jobs = append(jobs, j)
}
return jobs, rows.Err()
}
// GetScheduledJob returns a single scheduled job by name.
func (db *DB) GetScheduledJob(name string) (*ScheduledJob, error) {
var j ScheduledJob
var lastRun, nextRun sql.NullTime
err := db.sql.QueryRow(
`SELECT id, name, cron_expr, scan_command, notify_telegram, enabled, last_run, next_run, created_at
FROM scheduled_jobs WHERE name = ?`, name,
).Scan(&j.ID, &j.Name, &j.CronExpr, &j.ScanCommand,
&j.NotifyTelegram, &j.Enabled, &lastRun, &nextRun, &j.CreatedAt)
if err != nil {
return nil, err
}
if lastRun.Valid {
j.LastRun = &lastRun.Time
}
if nextRun.Valid {
j.NextRun = &nextRun.Time
}
return &j, nil
}
// DeleteScheduledJob removes a scheduled job by name. Returns rows affected.
func (db *DB) DeleteScheduledJob(name string) (int64, error) {
res, err := db.sql.Exec(`DELETE FROM scheduled_jobs WHERE name = ?`, name)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
// UpdateJobLastRun updates the last_run and next_run timestamps for a job.
func (db *DB) UpdateJobLastRun(name string, lastRun time.Time, nextRun *time.Time) error {
var nr sql.NullTime
if nextRun != nil {
nr = sql.NullTime{Time: *nextRun, Valid: true}
}
_, err := db.sql.Exec(
`UPDATE scheduled_jobs SET last_run = ?, next_run = ? WHERE name = ?`,
lastRun, nr, name,
)
return err
}
// SetJobEnabled updates the enabled flag for a scheduled job.
func (db *DB) SetJobEnabled(name string, enabled bool) error {
_, err := db.sql.Exec(`UPDATE scheduled_jobs SET enabled = ? WHERE name = ?`, enabled, name)
return err
}

View File

@@ -55,3 +55,23 @@ CREATE TABLE IF NOT EXISTS custom_dorks (
CREATE INDEX IF NOT EXISTS idx_custom_dorks_source ON custom_dorks(source);
CREATE INDEX IF NOT EXISTS idx_custom_dorks_category ON custom_dorks(category);
-- Phase 17: Telegram bot subscribers for auto-notifications.
CREATE TABLE IF NOT EXISTS subscribers (
chat_id INTEGER PRIMARY KEY,
username TEXT,
subscribed_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- Phase 17: Cron-based scheduled scan jobs.
CREATE TABLE IF NOT EXISTS scheduled_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
cron_expr TEXT NOT NULL,
scan_command TEXT NOT NULL,
notify_telegram BOOLEAN DEFAULT FALSE,
enabled BOOLEAN DEFAULT TRUE,
last_run DATETIME,
next_run DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

View File

@@ -0,0 +1,54 @@
package storage
import "time"
// Subscriber represents a Telegram chat subscribed to scan notifications.
type Subscriber struct {
ChatID int64
Username string
SubscribedAt time.Time
}
// AddSubscriber inserts or replaces a subscriber in the database.
func (db *DB) AddSubscriber(chatID int64, username string) error {
_, err := db.sql.Exec(
`INSERT OR REPLACE INTO subscribers (chat_id, username, subscribed_at) VALUES (?, ?, CURRENT_TIMESTAMP)`,
chatID, username,
)
return err
}
// RemoveSubscriber deletes a subscriber by chat ID. Returns rows affected.
func (db *DB) RemoveSubscriber(chatID int64) (int64, error) {
res, err := db.sql.Exec(`DELETE FROM subscribers WHERE chat_id = ?`, chatID)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
// ListSubscribers returns all subscribers ordered by subscription time.
func (db *DB) ListSubscribers() ([]Subscriber, error) {
rows, err := db.sql.Query(`SELECT chat_id, username, subscribed_at FROM subscribers ORDER BY subscribed_at`)
if err != nil {
return nil, err
}
defer rows.Close()
var subs []Subscriber
for rows.Next() {
var s Subscriber
if err := rows.Scan(&s.ChatID, &s.Username, &s.SubscribedAt); err != nil {
return nil, err
}
subs = append(subs, s)
}
return subs, rows.Err()
}
// IsSubscribed returns true if the given chat ID is subscribed.
func (db *DB) IsSubscribed(chatID int64) (bool, error) {
var count int
err := db.sql.QueryRow(`SELECT COUNT(*) FROM subscribers WHERE chat_id = ?`, chatID).Scan(&count)
return count > 0, err
}