package web

import (
	"context"
	"fmt"
	"regexp"
	"strconv"
	"sync"
	"time"
)

// Job represents an installation, uninstallation, or update job.
//
// NOTE: the JobManager mutates Job fields while holding its own lock, which
// is not exposed to callers. Code that reads fields of a *Job obtained from
// the manager while the job may still be updated is not synchronized with
// those writes.
type Job struct {
	ID           string   // Unique job ID (timestamp-based)
	Type         string   // "install", "uninstall", "update"
	MethodID     string   // Installation method ID (e.g., "mpv-flatpak")
	AppName      string   // Display name (e.g., "MPV")
	Status       string   // "pending", "running", "complete", "error", "cancelled"
	Progress     int      // 0-100 percentage
	Message      string   // Current action message
	Output       []string // Full log output (all lines with timestamps)
	Error        string   // Short error message if failed
	ErrorDetails string   // Detailed error reason
	StartedAt    time.Time
	CompletedAt  *time.Time // nil until the job is completed, errored or cancelled

	// Context for cancellation support
	ctx    context.Context
	cancel context.CancelFunc
}

// Context returns the job's context. It is nil for a Job that was not
// created through JobManager.CreateJob.
func (j *Job) Context() context.Context {
	return j.ctx
}

// Cancel cancels the job's context. It is a no-op when the job has no
// cancel function. It does not change Status; use JobManager.CancelJob for
// that.
func (j *Job) Cancel() {
	if j.cancel != nil {
		j.cancel()
	}
}

// IsCancelled reports whether the job's context has been cancelled.
// A job without a context is never reported as cancelled.
func (j *Job) IsCancelled() bool {
	if j.ctx == nil {
		return false
	}
	select {
	case <-j.ctx.Done():
		return true
	default:
		return false
	}
}

// JobEvent represents an event broadcast for SSE streaming.
type JobEvent struct {
	JobID string      // Job identifier
	Type  string      // "created", "progress", "output", "status", "complete"
	Data  interface{} // Event-specific data
}

// JobProgressData is the payload of "progress" events.
type JobProgressData struct {
	Progress int    `json:"progress"`
	Message  string `json:"message"`
}

// JobOutputData is the payload of "output" events; Line is the timestamped
// output line.
type JobOutputData struct {
	Line string `json:"line"`
}

// JobStatusData is the payload of "status" events, which are emitted when a
// job errors or is cancelled.
type JobStatusData struct {
	Status       string `json:"status"`
	Error        string `json:"error,omitempty"`
	ErrorDetails string `json:"errorDetails,omitempty"`
	AppName      string `json:"appName,omitempty"`
	Type         string `json:"type,omitempty"`
	MethodID     string `json:"methodId,omitempty"`
}

// JobCompleteData is the payload of "complete" events.
type JobCompleteData struct {
	Success     bool      `json:"success"`
	Progress    int       `json:"progress"`
	Message     string    `json:"message"`
	AppName     string    `json:"appName"`
	Type        string    `json:"type"`
	MethodID    string    `json:"methodId"`
	CompletedAt time.Time `json:"completedAt"`
}

// JobManager manages all jobs and broadcasts events for SSE.
type JobManager struct {
	jobs       map[string]*Job // All active jobs (pending/running)
	recentJobs []*Job          // Last 10 completed jobs for history, newest first
	mu         sync.RWMutex    // Protects jobs and recentJobs

	// Fan-out pattern: each SSE client gets its own channel
	clients   map[chan JobEvent]struct{} // Registry of connected SSE clients
	clientsMu sync.RWMutex               // Protects clients map
}

// maxRecentJobs is the maximum number of recent jobs to keep in history.
const maxRecentJobs = 10

// progressRegex matches percentages like "(45%)" or "45%".
var progressRegex = regexp.MustCompile(`(?:\()?(\d{1,3})%(?:\))?`)

// NewJobManager creates a new job manager with fan-out broadcasting.
func NewJobManager() *JobManager {
	return &JobManager{
		jobs:       make(map[string]*Job),
		recentJobs: make([]*Job, 0, maxRecentJobs),
		clients:    make(map[chan JobEvent]struct{}),
	}
}

