WebSocketServer/socketserver/hub.go

162 lines
5.6 KiB
Go

package socketserver
import (
"fmt"
"clickandjoin.app/websocketserver/modules/cache"
"clickandjoin.app/websocketserver/modules/rabbitmq"
"clickandjoin.app/websocketserver/modules/redis"
"clickandjoin.app/websocketserver/modules/scylladb"
"clickandjoin.app/websocketserver/modules/structs"
"clickandjoin.app/websocketserver/modules/utils"
"clickandjoin.app/websocketserver/socketclients"
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
"git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper/dbstructs"
"github.com/gofiber/websocket/v2"
)
// Unbuffered hub channels; RunHub is the receiver for all three.
var (
	// register carries socket clients that should be added to the hub.
	register = make(chan *structs.SocketClient)

	// broadcast carries raw messages together with the connection they arrived on.
	broadcast = make(chan structs.SocketMessage)

	// unregister carries connections whose socket client should be removed.
	unregister = make(chan *websocket.Conn)
)
// RunHub is the hub's event loop. It never returns: it serves, one event at a
// time, whatever arrives on the package-level register, broadcast and
// unregister channels.
//
//   - register: reads "userId" and "wsSessionId" from the connection locals,
//     records the session in redis, evicts an already connected client that
//     uses the same session (locally, or via a RabbitMQ command when it is not
//     in this server's cache), creates the RabbitMQ binding and stores the
//     client in the cache.
//   - broadcast: unmarshals the received message and forwards it either to a
//     single user (RecUser) or to every user of a room (RecRoom).
//   - unregister: removes every cached client that uses the given connection
//     and releases its RabbitMQ and redis resources.
//
// Failures are logged and the affected event is dropped; the loop keeps running.
func RunHub() {
	for {
		select {
		case newSocketClient := <-register:
			// Locals are formatted with %v, so a missing value becomes the string "<nil>".
			userId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("userId"))
			wsSessionId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("wsSessionId"))

			newSocketClient.UserId = userId

			gocnjhelper.LogDebugf("clients: %d", len(cache.GetSocketClients()))

			if sessionAlreadyConnected := redis.AddUserWebSocketSessionId(userId, wsSessionId); sessionAlreadyConnected {
				gocnjhelper.LogDebug("sessionAlreadyConnected")

				if socketClient, ok := cache.GetSocketClient(wsSessionId); ok {
					// %t for the bool: the previous %s verb rendered it as "%!s(bool=true)".
					gocnjhelper.LogDebugf("sc %v %t", socketClient, ok)

					// a socket client with this session is connected to this server
					socketClient.SendCloseMessage()
					socketClient.CancelFunc()

					rabbitmq.DeleteWSClient(socketClient.RabbitMqConsumerId, socketClient.RabbitMqQueueName)
				} else {
					// a socket client with this session is connected to any other server
					gocnjhelper.LogDebug("socket client with this session is connected to any other server")

					// Best effort: a publish failure is logged and registration continues.
					if err := rabbitmq.PublishClientMessage(structs.RabbitMqMessage{
						Cmd:  utils.RabbitMqWsCmdInternalDeleteWebSocketSession,
						Rec:  newSocketClient.UserId,
						Body: wsSessionId,
					}); err != nil {
						gocnjhelper.LogErrorf("Failed to publish client message, err: %s", err)
					}
				}
			}

			if err := rabbitmq.CreateWSClientBinding(newSocketClient, userId); err != nil {
				gocnjhelper.LogErrorf("Failed to create client binding, err: %s", err)
				// NOTE(review): the session id added to redis above is not removed
				// on this path - confirm whether cleanup is required here.
				continue
			}

			cache.AddSocketClient(wsSessionId, newSocketClient)

			gocnjhelper.LogDebugf("clients: %d", len(cache.GetSocketClients()))
			gocnjhelper.LogDebugf("REGISTER CLIENT: %s", wsSessionId)

			// for testing
			if socketClient, ok := cache.GetSocketClient(wsSessionId); ok {
				socketClient.SendMessage(structs.SendSocketMessage{Cmd: utils.WsCmdTest, Body: userId})
			}

		case data := <-broadcast:
			var receivedMessage structs.ReceivedMessage

			if err := utils.UnmarshalReceivedMessage(data.Msg, &receivedMessage); err != nil {
				gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err)
				continue
			}

			// The target kind is chosen purely by the length of the receiver field.
			// NOTE(review): RecUser is compared against LenWebSocketSession -
			// confirm this is the intended length constant for a user id.
			if len(receivedMessage.RecUser) == utils.LenWebSocketSession {
				gocnjhelper.LogDebug("type: user message")

				// TODO: check if the user is allowed to send a message to target user. For example: privacy settings, friend relationship

				SendMessageToClient(receivedMessage.RecUser, structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
			} else if len(receivedMessage.RecRoom) == utils.LenRoomId {
				gocnjhelper.LogDebug("type: room message")

				var roomUsers []dbstructs.RoomUsers

				if err := scylladb.Session.Query(gocnjhelper.DbMRoomUsersHelperPKRoomId.Select("user_id")).BindStruct(dbstructs.RoomUsers{RoomId: receivedMessage.RecRoom}).SelectRelease(&roomUsers); err != nil {
					gocnjhelper.LogErrorf("Failed to get room users, err: %s", err.Error())
					continue
				}

				client, err := cache.GetSocketClientByConn(data.Conn)
				if err != nil {
					gocnjhelper.LogError(err.Error())
					continue
				}

				// check if user is in the room and has not faked the room id
				if !isUserInRoom(client.UserId, roomUsers) {
					gocnjhelper.LogError("User not in room")
					continue
				}

				// Every room user receives the message, including the sender.
				for _, roomUser := range roomUsers {
					SendMessageToClient(roomUser.UserId, structs.SendSocketMessage{
						Cmd:  receivedMessage.Cmd,
						Body: receivedMessage.Body,
					})
				}
			}

		case connection := <-unregister:
			// Find the cached client(s) by connection; the session id is the cache key.
			for id, client := range cache.GetSocketClients() {
				if connection == client.Conn {
					gocnjhelper.LogDebugf("UNREGISTER CLIENT: %s", id)

					cache.DeleteClient(id)
					client.CancelFunc()

					rabbitmq.DeleteWSClient(client.RabbitMqConsumerId, client.RabbitMqQueueName)
					redis.RemoveUserWebSocketSessionId(client.UserId, id)
				}
			}
		}
	}
}
// isUserInRoom reports whether any entry of roomUsers has a UserId equal to
// userId. An empty or nil slice yields false.
func isUserInRoom(userId string, roomUsers []dbstructs.RoomUsers) bool {
	for i := range roomUsers {
		if roomUsers[i].UserId != userId {
			continue
		}

		return true
	}

	return false
}
// SendMessageToClient delivers msg to the user identified by targetUserId.
//
// If the receiver has socket clients on this server, msg is sent to each of
// them directly. Otherwise, if redis reports the user as connected to any
// websocket server, msg is published via RabbitMQ; a publish error is only
// logged. If the user is not connected anywhere, nothing is sent.
func SendMessageToClient(targetUserId string, msg structs.SendSocketMessage) {
	connectedHere, localClients := socketclients.IsReceiverConnectedToThisServer(targetUserId)

	if connectedHere {
		// send message to target receiver which is connected to this server
		gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver is connected to this server")

		for _, receiver := range localClients {
			gocnjhelper.LogDebugf("send to %s", receiver.UserId)
			receiver.SendMessage(msg)
		}

		return
	}

	// send message to target receiver which is connected to any other server
	gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver connected to other server")

	if !redis.IsUserConnectedToAnyWebSocketServer(targetUserId) {
		// TODO: handle when client not connected to websocket. eg. Push Notification
		gocnjhelper.LogDebug("rec user not connected to any other websocket server")
		return
	}

	forwarded := structs.RabbitMqMessage{Cmd: msg.Cmd, Rec: targetUserId, Body: msg.Body}

	if err := rabbitmq.PublishClientMessage(forwarded); err != nil {
		gocnjhelper.LogErrorf("Failed to publish client message, err: %s", err)
	}
}