121 lines
4.2 KiB
Go
121 lines
4.2 KiB
Go
package socketserver
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"clickandjoin.app/websocketserver/modules/cache"
|
|
"clickandjoin.app/websocketserver/modules/rabbitmq"
|
|
"clickandjoin.app/websocketserver/modules/redis"
|
|
"clickandjoin.app/websocketserver/modules/structs"
|
|
"clickandjoin.app/websocketserver/modules/utils"
|
|
"clickandjoin.app/websocketserver/socketclients"
|
|
gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper"
|
|
"github.com/gofiber/websocket/v2"
|
|
)
|
|
|
|
var register = make(chan *structs.SocketClient)
|
|
var broadcast = make(chan structs.SocketMessage)
|
|
var unregister = make(chan *websocket.Conn)
|
|
|
|
func RunHub() {
|
|
for {
|
|
select {
|
|
case newSocketClient := <-register:
|
|
userId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("userId"))
|
|
wsSessionId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("wsSessionId"))
|
|
|
|
newSocketClient.UserId = userId
|
|
|
|
gocnjhelper.LogDebugf("clients: %d %s", len(cache.GetSocketClients()), cache.GetSocketClients())
|
|
|
|
if redis.ExistsUserWebSocketSessionId(userId, wsSessionId) { // kick out the other connected socket connections with this session
|
|
gocnjhelper.LogDebug("ws id already in list")
|
|
|
|
if socketClient, ok := socketclients.GetSocketClientByWsSessionId(wsSessionId); ok {
|
|
gocnjhelper.LogDebugf("sc %s %s", socketClient, ok)
|
|
|
|
// a socket client with this session is connected to this server
|
|
socketClient.SendCloseMessage()
|
|
socketClient.CancelFunc()
|
|
rabbitmq.DeleteWSClient(socketClient.RabbitMqConsumerId, socketClient.RabbitMqQueueName)
|
|
} else { // a socket client with this session is connected to any other server
|
|
/*logrus.Println("else", newSocketClient.UserId)
|
|
rabbitmq.PublishClientMessage(structs.RabbitMqMessage{
|
|
Cmd: utils.RabbitMqWsCmdInternalDeleteWebSocketSession,
|
|
Rec: newSocketClient.UserId,
|
|
Body: wsSessionId}) */
|
|
}
|
|
} else {
|
|
redis.AddUserWebSocketSessionId(userId, wsSessionId)
|
|
}
|
|
|
|
err := rabbitmq.CreateWSClientBinding(newSocketClient, userId)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to create client binding, err: %s", err)
|
|
break
|
|
}
|
|
|
|
cache.AddSocketClient(wsSessionId, newSocketClient)
|
|
|
|
gocnjhelper.LogDebugf("REGISTER CLIENT: %s", wsSessionId)
|
|
|
|
// for testing
|
|
if socketClient, ok := cache.GetSocketClient(wsSessionId); ok {
|
|
socketClient.SendMessage(structs.SendSocketMessage{Cmd: utils.WsCmdTest, Body: userId})
|
|
}
|
|
|
|
case data := <-broadcast:
|
|
var receivedMessage structs.ReceivedMessage
|
|
|
|
err := utils.UnmarshalReceivedMessage(data.Msg, &receivedMessage)
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err)
|
|
}
|
|
|
|
gocnjhelper.LogDebugf("RECEIVED WEBSOCKET MESSAGE: %s", receivedMessage)
|
|
|
|
if len(receivedMessage.Rec) == utils.LenWebSocketSession {
|
|
SendMessageToClient(receivedMessage.Rec, structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body})
|
|
}
|
|
|
|
case connection := <-unregister:
|
|
for id, client := range cache.GetSocketClients() {
|
|
if connection == client.Conn {
|
|
gocnjhelper.LogDebugf("UNREGISTER CLIENT: %s", id)
|
|
cache.DeleteClient(id)
|
|
|
|
client.CancelFunc()
|
|
rabbitmq.DeleteWSClient(client.RabbitMqConsumerId, client.RabbitMqQueueName)
|
|
redis.RemoveUserWebSocketSessionId(client.UserId, id)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func SendMessageToClient(targetUserId string, msg structs.SendSocketMessage) {
|
|
if isConnected, socketClients := socketclients.IsReceiverConnectedToThisServer(targetUserId); isConnected {
|
|
// send message to target receiver which is connected to this server
|
|
gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver is connected to this server")
|
|
|
|
for _, socketClient := range socketClients {
|
|
gocnjhelper.LogDebugf("send to %s", socketClient.UserId)
|
|
socketClient.SendMessage(msg)
|
|
}
|
|
} else { // send message to target receiver which is connected to any other server
|
|
gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver connected to other server")
|
|
|
|
if redis.IsUserConnectedToAnyWebSocketServer(targetUserId) {
|
|
err := rabbitmq.PublishClientMessage(structs.RabbitMqMessage{Cmd: msg.Cmd, Rec: targetUserId, Body: msg.Body})
|
|
|
|
if err != nil {
|
|
gocnjhelper.LogErrorf("Failed to publish client message, err: %s", err)
|
|
}
|
|
} else {
|
|
gocnjhelper.LogDebug("rec user not connected to any other websocket server")
|
|
}
|
|
}
|
|
}
|