// RegisterClient registers a new SSE client and returns its dedicated channel.
// The client must call UnregisterClient when done to prevent memory leaks.
func (jm *JobManager) RegisterClient() chan JobEvent {
	clientChan := make(chan JobEvent, 100) // Buffered to handle bursts

	jm.clientsMu.Lock()
	jm.clients[clientChan] = struct{}{}
	jm.clientsMu.Unlock()

	return clientChan
}

// UnregisterClient removes a client and closes its channel.
// Safe to call multiple times: the channel is only closed while it is still
// registered.
func (jm *JobManager) UnregisterClient(c chan JobEvent) {
	jm.clientsMu.Lock()
	defer jm.clientsMu.Unlock()

	if _, exists := jm.clients[c]; exists {
		delete(jm.clients, c)
		close(c)
	}
}

// broadcastEvent sends an event to ALL registered SSE clients.
// It uses a non-blocking send, so an event is dropped for any client whose
// buffer is full rather than stalling the other clients.
func (jm *JobManager) broadcastEvent(event JobEvent) {
	jm.clientsMu.RLock()
	defer jm.clientsMu.RUnlock()

	for clientChan := range jm.clients {
		select {
		case clientChan <- event:
			// Event sent successfully
		default:
			// Client buffer full, skip this event for this client.
			// This prevents slow clients from blocking everyone else.
		}
	}
}

// CreateJob creates a new pending job, registers it as active, broadcasts a
// "created" event and returns the job.
//
// The job ID is derived from the current time in nanoseconds. If that value
// is already used by an active or recent job (possible on coarse clocks or
// under concurrent creation), it is incremented until it is unique so an
// existing job is never overwritten.
func (jm *JobManager) CreateJob(jobType, methodID, appName string) *Job {
	now := time.Now()

	// Create a context with cancel for this job
	ctx, cancel := context.WithCancel(context.Background())

	job := &Job{
		Type:      jobType,
		MethodID:  methodID,
		AppName:   appName,
		Status:    "pending",
		Progress:  0,
		Message:   "Job created",
		Output:    make([]string, 0),
		StartedAt: now,
		ctx:       ctx,
		cancel:    cancel,
	}

	// The ID must be chosen under the lock so the uniqueness check and the
	// insertion are atomic.
	jm.mu.Lock()
	job.ID = jm.uniqueJobIDLocked(now.UnixNano())
	jm.jobs[job.ID] = job
	jm.mu.Unlock()

	// Broadcast job creation event
	jm.broadcastEvent(JobEvent{
		JobID: job.ID,
		Type:  "created",
		Data: map[string]interface{}{
			"type":     jobType,
			"methodId": methodID,
			"appName":  appName,
		},
	})

	return job
}

// uniqueJobIDLocked returns the decimal form of the first value >= seed that
// is not the ID of an active or recent job.
// Must be called with jm.mu held.
func (jm *JobManager) uniqueJobIDLocked(seed int64) string {
	for n := seed; ; n++ {
		id := strconv.FormatInt(n, 10)
		if _, active := jm.jobs[id]; active {
			continue
		}
		taken := false
		for _, recent := range jm.recentJobs {
			if recent.ID == id {
				taken = true
				break
			}
		}
		if !taken {
			return id
		}
	}
}

// GetJob retrieves an active job by ID. Jobs that have already been archived
// (completed, errored or cancelled) are not found.
func (jm *JobManager) GetJob(jobID string) (*Job, bool) {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	job, exists := jm.jobs[jobID]
	return job, exists
}

// UpdateProgress updates the progress and message for an active job and
// broadcasts a "progress" event. If the message contains a percentage larger
// than progress, the parsed value is used instead. The result is clamped to
// 0-100. Unknown job IDs are ignored.
func (jm *JobManager) UpdateProgress(jobID string, progress int, message string) {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	job, exists := jm.jobs[jobID]
	if !exists {
		return
	}

	// Parse progress from message if it contains percentage
	if parsed := jm.parseProgress(message); parsed > progress {
		progress = parsed
	}

	// Ensure progress is within bounds
	switch {
	case progress < 0:
		progress = 0
	case progress > 100:
		progress = 100
	}

	job.Progress = progress
	job.Message = message

	// Update status to running if it was pending
	if job.Status == "pending" {
		job.Status = "running"
	}

	// Broadcast progress event
	jm.broadcastEvent(JobEvent{
		JobID: jobID,
		Type:  "progress",
		Data: JobProgressData{
			Progress: progress,
			Message:  message,
		},
	})
}

