// Package controllers contains the WebSocket hub used to fan messages out to
// connected clients.
package controllers

import (
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/gofiber/websocket/v2"
)

const (
	// wsWriteTimeout bounds how long a single write to a client may block.
	wsWriteTimeout = 5 * time.Second

	// wsReadTimeout is the idle timeout: a client that sends nothing for this
	// long is disconnected.
	wsReadTimeout = 60 * time.Second
)

// wsClient is the per-connection state kept by WebSocketManager.
//
// mu serializes every write to conn and the final Close; the underlying
// websocket connection supports only one concurrent writer. closed is guarded
// by mu and is set once the connection has been closed, so late writers skip
// it instead of touching a dead connection.
type wsClient struct {
	conn   *websocket.Conn
	mu     sync.Mutex
	closed bool
}

// write sends payload as a single text frame, bounded by wsWriteTimeout.
// It returns nil without writing if the client has already been closed.
func (c *wsClient) write(payload []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	if err := c.conn.SetWriteDeadline(time.Now().Add(wsWriteTimeout)); err != nil {
		return err
	}
	return c.conn.WriteMessage(websocket.TextMessage, payload)
}

// WebSocketManager handles all active WebSocket connections.
//
// connections is the set of registered connections and clients holds the
// per-connection write state for the same keys; both are guarded by mutex.
// broadcast queues messages that handleBroadcasts delivers to every client.
type WebSocketManager struct {
	connections map[*websocket.Conn]bool
	clients     map[*websocket.Conn]*wsClient
	mutex       sync.RWMutex
	broadcast   chan WebSocketMessage
}

// WebSocketMessage represents the structure of messages exchanged with
// clients.
type WebSocketMessage struct {
	Command string `json:"command"`
	Channel string `json:"channel"`
	Text    string `json:"text"`
}

// WSManager is the global instance of WebSocketManager. Its broadcast queue
// buffers up to 100 messages.
var WSManager = &WebSocketManager{
	connections: make(map[*websocket.Conn]bool),
	clients:     make(map[*websocket.Conn]*wsClient),
	broadcast:   make(chan WebSocketMessage, 100),
}

func init() {
	// Start the broadcast handler; it runs for the lifetime of the process.
	go WSManager.handleBroadcasts()
}

// handleBroadcasts drains the broadcast queue and delivers each message to
// every client registered at the time the message is dequeued.
//
// Writes for one message run in parallel (so a slow client costs at most
// wsWriteTimeout), but the loop waits for all of them before taking the next
// message. That keeps per-client ordering and bounds the number of goroutines
// to the number of clients.
func (m *WebSocketManager) handleBroadcasts() {
	for message := range m.broadcast {
		payload, err := json.Marshal(message)
		if err != nil {
			log.Printf("Error marshaling message: %v", err)
			continue
		}

		// Snapshot the clients so no network I/O happens under the registry lock.
		m.mutex.RLock()
		targets := make([]*wsClient, 0, len(m.clients))
		for _, client := range m.clients {
			targets = append(targets, client)
		}
		m.mutex.RUnlock()

		var wg sync.WaitGroup
		for _, client := range targets {
			wg.Add(1)
			go func(cl *wsClient) {
				defer wg.Done()
				if err := cl.write(payload); err != nil {
					log.Printf("Error sending message: %v", err)
					m.closeClient(cl)
				}
			}(client)
		}
		wg.Wait()
	}
}

// BroadcastMessage queues a message for delivery to all connected clients.
// It never blocks: if the queue is full the message is dropped and logged.
func (m *WebSocketManager) BroadcastMessage(command, channel, text string) {
	message := WebSocketMessage{
		Command: command,
		Channel: channel,
		Text:    text,
	}

	select {
	case m.broadcast <- message:
		// Message queued successfully.
	default:
		log.Println("Broadcast channel full, message dropped")
	}
}

// SendMessageToClient sends a message to a specific client.
//
// It returns the marshaling or write error, if any. If conn is not (or no
// longer) registered, nothing is sent and nil is returned.
func (m *WebSocketManager) SendMessageToClient(conn *websocket.Conn, command, channel, text string) error {
	payload, err := json.Marshal(WebSocketMessage{
		Command: command,
		Channel: channel,
		Text:    text,
	})
	if err != nil {
		return err
	}

	// Only the lookup is done under the registry lock; the write itself is
	// serialized by the client's own mutex so it cannot stall add/remove.
	m.mutex.RLock()
	client := m.clients[conn]
	m.mutex.RUnlock()

	if client == nil {
		return nil
	}
	return client.write(payload)
}

