diff --git a/modules/cache/socketclient.go b/modules/cache/socketclient.go index bb612b1..b5e888d 100644 --- a/modules/cache/socketclient.go +++ b/modules/cache/socketclient.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/gofiber/websocket/v2" + "github.com/rs/zerolog/log" ) var socketClients []*structs.SocketClient @@ -39,3 +40,17 @@ func GetSocketClients() []*structs.SocketClient { return socketClients } + +func SubscribeSocketClientToTopic(sessionId string, topic string) { + mu.Lock() + defer mu.Unlock() + + log.Info().Msgf("Subscribing session %s to topic %s", sessionId, topic) + + for _, socketClient := range socketClients { + if socketClient.SessionId == sessionId { + socketClient.SubscribedTopic = topic + break + } + } +} diff --git a/modules/structs/socket.go b/modules/structs/socket.go index 9709d11..29d8977 100644 --- a/modules/structs/socket.go +++ b/modules/structs/socket.go @@ -17,10 +17,11 @@ const ( ) type SocketClient struct { - SessionId string - UserId string - Conn *websocket.Conn - connMu sync.Mutex + SessionId string + UserId string + Conn *websocket.Conn + connMu sync.Mutex + SubscribedTopic string } type SocketMessage struct { diff --git a/modules/utils/globals.go b/modules/utils/globals.go index a34a491..ed0a8c8 100644 --- a/modules/utils/globals.go +++ b/modules/utils/globals.go @@ -84,6 +84,7 @@ const ( SentCmdInstallingGlobalPythonPackages = 36 SentCmdInstallingGlobalPythonPackagesFailed = 37 SentCmdInstallingGlobalPythonPackagesFinished = 38 + SentCmdUpdateUsers = 39 ) // commands received from web clients @@ -110,6 +111,7 @@ const ( ReceivedCmdDeleteUserApiKey = 20 ReceivedCmdGroupTasksInstallPythonPackages = 21 ReceivedCmdGroupTasksInstallGlobalPythonPackages = 22 + ReceivedCmdSubscribeToTopic = 23 ) const ( @@ -201,3 +203,16 @@ var DynamicGroupTasksPermissions = []string{ PermissionGroupTasksOverviewXYInstallPythonPackages, PermissionGroupTasksOverviewXYView, } + +// This represents the paths registered on the api. +// Available paths which are used for the websocket topic subscription +// to only send messages to clients who are subscribed to the topic +const ( + // this is path is dynamic because there are more params after the /v1/grouptasks/ + SubscribedDynamicTopicGroupTasks = "/grouptasks/" + + // these paths are static - there are no params after the path + SubscribedTopicAdminAreaRoles = "/adminarea/roles" + SubscribedTopicUsers = "/users" + SubscribedTopicUserProfile = "/user-profile" +) diff --git a/socketclients/socketclients.go b/socketclients/socketclients.go index 50fa3bd..e00484b 100644 --- a/socketclients/socketclients.go +++ b/socketclients/socketclients.go @@ -19,6 +19,14 @@ import ( "gorm.io/gorm" ) +func BroadcastMessageToTopic(topic string, sendSocketMessage structs.SendSocketMessage) { + for _, client := range cache.GetSocketClients() { + if client.SubscribedTopic == topic { + client.SendMessage(sendSocketMessage) + } + } +} + func BroadcastMessage(sendSocketMessage structs.SendSocketMessage) { for _, client := range cache.GetSocketClients() { client.SendMessage(sendSocketMessage) @@ -69,6 +77,15 @@ func SendMessageToUser(userId string, ignoreUserSessionId string, sendSocketMess } } +func SendMessageToUserWithTopic(userId string, topic string, sendSocketMessage structs.SendSocketMessage) { + for _, client := range cache.GetSocketClients() { + log.Info().Msgf("Client: %v", client.SubscribedTopic) + if client.UserId == userId && client.SubscribedTopic == topic { + client.SendMessage(sendSocketMessage) + } + } +} + func SendMessageOnlyToSessionId(sessionId string, sendSocketMessage structs.SendSocketMessage) { for _, client := range cache.GetSocketClients() { if client.SessionId == sessionId { @@ -117,17 +134,20 @@ func UpdateConnectedUsers(userId string) { database.DB.First(&user, "id = ?", userId) BroadcastMessage(structs.SendSocketMessage{ - Cmd: utils.SentCmdUpdateConnectedUsers, + Cmd: utils.SentCmdUpdateConnectedUsers, + Body: len(cache.GetSocketClients()), + }) + + BroadcastMessageToTopic(utils.SubscribedTopicUsers, structs.SendSocketMessage{ + Cmd: utils.SentCmdUpdateUsers, Body: struct { - WebSocketUsersCount int - UserId string - ConnectionStatus uint8 - LastOnline time.Time + UserId string + ConnectionStatus uint8 + LastOnline time.Time }{ - WebSocketUsersCount: len(cache.GetSocketClients()), - UserId: userId, - ConnectionStatus: isUserGenerallyConnected(userId), - LastOnline: user.LastOnline, + UserId: userId, + ConnectionStatus: isUserGenerallyConnected(userId), + LastOnline: user.LastOnline, }, }) } @@ -982,8 +1002,13 @@ func CreateNewUserApiKey(userId string, apiName string) { } database.DB.Create(&newApiKey) + /* + SendMessageToUser(userId, "", structs.SendSocketMessage{ + Cmd: utils.SentCmdNewUserApiKeyCreated, + Body: newApiKey, + }) */ - SendMessageToUser(userId, "", structs.SendSocketMessage{ + SendMessageToUserWithTopic(userId, utils.SubscribedTopicUserProfile, structs.SendSocketMessage{ Cmd: utils.SentCmdNewUserApiKeyCreated, Body: newApiKey, }) @@ -999,8 +1024,13 @@ func CreateNewUserApiKey(userId string, apiName string) { func DeleteUserApiKey(userId string, apiKey string) { database.DB.Where("id = ?", apiKey).Where("user_id = ?", userId).Delete(&structs.UserApiKey{}) + /* + SendMessageToUser(userId, "", structs.SendSocketMessage{ + Cmd: utils.SentCmdDeletedUserApiKey, + Body: apiKey, + }) */ - SendMessageToUser(userId, "", structs.SendSocketMessage{ + SendMessageToUserWithTopic(userId, utils.SubscribedTopicUserProfile, structs.SendSocketMessage{ Cmd: utils.SentCmdDeletedUserApiKey, Body: apiKey, }) diff --git a/socketserver/hub.go b/socketserver/hub.go index c034cc8..73b8ea8 100644 --- a/socketserver/hub.go +++ b/socketserver/hub.go @@ -337,6 +337,11 @@ func RunHub() { grouptasks.InstallGlobalPythonPackages(data.Conn.Locals("userId").(string)) break + case utils.ReceivedCmdSubscribeToTopic: + log.Info().Msgf("Received subscribe to topic: %v", receivedMessage.Body) + + cache.SubscribeSocketClientToTopic(data.Conn.Locals("sessionId").(string), receivedMessage.Body["topic"].(string)) + break default: log.Error().Msgf("Received unknown message: %v", receivedMessage) break