subscribe topic

main
alex 2023-09-01 23:27:09 +02:00
parent 6c033a0fb6
commit 564f1c82dc
5 changed files with 81 additions and 15 deletions

View File

@ -5,6 +5,7 @@ import (
"sync"
"github.com/gofiber/websocket/v2"
"github.com/rs/zerolog/log"
)
var socketClients []*structs.SocketClient
@ -39,3 +40,17 @@ func GetSocketClients() []*structs.SocketClient {
return socketClients
}
func SubscribeSocketClientToTopic(sessionId string, topic string) {
mu.Lock()
defer mu.Unlock()
log.Info().Msgf("Subscribing session %s to topic %s", sessionId, topic)
for _, socketClient := range socketClients {
if socketClient.SessionId == sessionId {
socketClient.SubscribedTopic = topic
break
}
}
}

View File

@ -17,10 +17,11 @@ const (
)
type SocketClient struct {
SessionId string
UserId string
Conn *websocket.Conn
connMu sync.Mutex
SessionId string
UserId string
Conn *websocket.Conn
connMu sync.Mutex
SubscribedTopic string
}
type SocketMessage struct {

View File

@ -84,6 +84,7 @@ const (
SentCmdInstallingGlobalPythonPackages = 36
SentCmdInstallingGlobalPythonPackagesFailed = 37
SentCmdInstallingGlobalPythonPackagesFinished = 38
SentCmdUpdateUsers = 39
)
// commands received from web clients
@ -110,6 +111,7 @@ const (
ReceivedCmdDeleteUserApiKey = 20
ReceivedCmdGroupTasksInstallPythonPackages = 21
ReceivedCmdGroupTasksInstallGlobalPythonPackages = 22
ReceivedCmdSubscribeToTopic = 23
)
const (
@ -201,3 +203,16 @@ var DynamicGroupTasksPermissions = []string{
PermissionGroupTasksOverviewXYInstallPythonPackages,
PermissionGroupTasksOverviewXYView,
}
// This represents the paths registered on the api.
// Available paths which are used for the websocket topic subscription
// to only send messages to clients who are subscribed to the topic
const (
// this is path is dynamic because there are more params after the /v1/grouptasks/
SubscribedDynamicTopicGroupTasks = "/grouptasks/"
// these paths are static - there are no params after the path
SubscribedTopicAdminAreaRoles = "/adminarea/roles"
SubscribedTopicUsers = "/users"
SubscribedTopicUserProfile = "/user-profile"
)

View File

@ -19,6 +19,14 @@ import (
"gorm.io/gorm"
)
func BroadcastMessageToTopic(topic string, sendSocketMessage structs.SendSocketMessage) {
for _, client := range cache.GetSocketClients() {
if client.SubscribedTopic == topic {
client.SendMessage(sendSocketMessage)
}
}
}
func BroadcastMessage(sendSocketMessage structs.SendSocketMessage) {
for _, client := range cache.GetSocketClients() {
client.SendMessage(sendSocketMessage)
@ -69,6 +77,15 @@ func SendMessageToUser(userId string, ignoreUserSessionId string, sendSocketMess
}
}
func SendMessageToUserWithTopic(userId string, topic string, sendSocketMessage structs.SendSocketMessage) {
for _, client := range cache.GetSocketClients() {
log.Info().Msgf("Client: %v", client.SubscribedTopic)
if client.UserId == userId && client.SubscribedTopic == topic {
client.SendMessage(sendSocketMessage)
}
}
}
func SendMessageOnlyToSessionId(sessionId string, sendSocketMessage structs.SendSocketMessage) {
for _, client := range cache.GetSocketClients() {
if client.SessionId == sessionId {
@ -117,17 +134,20 @@ func UpdateConnectedUsers(userId string) {
database.DB.First(&user, "id = ?", userId)
BroadcastMessage(structs.SendSocketMessage{
Cmd: utils.SentCmdUpdateConnectedUsers,
Cmd: utils.SentCmdUpdateConnectedUsers,
Body: len(cache.GetSocketClients()),
})
BroadcastMessageToTopic(utils.SubscribedTopicUsers, structs.SendSocketMessage{
Cmd: utils.SentCmdUpdateUsers,
Body: struct {
WebSocketUsersCount int
UserId string
ConnectionStatus uint8
LastOnline time.Time
UserId string
ConnectionStatus uint8
LastOnline time.Time
}{
WebSocketUsersCount: len(cache.GetSocketClients()),
UserId: userId,
ConnectionStatus: isUserGenerallyConnected(userId),
LastOnline: user.LastOnline,
UserId: userId,
ConnectionStatus: isUserGenerallyConnected(userId),
LastOnline: user.LastOnline,
},
})
}
@ -982,8 +1002,13 @@ func CreateNewUserApiKey(userId string, apiName string) {
}
database.DB.Create(&newApiKey)
/*
SendMessageToUser(userId, "", structs.SendSocketMessage{
Cmd: utils.SentCmdNewUserApiKeyCreated,
Body: newApiKey,
}) */
SendMessageToUser(userId, "", structs.SendSocketMessage{
SendMessageToUserWithTopic(userId, utils.SubscribedTopicUserProfile, structs.SendSocketMessage{
Cmd: utils.SentCmdNewUserApiKeyCreated,
Body: newApiKey,
})
@ -999,8 +1024,13 @@ func CreateNewUserApiKey(userId string, apiName string) {
func DeleteUserApiKey(userId string, apiKey string) {
database.DB.Where("id = ?", apiKey).Where("user_id = ?", userId).Delete(&structs.UserApiKey{})
/*
SendMessageToUser(userId, "", structs.SendSocketMessage{
Cmd: utils.SentCmdDeletedUserApiKey,
Body: apiKey,
}) */
SendMessageToUser(userId, "", structs.SendSocketMessage{
SendMessageToUserWithTopic(userId, utils.SubscribedTopicUserProfile, structs.SendSocketMessage{
Cmd: utils.SentCmdDeletedUserApiKey,
Body: apiKey,
})

View File

@ -337,6 +337,11 @@ func RunHub() {
grouptasks.InstallGlobalPythonPackages(data.Conn.Locals("userId").(string))
break
case utils.ReceivedCmdSubscribeToTopic:
log.Info().Msgf("Received subscribe to topic: %v", receivedMessage.Body)
cache.SubscribeSocketClientToTopic(data.Conn.Locals("sessionId").(string), receivedMessage.Body["topic"].(string))
break
default:
log.Error().Msgf("Received unknown message: %v", receivedMessage)
break