// AddOutput appends a timestamped output line to an active job and
// broadcasts an "output" event. A percentage found in the line raises the
// job's progress (it never lowers it). Unknown job IDs are ignored.
func (jm *JobManager) AddOutput(jobID string, line string) {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	job, exists := jm.jobs[jobID]
	if !exists {
		return
	}

	// Format output with timestamp
	timestamp := time.Now().Format("15:04:05")
	timestampedLine := fmt.Sprintf("[%s] %s", timestamp, line)
	job.Output = append(job.Output, timestampedLine)

	// Update status to running if it was pending
	if job.Status == "pending" {
		job.Status = "running"
	}

	// Try to parse progress from the output line
	if progress := jm.parseProgress(line); progress > job.Progress {
		job.Progress = progress
	}

	// Broadcast output event
	jm.broadcastEvent(JobEvent{
		JobID: jobID,
		Type:  "output",
		Data: JobOutputData{
			Line: timestampedLine,
		},
	})
}

// SetError marks an active job as errored with the given error and details,
// broadcasts a "status" event and archives the job. A nil err is recorded as
// "unknown error" instead of panicking. Unknown job IDs are ignored.
func (jm *JobManager) SetError(jobID string, err error, details string) {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	job, exists := jm.jobs[jobID]
	if !exists {
		return
	}

	// Guard against a nil error so a careless caller cannot crash the
	// manager while it holds the lock.
	errMsg := "unknown error"
	if err != nil {
		errMsg = err.Error()
	}

	now := time.Now()
	job.Status = "error"
	job.Error = errMsg
	job.ErrorDetails = details
	job.CompletedAt = &now

	// Broadcast status event
	jm.broadcastEvent(JobEvent{
		JobID: jobID,
		Type:  "status",
		Data: JobStatusData{
			Status:       "error",
			Error:        job.Error,
			ErrorDetails: job.ErrorDetails,
			AppName:      job.AppName,
			Type:         job.Type,
			MethodID:     job.MethodID,
		},
	})

	// Archive the job
	jm.archiveJob(job)
}

// CompleteJob marks an active job as complete with 100% progress, broadcasts
// a "complete" event and archives the job. Unknown job IDs are ignored.
func (jm *JobManager) CompleteJob(jobID string) {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	job, exists := jm.jobs[jobID]
	if !exists {
		return
	}

	now := time.Now()
	job.Status = "complete"
	job.Progress = 100
	job.CompletedAt = &now

	// Broadcast complete event
	jm.broadcastEvent(JobEvent{
		JobID: jobID,
		Type:  "complete",
		Data: JobCompleteData{
			Success:     true,
			Progress:    100,
			Message:     job.Message,
			AppName:     job.AppName,
			Type:        job.Type,
			MethodID:    job.MethodID,
			CompletedAt: now,
		},
	})

	// Archive the job
	jm.archiveJob(job)
}

// CancelJob cancels an active job by invoking its cancel function, marks it
// as "cancelled", broadcasts a "status" event and archives the job.
// It returns an error if the job is not found among the active jobs or is
// already in a terminal state.
func (jm *JobManager) CancelJob(jobID string) error {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	job, exists := jm.jobs[jobID]
	if !exists {
		return fmt.Errorf("job not found: %s", jobID)
	}

	// Check if job can be cancelled
	switch job.Status {
	case "complete":
		return fmt.Errorf("job already completed: %s", jobID)
	case "error":
		return fmt.Errorf("job already errored: %s", jobID)
	case "cancelled":
		return fmt.Errorf("job already cancelled: %s", jobID)
	}

	// Cancel the context
	if job.cancel != nil {
		job.cancel()
	}

	// Update job status
	now := time.Now()
	job.Status = "cancelled"
	job.CompletedAt = &now
	job.Message = "Cancelled by user"
	job.Error = "Job cancelled by user"

	// Broadcast cancelled event
	jm.broadcastEvent(JobEvent{
		JobID: jobID,
		Type:  "status",
		Data: JobStatusData{
			Status:       "cancelled",
			Error:        "Job cancelled by user",
			ErrorDetails: "The job was cancelled by user request",
			AppName:      job.AppName,
			Type:         job.Type,
			MethodID:     job.MethodID,
		},
	})

	// Archive the job
	jm.archiveJob(job)

	return nil
}

