package web import ( "encoding/json" "fmt" "net/http" "gitgud.io/mike/mpv-manager/pkg/constants" "gitgud.io/mike/mpv-manager/pkg/log" ) func (s *Server) handleInstallStreamSSE(w http.ResponseWriter, r *http.Request) { sessionID := r.URL.Query().Get("session_id") if sessionID == "" { log.Error(constants.LogPrefixAPI + " Missing session ID in request") http.Error(w, "Missing session ID", http.StatusBadRequest) return } log.Info("[SSE] Client connected to stream: session_id=" + sessionID) session, exists := s.sessions.Get(sessionID) if !exists { log.Error(constants.LogPrefixAPI + " Session not found: session_id=" + sessionID) http.Error(w, "Session not found", http.StatusNotFound) return } log.Info("[SSE] Streaming started for session: " + sessionID) w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") flusher, ok := w.(http.Flusher) if !ok { log.Error(constants.LogPrefixAPI + " Flusher not supported by response writer") http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } eventCount := 0 for { select { case output, ok := <-session.OutputChan: if ok { eventCount++ fmt.Fprintf(w, "event: output\ndata: %s\n\n", jsonEscape(output)) flusher.Flush() } case err, ok := <-session.ErrorChan: if ok { eventCount++ log.Error(constants.LogPrefixAPI + " Error event: " + err.Error()) fmt.Fprintf(w, "event: error\ndata: %s\n\n", jsonEscape(err.Error())) flusher.Flush() } case <-session.DoneChan: eventCount++ log.Info("[SSE] Done event for session: " + sessionID + " (total events: " + fmt.Sprintf("%d", eventCount) + ")") fmt.Fprintf(w, "event: done\ndata: {}\n\n") flusher.Flush() return case <-r.Context().Done(): log.Info("[SSE] Client disconnected from stream: " + sessionID + " (total events: " + fmt.Sprintf("%d", eventCount) + ")") return } } } // handleJobStreamSSE streams job events via Server-Sent Events // Clients can subscribe to all job events or filter by job_id query parameter // Uses fan-out pattern: each client gets its own dedicated channel func (s *Server) handleJobStreamSSE(w http.ResponseWriter, r *http.Request) { jobID := r.URL.Query().Get("job_id") log.Info("[SSE] Client connected to job stream: job_id=" + jobID) w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") flusher, ok := w.(http.Flusher) if !ok { log.Error(constants.LogPrefixAPI + " Flusher not supported by response writer") http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } // Register as a client and get a dedicated channel (fan-out pattern) clientChan := s.jobManager.RegisterClient() defer s.jobManager.UnregisterClient(clientChan) // If job_id specified, send initial status if job exists if jobID != "" { if job, exists := s.jobManager.GetJob(jobID); exists { initialData := map[string]interface{}{ "job_id": job.ID, "status": job.Status, "type": job.Type, "methodId": job.MethodID, "appName": job.AppName, } data, _ := json.Marshal(initialData) fmt.Fprintf(w, "event: status\ndata: %s\n\n", data) flusher.Flush() } } eventCount := 0 for { select { case event, ok := <-clientChan: if !ok { log.Info("[SSE] Client channel closed") return } // Filter by job_id if specified if jobID != "" && event.JobID != jobID { continue } eventCount++ // Build the event data with job_id included eventData := map[string]interface{}{ "job_id": event.JobID, } // Merge the event data switch v := event.Data.(type) { case map[string]interface{}: for key, val := range v { eventData[key] = val } case JobProgressData: eventData["progress"] = v.Progress eventData["message"] = v.Message case JobOutputData: eventData["output"] = v.Line case JobStatusData: eventData["status"] = v.Status if v.Error != "" { eventData["error"] = v.Error } if v.ErrorDetails != "" { eventData["error_details"] = v.ErrorDetails } if v.AppName != "" { eventData["appName"] = v.AppName } if v.Type != "" { eventData["type"] = v.Type } if v.MethodID != "" { eventData["methodId"] = v.MethodID } case JobCompleteData: eventData["status"] = "complete" eventData["success"] = v.Success eventData["progress"] = v.Progress eventData["message"] = v.Message eventData["appName"] = v.AppName eventData["type"] = v.Type eventData["methodId"] = v.MethodID } data, err := json.Marshal(eventData) if err != nil { log.Error("[SSE] Failed to marshal event data: " + err.Error()) continue } fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.Type, data) flusher.Flush() case <-r.Context().Done(): log.Info("[SSE] Client disconnected from job stream (total events: " + fmt.Sprintf("%d", eventCount) + ")") return } } } func jsonEscape(s string) string { b, _ := json.Marshal(s) return string(b[1 : len(b)-1]) }