feat(18-02): SSE hub for live scan/recon progress streaming
- SSEHub with Subscribe/Unsubscribe/Broadcast lifecycle - Non-blocking broadcast with buffered channels (cap 32) - SSE handlers for /api/v1/scan/progress and /api/v1/recon/progress - Proper text/event-stream headers and SSE wire format - 7 passing tests covering hub lifecycle, broadcast, and HTTP handler
This commit is contained in:
217
pkg/web/sse_test.go
Normal file
217
pkg/web/sse_test.go
Normal file
@@ -0,0 +1,217 @@
|
|||||||
|
package web
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSSEHubSubscribeUnsubscribe(t *testing.T) {
|
||||||
|
hub := NewSSEHub()
|
||||||
|
|
||||||
|
ch1 := hub.Subscribe()
|
||||||
|
ch2 := hub.Subscribe()
|
||||||
|
assert.Equal(t, 2, hub.ClientCount())
|
||||||
|
|
||||||
|
hub.Unsubscribe(ch1)
|
||||||
|
assert.Equal(t, 1, hub.ClientCount())
|
||||||
|
|
||||||
|
hub.Unsubscribe(ch2)
|
||||||
|
assert.Equal(t, 0, hub.ClientCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSSEHubBroadcast(t *testing.T) {
|
||||||
|
hub := NewSSEHub()
|
||||||
|
|
||||||
|
ch1 := hub.Subscribe()
|
||||||
|
ch2 := hub.Subscribe()
|
||||||
|
defer hub.Unsubscribe(ch1)
|
||||||
|
defer hub.Unsubscribe(ch2)
|
||||||
|
|
||||||
|
evt := SSEEvent{Type: "scan:progress", Data: map[string]int{"percent": 50}}
|
||||||
|
hub.Broadcast(evt)
|
||||||
|
|
||||||
|
// Both clients should receive the event
|
||||||
|
select {
|
||||||
|
case got := <-ch1:
|
||||||
|
assert.Equal(t, "scan:progress", got.Type)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("ch1 did not receive event")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case got := <-ch2:
|
||||||
|
assert.Equal(t, "scan:progress", got.Type)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("ch2 did not receive event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSSEHubBroadcastDropsWhenFull(t *testing.T) {
|
||||||
|
hub := NewSSEHub()
|
||||||
|
ch := hub.Subscribe()
|
||||||
|
defer hub.Unsubscribe(ch)
|
||||||
|
|
||||||
|
// Fill the buffer (capacity 32)
|
||||||
|
for i := 0; i < 32; i++ {
|
||||||
|
hub.Broadcast(SSEEvent{Type: "fill", Data: i})
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should NOT block — it drops the event
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
hub.Broadcast(SSEEvent{Type: "overflow", Data: 33})
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// good, broadcast returned
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("Broadcast blocked on full buffer")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSSEHubClientDisconnect(t *testing.T) {
|
||||||
|
hub := NewSSEHub()
|
||||||
|
ch := hub.Subscribe()
|
||||||
|
assert.Equal(t, 1, hub.ClientCount())
|
||||||
|
|
||||||
|
hub.Unsubscribe(ch)
|
||||||
|
assert.Equal(t, 0, hub.ClientCount())
|
||||||
|
|
||||||
|
// Channel should be closed
|
||||||
|
_, ok := <-ch
|
||||||
|
assert.False(t, ok, "channel should be closed after unsubscribe")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSSEHTTPHandler(t *testing.T) {
|
||||||
|
s, _ := testServer(t)
|
||||||
|
|
||||||
|
r := chi.NewRouter()
|
||||||
|
s.mountAPI(r)
|
||||||
|
|
||||||
|
// Start the SSE request in a goroutine with a cancelable context
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/scan/progress", nil)
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
// Run handler in background
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
r.ServeHTTP(w, req)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Give handler time to set headers and send initial event
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
// Broadcast an event
|
||||||
|
s.sse.Broadcast(SSEEvent{Type: "scan:finding", Data: map[string]string{"key": "test"}})
|
||||||
|
|
||||||
|
// Give time for event to be written
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
// Cancel the context to disconnect
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("handler did not return after context cancel")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check response headers
|
||||||
|
assert.Equal(t, "text/event-stream", w.Header().Get("Content-Type"))
|
||||||
|
assert.Equal(t, "no-cache", w.Header().Get("Cache-Control"))
|
||||||
|
|
||||||
|
// Parse SSE events from body
|
||||||
|
body := w.Body.String()
|
||||||
|
assert.Contains(t, body, "event: connected")
|
||||||
|
assert.Contains(t, body, "event: scan:finding")
|
||||||
|
assert.Contains(t, body, "data:")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSSEEventFormat(t *testing.T) {
|
||||||
|
s, _ := testServer(t)
|
||||||
|
|
||||||
|
r := chi.NewRouter()
|
||||||
|
s.mountAPI(r)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/recon/progress", nil)
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
r.ServeHTTP(w, req)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
s.sse.Broadcast(SSEEvent{Type: "recon:complete", Data: map[string]int{"total": 5}})
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
<-done
|
||||||
|
|
||||||
|
// Verify SSE format: "event: {type}\ndata: {json}\n\n"
|
||||||
|
body := w.Body.String()
|
||||||
|
scanner := bufio.NewScanner(strings.NewReader(body))
|
||||||
|
var foundEvent, foundData bool
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
if strings.HasPrefix(line, "event: recon:complete") {
|
||||||
|
foundEvent = true
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(line, "data: ") && strings.Contains(line, `"total"`) {
|
||||||
|
foundData = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.True(t, foundEvent, "should have event: recon:complete line")
|
||||||
|
assert.True(t, foundData, "should have data line with JSON")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSSEClientDisconnectRemovesSubscriber(t *testing.T) {
|
||||||
|
s, _ := testServer(t)
|
||||||
|
|
||||||
|
r := chi.NewRouter()
|
||||||
|
s.mountAPI(r)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/scan/progress", nil)
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
r.ServeHTTP(w, req)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
assert.Equal(t, 1, s.sse.ClientCount(), "should have 1 subscriber")
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
<-done
|
||||||
|
|
||||||
|
// After disconnect, subscriber should be removed
|
||||||
|
// Give a small moment for cleanup
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
assert.Equal(t, 0, s.sse.ClientCount(), "should have 0 subscribers after disconnect")
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user