feat(18-02): REST API handlers for /api/v1/* endpoints
- Stats, keys, providers, scan, recon, dorks, config endpoints - JSON response wrappers with proper tags for all entities - Filtering, pagination, 404/204/202 status codes - SSE hub stub (full impl in task 2) - Resolved merge conflict in schema.sql - 16 passing tests covering all endpoints
This commit is contained in:
115
pkg/web/sse.go
Normal file
115
pkg/web/sse.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package web
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// SSEEvent represents a server-sent event with a type and JSON-serializable data.
|
||||
type SSEEvent struct {
|
||||
Type string `json:"type"`
|
||||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
// SSEHub manages SSE client subscriptions and broadcasts events to all
|
||||
// connected clients. It is safe for concurrent use.
|
||||
type SSEHub struct {
|
||||
mu sync.RWMutex
|
||||
clients map[chan SSEEvent]struct{}
|
||||
}
|
||||
|
||||
// NewSSEHub creates an empty SSE hub ready to accept subscriptions.
|
||||
func NewSSEHub() *SSEHub {
|
||||
return &SSEHub{
|
||||
clients: make(map[chan SSEEvent]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe creates a new buffered channel for a client and registers it.
|
||||
// The caller must call Unsubscribe when done.
|
||||
func (h *SSEHub) Subscribe() chan SSEEvent {
|
||||
ch := make(chan SSEEvent, 32)
|
||||
h.mu.Lock()
|
||||
h.clients[ch] = struct{}{}
|
||||
h.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
|
||||
// Unsubscribe removes a client channel from the hub and closes it.
|
||||
func (h *SSEHub) Unsubscribe(ch chan SSEEvent) {
|
||||
h.mu.Lock()
|
||||
delete(h.clients, ch)
|
||||
h.mu.Unlock()
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// Broadcast sends an event to all connected clients. If a client's buffer is
|
||||
// full the event is dropped for that client (non-blocking send).
|
||||
func (h *SSEHub) Broadcast(evt SSEEvent) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for ch := range h.clients {
|
||||
select {
|
||||
case ch <- evt:
|
||||
default:
|
||||
// client buffer full, drop event
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ClientCount returns the number of currently connected SSE clients.
|
||||
func (h *SSEHub) ClientCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.clients)
|
||||
}
|
||||
|
||||
// handleSSEScanProgress streams scan progress events to the client via SSE.
|
||||
func (s *Server) handleSSEScanProgress(w http.ResponseWriter, r *http.Request) {
|
||||
s.serveSSE(w, r)
|
||||
}
|
||||
|
||||
// handleSSEReconProgress streams recon progress events to the client via SSE.
|
||||
func (s *Server) handleSSEReconProgress(w http.ResponseWriter, r *http.Request) {
|
||||
s.serveSSE(w, r)
|
||||
}
|
||||
|
||||
// serveSSE is the shared SSE handler for both scan and recon progress endpoints.
|
||||
func (s *Server) serveSSE(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
|
||||
ch := s.sse.Subscribe()
|
||||
defer s.sse.Unsubscribe(ch)
|
||||
|
||||
// Send initial connection event
|
||||
fmt.Fprintf(w, "event: connected\ndata: {}\n\n")
|
||||
flusher.Flush()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case evt, ok := <-ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
data, err := json.Marshal(evt.Data)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", evt.Type, data)
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user