diff --git a/main.go b/main.go index 06ebbbe..7d48a18 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,7 @@ func init() { cfg := config.Cfg - gocnjhelper.InitLogger(config.Cfg.Debug, + gocnjhelper.InitLogger(cfg.Debug, true, true, gocnjhelper.GetConnectionString(cfg.RabbitMq.Username, cfg.RabbitMq.Password, cfg.RabbitMq.Host), diff --git a/modules/rabbitmq/helper.go b/modules/rabbitmq/helper.go index 79e7d86..b1daded 100644 --- a/modules/rabbitmq/helper.go +++ b/modules/rabbitmq/helper.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" + "clickandjoin.app/websocketserver/modules/cache" "clickandjoin.app/websocketserver/modules/structs" "clickandjoin.app/websocketserver/modules/utils" "clickandjoin.app/websocketserver/socketclients" @@ -74,23 +75,24 @@ func CreateWSClientBinding(socketClient *structs.SocketClient, id string) (err e case msg := <-msgs: var receivedMessage structs.ReceivedMessage - err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage) + gocnjhelper.LogDebugf("RABBITMQ RECEIVED MESSAGE: %s", msg.Body) - if err != nil { - gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err) + if err = utils.UnmarshalReceivedMessage(msg.Body, &receivedMessage); err != nil { + break } - gocnjhelper.LogDebugf("RABBITMQ RECEIVED MESSAGE: %s", receivedMessage) - if receivedMessage.Cmd == utils.RabbitMqWsCmdInternalDeleteWebSocketSession { - gocnjhelper.LogDebugf("here %s", fmt.Sprintf("%v", receivedMessage.Body)) - /* - if socketClient, ok := cache.GetSocketClient(fmt.Sprintf("%v", receivedMessage.Body)); ok { - logrus.Println("here deleted", socketClient) - socketClient.SendCloseMessage() - socketClient.CancelFunc() - DeleteWSClient(socketClient.RabbitMqConsumerId, socketClient.RabbitMqQueueName) - } */ + gocnjhelper.LogDebugf("rabbitmq here %s", fmt.Sprintf("%v", receivedMessage.Body)) + + if socketClient, ok := cache.GetSocketClient(fmt.Sprintf("%v", receivedMessage.Body)); ok { + gocnjhelper.LogDebugf("rabbitmq del socketClient %s", socketClient) + cache.DeleteClient(fmt.Sprintf("%v", receivedMessage.Body)) + socketClient.SendCloseMessage() + socketClient.CancelFunc() + DeleteWSClient(socketClient.RabbitMqConsumerId, socketClient.RabbitMqQueueName) + } else { + gocnjhelper.LogDebug("rabbitmq del socketclient else not found") + } } else { err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) @@ -98,13 +100,6 @@ func CreateWSClientBinding(socketClient *structs.SocketClient, id string) (err e gocnjhelper.LogErrorf("Failed to send message to client, err: %s", err) } } - - /* - err = socketClient.SendMessage(structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) - - if err != nil { - logrus.Errorln("Failed to send message to client, err:", err) - }*/ } } }() diff --git a/modules/redis/redis.go b/modules/redis/redis.go index 368777f..d65b916 100644 --- a/modules/redis/redis.go +++ b/modules/redis/redis.go @@ -24,40 +24,46 @@ func Init() { } } -func AddUserWebSocketSessionId(userId string, wsSessionId string) { - cmd := rdb.LPush(ctx, userId, wsSessionId) +func AddUserWebSocketSessionId(userId string, wsSessionId string) (sessionAlreadyConnected bool) { + res, err := rdb.SAdd(ctx, userId, wsSessionId).Result() - gocnjhelper.LogDebugf("b %s", cmd) + if err != nil { + gocnjhelper.LogErrorf("Failed to add key, err: %s", err.Error()) + } + + gocnjhelper.LogDebugf("redis SAdd %v", res, userId, wsSessionId) + + return res == 0 } func RemoveUserWebSocketSessionId(userId string, wsSessionId string) { - cmd := rdb.LRem(ctx, userId, -1, wsSessionId) + res, err := rdb.SRem(ctx, userId, wsSessionId).Result() - gocnjhelper.LogDebugf("rem %s", cmd) + if err != nil { + gocnjhelper.LogErrorf("Failed to rem key, err: %s", err.Error()) + } + + gocnjhelper.LogDebugf("redis rem key %v", res) } func IsUserConnectedToAnyWebSocketServer(userId string) bool { - cmd := rdb.Exists(ctx, userId) + sessions, err := rdb.SMembers(ctx, userId).Result() - gocnjhelper.LogDebugf("exists b %s", cmd) + if err != nil { + gocnjhelper.LogErrorf("SMembers, err: %s", err.Error()) + return false + } - return cmd.Val() == 1 + return len(sessions) > 0 } func ExistsUserWebSocketSessionId(userId string, wsSessionId string) bool { - wsSessions := rdb.LRange(ctx, userId, 0, -1) + res, err := rdb.SIsMember(ctx, userId, wsSessionId).Result() - gocnjhelper.LogDebugf("found ws %s", wsSessions.Val()) - - return isWsSessionIdInList(wsSessions.Val(), wsSessionId) -} - -func isWsSessionIdInList(wsSessions []string, wsSessionId string) bool { - for _, item := range wsSessions { - if item == wsSessionId { - return true - } + if err != nil { + gocnjhelper.LogErrorf("SIsMember, err: %s", err.Error()) + return false } - return false + return res } diff --git a/modules/utils/utils.go b/modules/utils/utils.go index ec61898..29bd410 100644 --- a/modules/utils/utils.go +++ b/modules/utils/utils.go @@ -22,7 +22,7 @@ func UnmarshalReceivedMessage(body []byte, message any) error { err := json.Unmarshal(body, &message) if err != nil { - gocnjhelper.LogDebugf("Failed to unmarshal received message, err: %s", err) + gocnjhelper.LogErrorf("Failed to unmarshal received message, err: %s", err) return err } diff --git a/socketclients/socketclients.go b/socketclients/socketclients.go index ab04dea..44f186c 100644 --- a/socketclients/socketclients.go +++ b/socketclients/socketclients.go @@ -32,7 +32,3 @@ func getAllSocketClientsByUserId(userId string) []*structs.SocketClient { return connectedSocketClients } - -func GetSocketClientByWsSessionId(wsSessionId string) (socketClient *structs.SocketClient, ok bool) { - return cache.GetSocketClient(wsSessionId) -} diff --git a/socketserver/hub.go b/socketserver/hub.go index f2bd895..cdf2ae1 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -26,27 +26,27 @@ func RunHub() { newSocketClient.UserId = userId - gocnjhelper.LogDebugf("clients: %d %s", len(cache.GetSocketClients()), cache.GetSocketClients()) + gocnjhelper.LogDebugf("clients: %d", len(cache.GetSocketClients())) - if redis.ExistsUserWebSocketSessionId(userId, wsSessionId) { // kick out the other connected socket connections with this session - gocnjhelper.LogDebug("ws id already in list") + if sessionAlreadyConnected := redis.AddUserWebSocketSessionId(userId, wsSessionId); sessionAlreadyConnected { + gocnjhelper.LogDebug("sessionAlreadyConnected") - if socketClient, ok := socketclients.GetSocketClientByWsSessionId(wsSessionId); ok { + if socketClient, ok := cache.GetSocketClient(wsSessionId); ok { gocnjhelper.LogDebugf("sc %s %s", socketClient, ok) // a socket client with this session is connected to this server socketClient.SendCloseMessage() socketClient.CancelFunc() rabbitmq.DeleteWSClient(socketClient.RabbitMqConsumerId, socketClient.RabbitMqQueueName) - } else { // a socket client with this session is connected to any other server - /*logrus.Println("else", newSocketClient.UserId) + } else { + // a socket client with this session is connected to any other server + gocnjhelper.LogDebug("socket client with this session is connected to any other server") + rabbitmq.PublishClientMessage(structs.RabbitMqMessage{ Cmd: utils.RabbitMqWsCmdInternalDeleteWebSocketSession, Rec: newSocketClient.UserId, - Body: wsSessionId}) */ + Body: wsSessionId}) } - } else { - redis.AddUserWebSocketSessionId(userId, wsSessionId) } err := rabbitmq.CreateWSClientBinding(newSocketClient, userId) @@ -58,6 +58,7 @@ func RunHub() { cache.AddSocketClient(wsSessionId, newSocketClient) + gocnjhelper.LogDebugf("clients: %d", len(cache.GetSocketClients())) gocnjhelper.LogDebugf("REGISTER CLIENT: %s", wsSessionId) // for testing