// GetActiveJobs returns all jobs that are pending or running.
func (jm *JobManager) GetActiveJobs() []*Job {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	jobs := make([]*Job, 0, len(jm.jobs))
	for _, job := range jm.jobs {
		if job.Status == "pending" || job.Status == "running" {
			jobs = append(jobs, job)
		}
	}
	return jobs
}

// GetRecentJobs returns the last archived jobs (max 10), newest first.
// The returned slice is a copy; the *Job values it holds are shared.
func (jm *JobManager) GetRecentJobs() []*Job {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	// Return a copy so callers never alias the manager's slice
	result := make([]*Job, len(jm.recentJobs))
	copy(result, jm.recentJobs)
	return result
}

// parseProgress extracts the first percentage from a string.
// It matches patterns like "(45%)" or "45%" and returns 0 when there is no
// match or the value is outside 0-100.
func (jm *JobManager) parseProgress(s string) int {
	matches := progressRegex.FindStringSubmatch(s)
	if len(matches) < 2 {
		return 0
	}

	progress, err := strconv.Atoi(matches[1])
	if err != nil || progress < 0 || progress > 100 {
		return 0
	}
	return progress
}

// archiveJob moves a completed/errored/cancelled job to the front of the
// recent jobs and removes it from the active jobs, keeping at most
// maxRecentJobs entries.
// Must be called with jm.mu already held.
func (jm *JobManager) archiveJob(job *Job) {
	// Remove from active jobs
	delete(jm.jobs, job.ID)

	// Add to recent jobs (at the beginning)
	jm.recentJobs = append([]*Job{job}, jm.recentJobs...)

	// Trim to max size
	if len(jm.recentJobs) > maxRecentJobs {
		jm.recentJobs = jm.recentJobs[:maxRecentJobs]
	}
}

// GetAllJobs returns all active jobs (for debugging/admin purposes).
func (jm *JobManager) GetAllJobs() []*Job {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	jobs := make([]*Job, 0, len(jm.jobs))
	for _, job := range jm.jobs {
		jobs = append(jobs, job)
	}
	return jobs
}

// GetJobsByType returns all active jobs of a specific type.
func (jm *JobManager) GetJobsByType(jobType string) []*Job {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	jobs := make([]*Job, 0)
	for _, job := range jm.jobs {
		if job.Type == jobType {
			jobs = append(jobs, job)
		}
	}
	return jobs
}

// GetJobsByStatus returns all active jobs with a specific status.
func (jm *JobManager) GetJobsByStatus(status string) []*Job {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	jobs := make([]*Job, 0)
	for _, job := range jm.jobs {
		if job.Status == status {
			jobs = append(jobs, job)
		}
	}
	return jobs
}

// HasActiveJob reports whether there is a pending or running job for a
// specific method.
func (jm *JobManager) HasActiveJob(methodID string) bool {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	for _, job := range jm.jobs {
		if job.MethodID == methodID && (job.Status == "pending" || job.Status == "running") {
			return true
		}
	}
	return false
}

// Duration returns how long the job ran: from StartedAt to CompletedAt when
// the job has finished, otherwise from StartedAt until now.
func (j *Job) Duration() time.Duration {
	if j.CompletedAt != nil {
		return j.CompletedAt.Sub(j.StartedAt)
	}
	return time.Since(j.StartedAt)
}

// IsComplete returns true if the job is complete or errored.
// Cancelled jobs are not reported as complete by this method.
func (j *Job) IsComplete() bool {
	return j.Status == "complete" || j.Status == "error"
}

// IsSuccess returns true if the job completed successfully.
func (j *Job) IsSuccess() bool {
	return j.Status == "complete"
}
// IsError returns true if the job failed with an error func (j *Job) IsError() bool { return j.Status == "error" } // IsRunning returns true if the job is currently running func (j *Job) IsRunning() bool { return j.Status == "running" } // IsPending returns true if the job is pending func (j *Job) IsPending() bool { return j.Status == "pending" } // GetOutputLines returns the output as a single string func (j *Job) GetOutputString() string { result := "" for _, line := range j.Output { result += line + "\n" } return result }