package web import ( "encoding/json" "fmt" "net/http" "sync" ) // SSEEvent represents a server-sent event with a type and JSON-serializable data. type SSEEvent struct { Type string `json:"type"` Data interface{} `json:"data"` } // SSEHub manages SSE client subscriptions and broadcasts events to all // connected clients. It is safe for concurrent use. type SSEHub struct { mu sync.RWMutex clients map[chan SSEEvent]struct{} } // NewSSEHub creates an empty SSE hub ready to accept subscriptions. func NewSSEHub() *SSEHub { return &SSEHub{ clients: make(map[chan SSEEvent]struct{}), } } // Subscribe creates a new buffered channel for a client and registers it. // The caller must call Unsubscribe when done. func (h *SSEHub) Subscribe() chan SSEEvent { ch := make(chan SSEEvent, 32) h.mu.Lock() h.clients[ch] = struct{}{} h.mu.Unlock() return ch } // Unsubscribe removes a client channel from the hub and closes it. func (h *SSEHub) Unsubscribe(ch chan SSEEvent) { h.mu.Lock() delete(h.clients, ch) h.mu.Unlock() close(ch) } // Broadcast sends an event to all connected clients. If a client's buffer is // full the event is dropped for that client (non-blocking send). func (h *SSEHub) Broadcast(evt SSEEvent) { h.mu.RLock() defer h.mu.RUnlock() for ch := range h.clients { select { case ch <- evt: default: // client buffer full, drop event } } } // ClientCount returns the number of currently connected SSE clients. func (h *SSEHub) ClientCount() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } // handleSSEScanProgress streams scan progress events to the client via SSE. func (s *Server) handleSSEScanProgress(w http.ResponseWriter, r *http.Request) { s.serveSSE(w, r) } // handleSSEReconProgress streams recon progress events to the client via SSE. func (s *Server) handleSSEReconProgress(w http.ResponseWriter, r *http.Request) { s.serveSSE(w, r) } // serveSSE is the shared SSE handler for both scan and recon progress endpoints. func (s *Server) serveSSE(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") ch := s.sse.Subscribe() defer s.sse.Unsubscribe(ch) // Send initial connection event fmt.Fprintf(w, "event: connected\ndata: {}\n\n") flusher.Flush() for { select { case <-r.Context().Done(): return case evt, ok := <-ch: if !ok { return } data, err := json.Marshal(evt.Data) if err != nil { continue } fmt.Fprintf(w, "event: %s\ndata: %s\n\n", evt.Type, data) flusher.Flush() } } }