apifiber/controllers/websocketController.go.old

157 lines
3.8 KiB
Go

package controllers
import (
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/gofiber/websocket/v2"
)
// WebSocketManager keeps the registry of live WebSocket connections together
// with the queue of messages waiting to be broadcast to them.
type WebSocketManager struct {
	// connections is used as a set: a key is present while that connection
	// is registered with the manager.
	connections map[*websocket.Conn]bool
	// mutex guards every access to connections.
	mutex sync.RWMutex
	// broadcast carries the messages queued by BroadcastMessage until
	// handleBroadcasts delivers them.
	broadcast chan WebSocketMessage
}
// WebSocketMessage is the JSON envelope exchanged with clients, both for
// incoming commands and for outgoing broadcasts and replies.
type WebSocketMessage struct {
	// Command selects the action; the handler in this file recognises
	// "chat", "join" and "leave".
	Command string `json:"command"`
	// Channel names the channel the message refers to.
	Channel string `json:"channel"`
	// Text is the message payload.
	Text string `json:"text"`
}
// WSManager is the package-wide WebSocketManager shared by every handler.
// It starts with no registered connections and a broadcast queue that can
// hold up to 100 pending messages.
var WSManager = &WebSocketManager{
	broadcast:   make(chan WebSocketMessage, 100),
	connections: map[*websocket.Conn]bool{},
}
// init launches the background goroutine that drains WSManager's broadcast
// queue, so queued messages are delivered from package load onwards.
func init() {
	go WSManager.handleBroadcasts()
}
// handleBroadcasts drains the broadcast queue and delivers each message, as
// JSON text, to every registered connection. It runs until the broadcast
// channel is closed.
//
// Messages are delivered one at a time: the writes for a message fan out to
// all connections in parallel, and the next message is not started until
// every one of those writes has finished. The websocket library allows only
// one concurrent writer per connection, so overlapping broadcasts must never
// write to the same connection at once; this also keeps delivery in queue
// order and bounds the number of in-flight goroutines to the number of
// connections.
//
// NOTE(review): SendMessageToClient also writes while holding only the read
// lock, so it can still overlap with a broadcast write to the same
// connection; ruling that out needs a per-connection write lock.
func (m *WebSocketManager) handleBroadcasts() {
	for message := range m.broadcast {
		jsonMessage, err := json.Marshal(message)
		if err != nil {
			log.Printf("Error marshaling message: %v", err)
			continue
		}

		var (
			wg       sync.WaitGroup
			failedMu sync.Mutex
			failed   []*websocket.Conn
		)

		// The read lock is held until all writes are done so that
		// removeConnection (which needs the write lock) cannot close a
		// connection in the middle of a delivery.
		m.mutex.RLock()
		for conn := range m.connections {
			wg.Add(1)
			go func(c *websocket.Conn) {
				defer wg.Done()

				// Every write is bounded by a 5 second deadline so a stalled
				// client cannot hold up the broadcast loop indefinitely.
				err := c.SetWriteDeadline(time.Now().Add(5 * time.Second))
				if err == nil {
					err = c.WriteMessage(websocket.TextMessage, jsonMessage)
				}
				if err != nil {
					log.Printf("Error sending message: %v", err)
					failedMu.Lock()
					failed = append(failed, c)
					failedMu.Unlock()
				}
			}(conn)
		}
		wg.Wait()
		m.mutex.RUnlock()

		// Dropping a connection takes the write lock, so it must happen
		// only after the read lock above has been released.
		for _, c := range failed {
			m.removeConnection(c)
		}
	}
}
// BroadcastMessage queues a message for delivery to all connected clients.
// The call never blocks: when the broadcast queue is already full the
// message is discarded and a log line is written instead.
func (m *WebSocketManager) BroadcastMessage(command, channel, text string) {
	select {
	case m.broadcast <- WebSocketMessage{Command: command, Channel: channel, Text: text}:
		// Queued; handleBroadcasts will pick it up.
		return
	default:
	}
	log.Println("Broadcast channel full, message dropped")
}
// SendMessageToClient sends a single message, encoded as JSON text, to one
// specific client.
//
// It returns an error when the message cannot be encoded, when the write
// deadline cannot be set, or when the write itself fails. If conn is not
// (or no longer) registered with the manager nothing is sent and nil is
// returned.
func (m *WebSocketManager) SendMessageToClient(conn *websocket.Conn, command, channel, text string) error {
	jsonMessage, err := json.Marshal(WebSocketMessage{
		Command: command,
		Channel: channel,
		Text:    text,
	})
	if err != nil {
		return err
	}

	// The read lock stays held for the whole write so that removeConnection
	// (which takes the write lock) cannot close the connection mid-send.
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	if _, exists := m.connections[conn]; !exists {
		return nil
	}

	// Without a deadline a stalled client could block this call, and with it
	// the read lock, indefinitely, so a failure to set one is reported
	// instead of being ignored.
	if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return fmt.Errorf("setting write deadline: %w", err)
	}
	return conn.WriteMessage(websocket.TextMessage, jsonMessage)
}
// addConnection registers conn with the manager so that subsequent
// broadcasts are delivered to it. The write lock is taken for the update.
func (m *WebSocketManager) addConnection(conn *websocket.Conn) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.connections[conn] = true
}
// removeConnection unregisters conn and closes it.
//
// The call is idempotent: a connection that is not registered (never added,
// or already removed by an earlier call) is left untouched. Both the
// broadcast loop and the connection's own handler call this for the same
// connection, so the second call must not close it again.
func (m *WebSocketManager) removeConnection(conn *websocket.Conn) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if _, registered := m.connections[conn]; !registered {
		return
	}
	delete(m.connections, conn)
	if err := conn.Close(); err != nil {
		log.Printf("Error closing connection: %v", err)
	}
}
// WebsocketHandler serves one WebSocket client: it registers the connection
// with WSManager, then reads and dispatches JSON commands until a read fails
// or the client has been silent for readTimeout. The connection is
// unregistered and closed when the handler returns.
//
// Recognised commands:
//   - "chat":  broadcasts the text to all clients.
//   - "join":  broadcasts a system notice that a user joined the channel.
//   - "leave": broadcasts a system notice that a user left the channel.
//
// Any other command is answered with an "error" message sent only to this
// client. Messages that are not valid JSON are logged and skipped.
func WebsocketHandler(c *websocket.Conn) {
	// readTimeout is how long the client may stay silent before the
	// connection is dropped.
	const readTimeout = 60 * time.Second

	WSManager.addConnection(c)
	defer WSManager.removeConnection(c)

	for {
		// Deadlines are absolute points in time, so the deadline has to be
		// pushed forward before every read; setting it only once would cut
		// off even active clients readTimeout after they connected.
		if err := c.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
			log.Println("Error setting read deadline:", err)
			return
		}

		_, message, err := c.ReadMessage()
		if err != nil {
			log.Println("Error reading message:", err)
			return
		}

		// Messages are handled inline rather than in a goroutine each: this
		// keeps a client's commands in order and prevents several replies
		// from being written to the same connection at once.
		var wsMessage WebSocketMessage
		if err := json.Unmarshal(message, &wsMessage); err != nil {
			log.Printf("Error parsing message: %v", err)
			continue
		}

		switch wsMessage.Command {
		case "chat":
			WSManager.BroadcastMessage("chat", wsMessage.Channel, wsMessage.Text)
		case "join":
			WSManager.BroadcastMessage("system", wsMessage.Channel,
				fmt.Sprintf("User joined channel %s", wsMessage.Channel))
		case "leave":
			WSManager.BroadcastMessage("system", wsMessage.Channel,
				fmt.Sprintf("User left channel %s", wsMessage.Channel))
		default:
			if err := WSManager.SendMessageToClient(c, "error", "", "Unknown command"); err != nil {
				log.Printf("Error sending message: %v", err)
			}
		}
	}
}