package web import ( "bufio" "context" "net/http" "net/http/httptest" "strings" "testing" "time" "github.com/go-chi/chi/v5" "github.com/stretchr/testify/assert" ) func TestSSEHubSubscribeUnsubscribe(t *testing.T) { hub := NewSSEHub() ch1 := hub.Subscribe() ch2 := hub.Subscribe() assert.Equal(t, 2, hub.ClientCount()) hub.Unsubscribe(ch1) assert.Equal(t, 1, hub.ClientCount()) hub.Unsubscribe(ch2) assert.Equal(t, 0, hub.ClientCount()) } func TestSSEHubBroadcast(t *testing.T) { hub := NewSSEHub() ch1 := hub.Subscribe() ch2 := hub.Subscribe() defer hub.Unsubscribe(ch1) defer hub.Unsubscribe(ch2) evt := SSEEvent{Type: "scan:progress", Data: map[string]int{"percent": 50}} hub.Broadcast(evt) // Both clients should receive the event select { case got := <-ch1: assert.Equal(t, "scan:progress", got.Type) case <-time.After(time.Second): t.Fatal("ch1 did not receive event") } select { case got := <-ch2: assert.Equal(t, "scan:progress", got.Type) case <-time.After(time.Second): t.Fatal("ch2 did not receive event") } } func TestSSEHubBroadcastDropsWhenFull(t *testing.T) { hub := NewSSEHub() ch := hub.Subscribe() defer hub.Unsubscribe(ch) // Fill the buffer (capacity 32) for i := 0; i < 32; i++ { hub.Broadcast(SSEEvent{Type: "fill", Data: i}) } // This should NOT block — it drops the event done := make(chan struct{}) go func() { hub.Broadcast(SSEEvent{Type: "overflow", Data: 33}) close(done) }() select { case <-done: // good, broadcast returned case <-time.After(time.Second): t.Fatal("Broadcast blocked on full buffer") } } func TestSSEHubClientDisconnect(t *testing.T) { hub := NewSSEHub() ch := hub.Subscribe() assert.Equal(t, 1, hub.ClientCount()) hub.Unsubscribe(ch) assert.Equal(t, 0, hub.ClientCount()) // Channel should be closed _, ok := <-ch assert.False(t, ok, "channel should be closed after unsubscribe") } func TestSSEHTTPHandler(t *testing.T) { s, _ := testServer(t) r := chi.NewRouter() s.mountAPI(r) // Start the SSE request in a goroutine with a cancelable context ctx, cancel := context.WithCancel(context.Background()) defer cancel() req := httptest.NewRequest(http.MethodGet, "/api/v1/scan/progress", nil) req = req.WithContext(ctx) w := httptest.NewRecorder() // Run handler in background done := make(chan struct{}) go func() { r.ServeHTTP(w, req) close(done) }() // Give handler time to set headers and send initial event time.Sleep(50 * time.Millisecond) // Broadcast an event s.sse.Broadcast(SSEEvent{Type: "scan:finding", Data: map[string]string{"key": "test"}}) // Give time for event to be written time.Sleep(50 * time.Millisecond) // Cancel the context to disconnect cancel() select { case <-done: case <-time.After(2 * time.Second): t.Fatal("handler did not return after context cancel") } // Check response headers assert.Equal(t, "text/event-stream", w.Header().Get("Content-Type")) assert.Equal(t, "no-cache", w.Header().Get("Cache-Control")) // Parse SSE events from body body := w.Body.String() assert.Contains(t, body, "event: connected") assert.Contains(t, body, "event: scan:finding") assert.Contains(t, body, "data:") } func TestSSEEventFormat(t *testing.T) { s, _ := testServer(t) r := chi.NewRouter() s.mountAPI(r) ctx, cancel := context.WithCancel(context.Background()) defer cancel() req := httptest.NewRequest(http.MethodGet, "/api/v1/recon/progress", nil) req = req.WithContext(ctx) w := httptest.NewRecorder() done := make(chan struct{}) go func() { r.ServeHTTP(w, req) close(done) }() time.Sleep(50 * time.Millisecond) s.sse.Broadcast(SSEEvent{Type: "recon:complete", Data: map[string]int{"total": 5}}) time.Sleep(50 * time.Millisecond) cancel() <-done // Verify SSE format: "event: {type}\ndata: {json}\n\n" body := w.Body.String() scanner := bufio.NewScanner(strings.NewReader(body)) var foundEvent, foundData bool for scanner.Scan() { line := scanner.Text() if strings.HasPrefix(line, "event: recon:complete") { foundEvent = true } if strings.HasPrefix(line, "data: ") && strings.Contains(line, `"total"`) { foundData = true } } assert.True(t, foundEvent, "should have event: recon:complete line") assert.True(t, foundData, "should have data line with JSON") } func TestSSEClientDisconnectRemovesSubscriber(t *testing.T) { s, _ := testServer(t) r := chi.NewRouter() s.mountAPI(r) ctx, cancel := context.WithCancel(context.Background()) req := httptest.NewRequest(http.MethodGet, "/api/v1/scan/progress", nil) req = req.WithContext(ctx) w := httptest.NewRecorder() done := make(chan struct{}) go func() { r.ServeHTTP(w, req) close(done) }() time.Sleep(50 * time.Millisecond) assert.Equal(t, 1, s.sse.ClientCount(), "should have 1 subscriber") cancel() <-done // After disconnect, subscriber should be removed // Give a small moment for cleanup time.Sleep(10 * time.Millisecond) assert.Equal(t, 0, s.sse.ClientCount(), "should have 0 subscribers after disconnect") }