package socketserver import ( "fmt" "clickandjoin.app/websocketserver/modules/cache" "clickandjoin.app/websocketserver/modules/rabbitmq" "clickandjoin.app/websocketserver/modules/redis" "clickandjoin.app/websocketserver/modules/scylladb" "clickandjoin.app/websocketserver/modules/structs" "clickandjoin.app/websocketserver/modules/utils" "clickandjoin.app/websocketserver/socketclients" gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper/dbstructs" "github.com/gofiber/websocket/v2" ) var register = make(chan *structs.SocketClient) var broadcast = make(chan structs.SocketMessage) var unregister = make(chan *websocket.Conn) func RunHub() { for { select { case newSocketClient := <-register: userId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("userId")) wsSessionId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("wsSessionId")) newSocketClient.UserId = userId gocnjhelper.LogDebugf("clients: %d", len(cache.GetSocketClients())) if sessionAlreadyConnected := redis.AddUserWebSocketSessionId(userId, wsSessionId); sessionAlreadyConnected { gocnjhelper.LogDebug("sessionAlreadyConnected") if socketClient, ok := cache.GetSocketClient(wsSessionId); ok { gocnjhelper.LogDebugf("sc %s %s", socketClient, ok) // a socket client with this session is connected to this server socketClient.SendCloseMessage() socketClient.CancelFunc() rabbitmq.DeleteWSClient(socketClient.RabbitMqConsumerId, socketClient.RabbitMqQueueName) } else { // a socket client with this session is connected to any other server gocnjhelper.LogDebug("socket client with this session is connected to any other server") rabbitmq.PublishClientMessage(structs.RabbitMqMessage{ Cmd: utils.RabbitMqWsCmdInternalDeleteWebSocketSession, Rec: newSocketClient.UserId, Body: wsSessionId}) } } err := rabbitmq.CreateWSClientBinding(newSocketClient, userId) if err != nil { gocnjhelper.LogErrorf("Failed to create client binding, err: %s", err) break } cache.AddSocketClient(wsSessionId, newSocketClient) gocnjhelper.LogDebugf("clients: %d", len(cache.GetSocketClients())) gocnjhelper.LogDebugf("REGISTER CLIENT: %s", wsSessionId) // for testing if socketClient, ok := cache.GetSocketClient(wsSessionId); ok { socketClient.SendMessage(structs.SendSocketMessage{Cmd: utils.WsCmdTest, Body: userId}) } case data := <-broadcast: var receivedMessage structs.ReceivedMessage if err := utils.UnmarshalReceivedMessage(data.Msg, &receivedMessage); err != nil { gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err) continue } if len(receivedMessage.RecUser) == utils.LenWebSocketSession { gocnjhelper.LogDebug("type: user message") // TODO: check if the user is allowed to sent a message to target user. For example: privacy settings, friend relationship // TODO: handle when client not connected to websocket. eg. Push Notification SendMessageToClient(receivedMessage.RecUser, structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) } else if len(receivedMessage.RecRoom) == utils.LenRoomId { gocnjhelper.LogDebug("type: room message") var roomUsers []dbstructs.RoomUsers if err := scylladb.Session.Query(gocnjhelper.DbMRoomUsersHelperPKRoomId.Select("user_id")).BindStruct(dbstructs.RoomUsers{RoomId: receivedMessage.RecRoom}).SelectRelease(&roomUsers); err != nil { gocnjhelper.LogErrorf("Failed to get room users, err: %s", err.Error()) continue } client, err := cache.GetSocketClientByConn(data.Conn) if err != nil { gocnjhelper.LogError(err.Error()) continue } // check if user is really in the room and has not faked the room id if !isUserInRoom(client.UserId, roomUsers) { gocnjhelper.LogError("User not in room") continue } for _, roomUser := range roomUsers { // TODO: handle when client not connected to websocket. eg. Push Notification /*connToWs :=*/ SendMessageToClient(roomUser.UserId, structs.SendSocketMessage{ Cmd: receivedMessage.Cmd, Body: receivedMessage.Body, }) } } case connection := <-unregister: for id, client := range cache.GetSocketClients() { if connection == client.Conn { gocnjhelper.LogDebugf("UNREGISTER CLIENT: %s", id) cache.DeleteClient(id) client.CancelFunc() rabbitmq.DeleteWSClient(client.RabbitMqConsumerId, client.RabbitMqQueueName) redis.RemoveUserWebSocketSessionId(client.UserId, id) } } } } } func isUserInRoom(userId string, roomUsers []dbstructs.RoomUsers) bool { for _, roomUser := range roomUsers { if roomUser.UserId == userId { return true } } return false } func SendMessageToClient(targetUserId string, msg structs.SendSocketMessage) (connectedToWebSocket bool) { if isConnected, socketClients := socketclients.IsReceiverConnectedToThisServer(targetUserId); isConnected { // send message to target receiver which is connected to this server gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver is connected to this server") for _, socketClient := range socketClients { gocnjhelper.LogDebugf("send to %s", socketClient.UserId) socketClient.SendMessage(msg) } return true } else { // send message to target receiver which is connected to any other server gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver connected to other server") if redis.IsUserConnectedToAnyWebSocketServer(targetUserId) { err := rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: msg.Cmd, Rec: targetUserId, Body: msg.Body}) if err != nil { gocnjhelper.LogErrorf("Failed to publish client message, err: %s", err) } return true } gocnjhelper.LogDebug("rec user not connected to any other websocket server") } return false }