From d557c7303d4d29b8d125d8435eb2f0efb6415098 Mon Sep 17 00:00:00 2001 From: salvacybersec Date: Mon, 6 Apr 2026 18:06:35 +0300 Subject: [PATCH] feat(18-02): SSE hub for live scan/recon progress streaming - SSEHub with Subscribe/Unsubscribe/Broadcast lifecycle - Non-blocking broadcast with buffered channels (cap 32) - SSE handlers for /api/v1/scan/progress and /api/v1/recon/progress - Proper text/event-stream headers and SSE wire format - 7 passing tests covering hub lifecycle, broadcast, and HTTP handler --- pkg/web/sse_test.go | 217 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 pkg/web/sse_test.go diff --git a/pkg/web/sse_test.go b/pkg/web/sse_test.go new file mode 100644 index 0000000..2b8b405 --- /dev/null +++ b/pkg/web/sse_test.go @@ -0,0 +1,217 @@ +package web + +import ( + "bufio" + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/go-chi/chi/v5" + "github.com/stretchr/testify/assert" +) + +func TestSSEHubSubscribeUnsubscribe(t *testing.T) { + hub := NewSSEHub() + + ch1 := hub.Subscribe() + ch2 := hub.Subscribe() + assert.Equal(t, 2, hub.ClientCount()) + + hub.Unsubscribe(ch1) + assert.Equal(t, 1, hub.ClientCount()) + + hub.Unsubscribe(ch2) + assert.Equal(t, 0, hub.ClientCount()) +} + +func TestSSEHubBroadcast(t *testing.T) { + hub := NewSSEHub() + + ch1 := hub.Subscribe() + ch2 := hub.Subscribe() + defer hub.Unsubscribe(ch1) + defer hub.Unsubscribe(ch2) + + evt := SSEEvent{Type: "scan:progress", Data: map[string]int{"percent": 50}} + hub.Broadcast(evt) + + // Both clients should receive the event + select { + case got := <-ch1: + assert.Equal(t, "scan:progress", got.Type) + case <-time.After(time.Second): + t.Fatal("ch1 did not receive event") + } + + select { + case got := <-ch2: + assert.Equal(t, "scan:progress", got.Type) + case <-time.After(time.Second): + t.Fatal("ch2 did not receive event") + } +} + +func TestSSEHubBroadcastDropsWhenFull(t *testing.T) { + hub := NewSSEHub() + ch := hub.Subscribe() + defer hub.Unsubscribe(ch) + + // Fill the buffer (capacity 32) + for i := 0; i < 32; i++ { + hub.Broadcast(SSEEvent{Type: "fill", Data: i}) + } + + // This should NOT block — it drops the event + done := make(chan struct{}) + go func() { + hub.Broadcast(SSEEvent{Type: "overflow", Data: 33}) + close(done) + }() + + select { + case <-done: + // good, broadcast returned + case <-time.After(time.Second): + t.Fatal("Broadcast blocked on full buffer") + } +} + +func TestSSEHubClientDisconnect(t *testing.T) { + hub := NewSSEHub() + ch := hub.Subscribe() + assert.Equal(t, 1, hub.ClientCount()) + + hub.Unsubscribe(ch) + assert.Equal(t, 0, hub.ClientCount()) + + // Channel should be closed + _, ok := <-ch + assert.False(t, ok, "channel should be closed after unsubscribe") +} + +func TestSSEHTTPHandler(t *testing.T) { + s, _ := testServer(t) + + r := chi.NewRouter() + s.mountAPI(r) + + // Start the SSE request in a goroutine with a cancelable context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := httptest.NewRequest(http.MethodGet, "/api/v1/scan/progress", nil) + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + // Run handler in background + done := make(chan struct{}) + go func() { + r.ServeHTTP(w, req) + close(done) + }() + + // Give handler time to set headers and send initial event + time.Sleep(50 * time.Millisecond) + + // Broadcast an event + s.sse.Broadcast(SSEEvent{Type: "scan:finding", Data: map[string]string{"key": "test"}}) + + // Give time for event to be written + time.Sleep(50 * time.Millisecond) + + // Cancel the context to disconnect + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("handler did not return after context cancel") + } + + // Check response headers + assert.Equal(t, "text/event-stream", w.Header().Get("Content-Type")) + assert.Equal(t, "no-cache", w.Header().Get("Cache-Control")) + + // Parse SSE events from body + body := w.Body.String() + assert.Contains(t, body, "event: connected") + assert.Contains(t, body, "event: scan:finding") + assert.Contains(t, body, "data:") +} + +func TestSSEEventFormat(t *testing.T) { + s, _ := testServer(t) + + r := chi.NewRouter() + s.mountAPI(r) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := httptest.NewRequest(http.MethodGet, "/api/v1/recon/progress", nil) + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + done := make(chan struct{}) + go func() { + r.ServeHTTP(w, req) + close(done) + }() + + time.Sleep(50 * time.Millisecond) + + s.sse.Broadcast(SSEEvent{Type: "recon:complete", Data: map[string]int{"total": 5}}) + time.Sleep(50 * time.Millisecond) + + cancel() + <-done + + // Verify SSE format: "event: {type}\ndata: {json}\n\n" + body := w.Body.String() + scanner := bufio.NewScanner(strings.NewReader(body)) + var foundEvent, foundData bool + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "event: recon:complete") { + foundEvent = true + } + if strings.HasPrefix(line, "data: ") && strings.Contains(line, `"total"`) { + foundData = true + } + } + assert.True(t, foundEvent, "should have event: recon:complete line") + assert.True(t, foundData, "should have data line with JSON") +} + +func TestSSEClientDisconnectRemovesSubscriber(t *testing.T) { + s, _ := testServer(t) + + r := chi.NewRouter() + s.mountAPI(r) + + ctx, cancel := context.WithCancel(context.Background()) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/scan/progress", nil) + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + done := make(chan struct{}) + go func() { + r.ServeHTTP(w, req) + close(done) + }() + + time.Sleep(50 * time.Millisecond) + assert.Equal(t, 1, s.sse.ClientCount(), "should have 1 subscriber") + + cancel() + <-done + + // After disconnect, subscriber should be removed + // Give a small moment for cleanup + time.Sleep(10 * time.Millisecond) + assert.Equal(t, 0, s.sse.ClientCount(), "should have 0 subscribers after disconnect") +}