// Command main runs a small HTTP service that accepts webhook POSTs on
// /webhook and relays each one to every connected Server-Sent Events (SSE)
// client on /events. A trivial /health endpoint is also exposed.
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"sync"
	"time"
)

const (
	// maxWebhookBodyBytes caps how much of a webhook request body is read
	// into memory, so a single request cannot exhaust the process.
	maxWebhookBodyBytes = 1 << 20

	// clientBufferSize is the per-client message backlog. The broker fans
	// out with non-blocking sends, so without a buffer any message arriving
	// while a client handler is busy writing would be dropped.
	clientBufferSize = 16

	// keepAliveInterval is how often an SSE comment line is written to an
	// otherwise idle stream.
	keepAliveInterval = 30 * time.Second

	// readHeaderTimeout bounds how long a connection may take to send its
	// request headers.
	readHeaderTimeout = 10 * time.Second
)

// WebhookPayload matches AlpenQueue's webhook format.
type WebhookPayload struct {
	Status  string `json:"status"`
	Took    string `json:"took"`
	URL     string `json:"url"`
	Content string `json:"content"`
}

// SSEBroker manages SSE connections and broadcasts messages to them.
//
// The clients map is only touched by the run goroutine; handlers talk to it
// through the newClients, deadClients and messages channels.
type SSEBroker struct {
	clients     map[chan string]bool
	newClients  chan chan string
	deadClients chan chan string
	messages    chan string
	mu          sync.RWMutex
}

// NewSSEBroker allocates a broker and starts its event loop in a background
// goroutine. The loop runs for the lifetime of the process.
func NewSSEBroker() *SSEBroker {
	broker := &SSEBroker{
		clients:     make(map[chan string]bool),
		newClients:  make(chan chan string),
		deadClients: make(chan chan string),
		messages:    make(chan string, 100),
	}
	go broker.run()
	return broker
}

// run is the broker's event loop: it registers new clients, unregisters and
// closes departed ones, and fans each broadcast message out to every
// registered client.
func (b *SSEBroker) run() {
	for {
		select {
		case client := <-b.newClients:
			b.mu.Lock()
			b.clients[client] = true
			total := len(b.clients)
			b.mu.Unlock()
			log.Printf("New SSE client connected. Total: %d", total)

		case client := <-b.deadClients:
			b.mu.Lock()
			// Only close channels that are still registered, so a repeated
			// unregistration cannot panic with a double close.
			if b.clients[client] {
				delete(b.clients, client)
				close(client)
			}
			total := len(b.clients)
			b.mu.Unlock()
			log.Printf("SSE client disconnected. Total: %d", total)

		case msg := <-b.messages:
			b.mu.RLock()
			for client := range b.clients {
				select {
				case client <- msg:
				default:
					// Client backlog is full; skip it rather than stall
					// every other client.
				}
			}
			b.mu.RUnlock()
		}
	}
}

// Broadcast queues message for delivery to all connected clients. It blocks
// only if the broker's internal queue is full. The message is written
// verbatim after "data: ", so it should not contain raw newlines.
func (b *SSEBroker) Broadcast(message string) {
	b.messages <- message
}

// ServeHTTP streams broadcast messages to one client as Server-Sent Events
// until the request context is cancelled or a write to the client fails.
func (b *SSEBroker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	header := w.Header()
	header.Set("Access-Control-Allow-Origin", "https://maxtheweb.com")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	// Set SSE headers.
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")

	messageChan := make(chan string, clientBufferSize)
	b.newClients <- messageChan
	// Unregister on every exit path, not just on context cancellation.
	defer func() { b.deadClients <- messageChan }()

	// Send initial connection message.
	if _, err := fmt.Fprint(w, "data: {\"type\":\"connected\"}\n\n"); err != nil {
		return
	}
	flusher.Flush()

	ticker := time.NewTicker(keepAliveInterval)
	defer ticker.Stop()

	for {
		select {
		case msg := <-messageChan:
			if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
				return
			}
			flusher.Flush()
		case <-ticker.C:
			if _, err := fmt.Fprint(w, ": keep-alive\n\n"); err != nil {
				return
			}
			flusher.Flush()
		case <-r.Context().Done():
			return
		}
	}
}

// broker is the process-wide SSE broker; main initialises it before any
// handler is registered.
var broker *SSEBroker

// webhookHandler accepts a JSON WebhookPayload via POST and broadcasts it,
// together with a receive timestamp, to all SSE clients.
//
// Responses: 204 for OPTIONS, 405 for any other non-POST method, 413 when the
// body exceeds maxWebhookBodyBytes, 400 for unreadable or malformed bodies,
// 500 if the outgoing message cannot be encoded, and 200 "OK" on success.
func webhookHandler(w http.ResponseWriter, r *http.Request) {
	// CORS headers.
	header := w.Header()
	header.Set("Access-Control-Allow-Origin", "https://alpenqueue.maxtheweb.com")
	header.Set("Access-Control-Allow-Methods", "POST, OPTIONS")
	header.Set("Access-Control-Allow-Headers", "Content-Type")

	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodPost:
		// Handled below.
	default:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	defer r.Body.Close()
	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxWebhookBodyBytes))
	if err != nil {
		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			log.Printf("Webhook body exceeds %d bytes", maxWebhookBodyBytes)
			http.Error(w, "Payload too large", http.StatusRequestEntityTooLarge)
			return
		}
		log.Printf("Error reading webhook body: %v", err)
		http.Error(w, "Bad request", http.StatusBadRequest)
		return
	}

	var payload WebhookPayload
	if err := json.Unmarshal(body, &payload); err != nil {
		log.Printf("Error parsing webhook JSON: %v", err)
		http.Error(w, "Bad request", http.StatusBadRequest)
		return
	}

	// %q keeps caller-supplied control characters out of the log stream.
	log.Printf("Received webhook: status=%q, url=%q, took=%q",
		payload.Status, payload.URL, payload.Took)

	// Create SSE message.
	sseMessage := map[string]any{
		"type":      "result",
		"status":    payload.Status,
		"took":      payload.Took,
		"url":       payload.URL,
		"content":   payload.Content,
		"timestamp": time.Now().Format(time.RFC3339),
	}
	jsonMsg, err := json.Marshal(sseMessage)
	if err != nil {
		log.Printf("Error encoding SSE message: %v", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}

	broker.Broadcast(string(jsonMsg))

	w.WriteHeader(http.StatusOK)
	if _, err := w.Write([]byte("OK")); err != nil {
		log.Printf("Error writing webhook response: %v", err)
	}
}

// main wires up the routes on the default mux and serves on :8081.
func main() {
	broker = NewSSEBroker()

	http.HandleFunc("/webhook", webhookHandler)
	http.Handle("/events", broker)
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "OK")
	})

	// Only the header read is bounded: a WriteTimeout would cut off the
	// long-lived SSE responses served on /events.
	server := &http.Server{
		Addr:              ":8081",
		ReadHeaderTimeout: readHeaderTimeout,
	}

	log.Println("Webhook SSE server starting on :8081")
	if err := server.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}