// register adds conn to the manager and returns its per-connection state.
func (m *WebSocketManager) register(conn *websocket.Conn) *wsClient {
	client := &wsClient{conn: conn}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Tolerate a manager that was constructed without its maps.
	if m.connections == nil {
		m.connections = make(map[*websocket.Conn]bool)
	}
	if m.clients == nil {
		m.clients = make(map[*websocket.Conn]*wsClient)
	}
	m.connections[conn] = true
	m.clients[conn] = client
	return client
}

// closeClient unregisters client and closes its connection exactly once.
//
// It blocks until any in-flight write to the connection has finished, so the
// caller may safely let go of the connection once it returns.
func (m *WebSocketManager) closeClient(client *wsClient) {
	m.mutex.Lock()
	// Compare identities: the same *Conn may have been registered again for a
	// newer client, whose entry must not be removed here.
	if m.clients[client.conn] == client {
		delete(m.connections, client.conn)
		delete(m.clients, client.conn)
	}
	m.mutex.Unlock()

	client.mu.Lock()
	defer client.mu.Unlock()

	if client.closed {
		return
	}
	client.closed = true
	// Best-effort: the connection is being torn down either way.
	_ = client.conn.Close()
}

// addConnection registers conn so it receives broadcasts.
func (m *WebSocketManager) addConnection(conn *websocket.Conn) {
	m.register(conn)
}

// removeConnection unregisters conn and closes it. It is a no-op if conn is
// not registered, which avoids closing the same connection twice.
func (m *WebSocketManager) removeConnection(conn *websocket.Conn) {
	m.mutex.RLock()
	client := m.clients[conn]
	m.mutex.RUnlock()

	if client != nil {
		m.closeClient(client)
	}
}

// WebsocketHandler handles a single WebSocket connection: it registers the
// connection with WSManager, then reads and dispatches client messages until
// the read fails or the client is idle for wsReadTimeout.
func WebsocketHandler(c *websocket.Conn) {
	client := WSManager.register(c)
	// closeClient waits for in-flight writes, so no goroutine is still using c
	// when this handler returns.
	// NOTE(review): gofiber/websocket appears to recycle *Conn values once the
	// handler returns — confirm against the library version in use.
	defer WSManager.closeClient(client)

	for {
		// Refresh the deadline before every read so it acts as an idle timeout
		// rather than a hard cap on the connection's lifetime.
		if err := c.SetReadDeadline(time.Now().Add(wsReadTimeout)); err != nil {
			log.Println("Error setting read deadline:", err)
			return
		}

		_, message, err := c.ReadMessage()
		if err != nil {
			log.Println("Error reading message:", err)
			return
		}

		// Handled synchronously: the work is a JSON decode plus a non-blocking
		// enqueue, and this keeps a client's messages in the order received.
		handleClientMessage(c, message)
	}
}

// handleClientMessage decodes one raw client frame and acts on its command.
// "chat", "join" and "leave" are broadcast; anything else is answered with an
// error message sent only to the originating client.
func handleClientMessage(c *websocket.Conn, raw []byte) {
	var wsMessage WebSocketMessage
	if err := json.Unmarshal(raw, &wsMessage); err != nil {
		log.Printf("Error parsing message: %v", err)
		return
	}

	switch wsMessage.Command {
	case "chat":
		WSManager.BroadcastMessage("chat", wsMessage.Channel, wsMessage.Text)
	case "join":
		WSManager.BroadcastMessage("system", wsMessage.Channel,
			fmt.Sprintf("User joined channel %s", wsMessage.Channel))
	case "leave":
		WSManager.BroadcastMessage("system", wsMessage.Channel,
			fmt.Sprintf("User left channel %s", wsMessage.Channel))
	default:
		if err := WSManager.SendMessageToClient(c, "error", "", "Unknown command"); err != nil {
			log.Printf("Error sending message: %v", err)
		}
	}
}