From b633c31ee8fbef5b8cfcdd45373c0c1bb2d80c27 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 11 Mar 2023 21:08:22 +0100 Subject: [PATCH] rooms --- go.mod | 2 +- go.sum | 4 +++ modules/cache/cache.go | 15 +++++++++ modules/structs/message.go | 7 ++-- modules/utils/definitions.go | 1 + socketserver/hub.go | 65 +++++++++++++++++++++++++++++++----- 6 files changed, 81 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 6a32e74..1bc25cb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module clickandjoin.app/websocketserver go 1.19 require ( - git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.75 + git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.82 github.com/gocql/gocql v1.3.1 github.com/gofiber/fiber/v2 v2.42.0 github.com/gofiber/websocket/v2 v2.1.4 diff --git a/go.sum b/go.sum index c488ee3..05fb8fc 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,10 @@ git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.54 h1:bAB8Z9qJgX/H+P git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.54/go.mod h1:/CvzIeBG4vJK/MgXzwmwCYwodiTGTMOYGDF3ao/fFLM= git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.75 h1:Ei003kPBpKVqukqQIGg111vz82G9toIvTbfnYzhVQ0k= git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.75/go.mod h1:rnEM9rcZy2dg4SaDCGmSf34fp7ECzdyxxnRut2HBmrs= +git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.81 h1:idNch20NsLGt/nOFvLmJJObYh3eifgYWlZF3j12pgjs= +git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.81/go.mod h1:rnEM9rcZy2dg4SaDCGmSf34fp7ECzdyxxnRut2HBmrs= +git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.82 h1:Q92bABF1YoSaLXXWcGGGYQ0MjLjoU/ymicKrPUpSpLQ= +git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper v1.0.82/go.mod h1:rnEM9rcZy2dg4SaDCGmSf34fp7ECzdyxxnRut2HBmrs= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= diff --git a/modules/cache/cache.go b/modules/cache/cache.go index b95cff7..458707a 100644 --- a/modules/cache/cache.go +++ b/modules/cache/cache.go @@ -1,9 +1,11 @@ package cache import ( + "errors" "sync" "clickandjoin.app/websocketserver/modules/structs" + "github.com/gofiber/websocket/v2" ) var socketClients = make(map[string]*structs.SocketClient) @@ -36,3 +38,16 @@ func GetSocketClient(wsSessionId string) (socketClient *structs.SocketClient, ok return client, ok } + +func GetSocketClientByConn(conn *websocket.Conn) (socketClient *structs.SocketClient, err error) { + mu.RLock() + defer mu.RUnlock() + + for _, client := range socketClients { + if client.Conn == conn { + return client, nil + } + } + + return nil, errors.New("Failed to find socket client by ws conn") +} diff --git a/modules/structs/message.go b/modules/structs/message.go index 38f578f..509a468 100644 --- a/modules/structs/message.go +++ b/modules/structs/message.go @@ -1,7 +1,8 @@ package structs type ReceivedMessage struct { - Cmd int - Rec string // represent receiver user id - Body any + Cmd int + RecUser string `json:"rec_user"` // represent receiver user id + RecRoom string `json:"rec_room"` // represent receiver room id + Body any } diff --git a/modules/utils/definitions.go b/modules/utils/definitions.go index eee41b6..f259949 100644 --- a/modules/utils/definitions.go +++ b/modules/utils/definitions.go @@ -2,6 +2,7 @@ package utils const ( LenWebSocketSession = 36 + LenRoomId = 36 WsCmdTest = 99999 // only for testing - userId is sent diff --git a/socketserver/hub.go b/socketserver/hub.go index cdf2ae1..cd32a11 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -6,10 +6,12 @@ import ( "clickandjoin.app/websocketserver/modules/cache" "clickandjoin.app/websocketserver/modules/rabbitmq" "clickandjoin.app/websocketserver/modules/redis" + "clickandjoin.app/websocketserver/modules/scylladb" "clickandjoin.app/websocketserver/modules/structs" "clickandjoin.app/websocketserver/modules/utils" "clickandjoin.app/websocketserver/socketclients" gocnjhelper "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper" + "git.clickandjoin.umbach.dev/ClickandJoin/go-cnj-helper/dbstructs" "github.com/gofiber/websocket/v2" ) @@ -69,16 +71,47 @@ func RunHub() { case data := <-broadcast: var receivedMessage structs.ReceivedMessage - err := utils.UnmarshalReceivedMessage(data.Msg, &receivedMessage) - - if err != nil { + if err := utils.UnmarshalReceivedMessage(data.Msg, &receivedMessage); err != nil { gocnjhelper.LogErrorf("Failed to unmarshal received msg, err: %s", err) + continue } - gocnjhelper.LogDebugf("RECEIVED WEBSOCKET MESSAGE: %s", receivedMessage) + if len(receivedMessage.RecUser) == utils.LenWebSocketSession { + gocnjhelper.LogDebug("type: user message") + // TODO: check if the user is allowed to sent a message to target user. For example: privacy settings, friend relationship + // TODO: handle when client not connected to websocket. eg. Push Notification + SendMessageToClient(receivedMessage.RecUser, structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) + } else if len(receivedMessage.RecRoom) == utils.LenRoomId { + gocnjhelper.LogDebug("type: room message") - if len(receivedMessage.Rec) == utils.LenWebSocketSession { - SendMessageToClient(receivedMessage.Rec, structs.SendSocketMessage{Cmd: receivedMessage.Cmd, Body: receivedMessage.Body}) + var roomUsers []dbstructs.RoomUsers + + if err := scylladb.Session.Query(gocnjhelper.DbMRoomUsersHelperPKRoomId.Select("user_id")).BindStruct(dbstructs.RoomUsers{RoomId: receivedMessage.RecRoom}).SelectRelease(&roomUsers); err != nil { + gocnjhelper.LogErrorf("Failed to get room users, err: %s", err.Error()) + continue + } + + client, err := cache.GetSocketClientByConn(data.Conn) + + if err != nil { + gocnjhelper.LogError(err.Error()) + continue + } + + // check if user is really in the room and has not faked the room id + if !isUserInRoom(client.UserId, roomUsers) { + gocnjhelper.LogError("User not in room") + continue + } + + for _, roomUser := range roomUsers { + // TODO: handle when client not connected to websocket. eg. Push Notification + /*connToWs :=*/ + SendMessageToClient(roomUser.UserId, structs.SendSocketMessage{ + Cmd: receivedMessage.Cmd, + Body: receivedMessage.Body, + }) + } } case connection := <-unregister: @@ -96,7 +129,16 @@ func RunHub() { } } -func SendMessageToClient(targetUserId string, msg structs.SendSocketMessage) { +func isUserInRoom(userId string, roomUsers []dbstructs.RoomUsers) bool { + for _, roomUser := range roomUsers { + if roomUser.UserId == userId { + return true + } + } + return false +} + +func SendMessageToClient(targetUserId string, msg structs.SendSocketMessage) (connectedToWebSocket bool) { if isConnected, socketClients := socketclients.IsReceiverConnectedToThisServer(targetUserId); isConnected { // send message to target receiver which is connected to this server gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver is connected to this server") @@ -105,6 +147,8 @@ func SendMessageToClient(targetUserId string, msg structs.SendSocketMessage) { gocnjhelper.LogDebugf("send to %s", socketClient.UserId) socketClient.SendMessage(msg) } + + return true } else { // send message to target receiver which is connected to any other server gocnjhelper.LogDebug("FORWARDING MESSAGE: receiver connected to other server") @@ -114,8 +158,11 @@ func SendMessageToClient(targetUserId string, msg structs.SendSocketMessage) { if err != nil { gocnjhelper.LogErrorf("Failed to publish client message, err: %s", err) } - } else { - gocnjhelper.LogDebug("rec user not connected to any other websocket server") + + return true } + + gocnjhelper.LogDebug("rec user not connected to any other websocket server") } + return false }