package socketserver

import (
	"encoding/json"
	"fmt"
	"jannex/admin-dashboard-backend/modules/cache"
	"jannex/admin-dashboard-backend/modules/database"
	"jannex/admin-dashboard-backend/modules/grouptasks"
	"jannex/admin-dashboard-backend/modules/logger"
	"jannex/admin-dashboard-backend/modules/notification"
	"jannex/admin-dashboard-backend/modules/structs"
	"jannex/admin-dashboard-backend/modules/systempermissions"
	"jannex/admin-dashboard-backend/modules/utils"
	"jannex/admin-dashboard-backend/socketclients"
	"time"

	"git.ex.umbach.dev/Alex/roese-utils/rslogger"
	"git.ex.umbach.dev/Alex/roese-utils/rsutils"
	"github.com/gofiber/websocket/v2"
	"github.com/google/uuid"
	"github.com/rs/zerolog/log"
)

// Channels consumed by RunHub: newly connected clients, messages received
// from clients, and connections that have gone away.
var register = make(chan *structs.SocketClient)
var broadcast = make(chan structs.SocketMessage)
var unregister = make(chan *websocket.Conn)

// RunHub is the event loop of the socket server. It never returns: it blocks
// on the register, broadcast and unregister channels and handles one event at
// a time, so the handlers below never run concurrently with each other.
func RunHub() {
	for {
		select {
		case newSocketClient := <-register:
			hubHandleRegister(newSocketClient)
		case data := <-broadcast:
			hubHandleMessage(data)
		case connection := <-unregister:
			hubHandleUnregister(connection)
		}
	}
}

// hubLocalString converts a connection-local value to a string. A missing
// (nil) local becomes the empty string; formatting it with "%v" would yield
// the literal "<nil>", which defeats empty-string checks.
func hubLocalString(v interface{}) string {
	switch s := v.(type) {
	case nil:
		return ""
	case string:
		return s
	default:
		return fmt.Sprintf("%v", s)
	}
}

// hubAllowed reports whether granted is true. When it is not, the session
// identified by sessionId is sent the "no permissions" error message, and the
// caller must stop handling the command.
func hubAllowed(sessionId string, granted bool) bool {
	if !granted {
		socketclients.SendErrorMessageNoPermissions(sessionId)
	}

	return granted
}

// hubHandleRegister processes a newly connected client: it copies the
// connection locals onto the client, rejects connections without a session id
// or with an expired session, refreshes the session timestamps and announces
// the user as online.
func hubHandleRegister(client *structs.SocketClient) {
	userId := hubLocalString(client.Conn.Locals("userId"))
	browserTabSession := hubLocalString(client.Conn.Locals("browserTabSession"))
	sessionId := hubLocalString(client.Conn.Locals("sessionId"))

	// close connection instantly if sessionId is empty
	if sessionId == "" {
		client.SendUnauthorizedCloseMessage()
		return
	}

	client.SessionId = sessionId
	client.BrowserTabSession = browserTabSession
	client.UserId = userId

	// NOTE(review): the client is cached before the expiry check below, so a
	// client with an expired session stays in the cache until its connection
	// is unregistered — confirm this is intended.
	cache.AddSocketClient(client)

	// check that user session is not expired
	// NOTE(review): a session id with no matching row leaves userSession.Id
	// empty and is therefore accepted — confirm this is intended.
	var userSession structs.UserSession

	database.DB.First(&userSession, "id = ?", sessionId)

	if userSession.Id != "" && time.Now().After(userSession.ExpiresAt) {
		client.SendUnauthorizedCloseMessage()
		database.DB.Delete(&structs.UserSession{}, "id = ?", sessionId)
		return
	}

	// update session last used time
	database.DB.Model(&structs.UserSession{}).Where("id = ?", sessionId).Updates(structs.UserSession{
		LastUsed:  time.Now(),
		ExpiresAt: utils.GetSessionExpiresAtTime(),
	})

	socketclients.UpdateConnectedUsers(userId)
	socketclients.UpdateUserSessionsForUser(userId, sessionId)

	logger.AddSystemLog(rslogger.LogTypeInfo, "User %v has come online", userId)
}

// hubHandleMessage decodes one message received from a client and dispatches
// it by command.
//
// Body fields are supplied by the client and are read with unchecked type
// assertions, which panic when a field is missing or has the wrong type. The
// deferred recover turns such a panic into a logged error, so that a single
// malformed message cannot terminate the hub loop for every connected client.
func hubHandleMessage(data structs.SocketMessage) {
	defer func() {
		if r := recover(); r != nil {
			log.Error().Msgf("Recovered from panic while handling socket message: %v", r)
		}
	}()

	var receivedMessage structs.ReceivedMessage

	if err := json.Unmarshal(data.Msg, &receivedMessage); err != nil {
		log.Error().Msgf("Failed to unmarshal received msg, err: %s", err)
		return
	}

	log.Debug().Msgf("Received message: %v %v", receivedMessage, receivedMessage.Cmd)

	// Identity comes from the connection locals, never from the message body.
	userId, hasUserId := data.Conn.Locals("userId").(string)
	sessionId, hasSessionId := data.Conn.Locals("sessionId").(string)

	if !hasUserId || !hasSessionId {
		log.Error().Msgf("Ignoring message %v from connection without userId or sessionId", receivedMessage.Cmd)
		return
	}

	body := receivedMessage.Body

	switch receivedMessage.Cmd {
	case utils.ReceivedCmdStartGroupTasks:
		category := body["category"].(string)

		if !hubAllowed(sessionId, socketclients.HasXYPermission(userId, utils.PermissionGroupTasksOverviewXYNewTask, category)) {
			return
		}

		groupId := body["id"].(string)
		globalInputsJsonString := rsutils.MarshalJson(body["globalInputs"])

		log.Debug().Msgf("globalInputsJsonString %v", globalInputsJsonString)

		grouptasks.StartGroupTask(userId, structs.GroupTasks{
			Category:      category,
			GroupId:       groupId,
			GroupName:     body["groupName"].(string),
			Description:   body["description"].(string),
			NumberOfSteps: uint8(body["numberOfSteps"].(float64)),
			GlobalInputs:  globalInputsJsonString,
			RememberId:    body["rememberId"].(string),
		})

	case utils.ReceivedCmdTaskFailedTryAgainRunTaskStep:
		// NOTE(review): no permission check is performed for this command.
		groupTaskArgs := grouptasks.RunGroupTaskArgs{
			CreatorUserId: userId,
			StartType:     grouptasks.RunGroupTaskStartTypeTryAgain,
			GroupTaskId:   body["groupTaskId"].(string),
			Category:      body["category"].(string),
			GroupId:       body["groupId"].(string),
			Step:          uint8(body["step"].(float64)),
			TaskStepId:    body["taskStepId"].(string),
		}

		go grouptasks.RunGroupTask(groupTaskArgs)

		logger.AddGroupTasksLog(rslogger.LogTypeInfo,
			"Step %s of groupTaskId %s has failed and was started by user %s to try again",
			groupTaskArgs.TaskStepId, groupTaskArgs.GroupTaskId, groupTaskArgs.CreatorUserId)

	case utils.ReceivedCmdTaskContinueTaskStep:
		// NOTE(review): no permission check is performed for this command, and
		// it uses the "try again" start type like the case above — confirm
		// both are intended.
		groupTaskArgs := grouptasks.RunGroupTaskArgs{
			CreatorUserId: userId,
			StartType:     grouptasks.RunGroupTaskStartTypeTryAgain,
			GroupTaskId:   body["groupTaskId"].(string),
			Category:      body["category"].(string),
			GroupId:       body["groupId"].(string),
			Step:          uint8(body["step"].(float64)),
			TaskStepId:    body["taskStepId"].(string),
			TaskInputs:    rsutils.MarshalJson(body["taskInputs"]),
		}

		go grouptasks.RunGroupTask(groupTaskArgs)

		logger.AddGroupTasksLog(rslogger.LogTypeInfo,
			"Step %s of groupTaskId %s was continued by user %s",
			groupTaskArgs.TaskStepId, groupTaskArgs.GroupTaskId, groupTaskArgs.CreatorUserId)

	case utils.ReceivedCmdReloadGroupTasks:
		category := body["category"].(string)

		if !hubAllowed(sessionId, socketclients.HasXYPermission(userId, utils.PermissionGroupTasksOverviewXYReloadGroupConfig, category)) {
			return
		}

		// Tell viewers of this category that a reload is in progress before
		// performing it.
		socketclients.BroadcastMessageToUsersWithPermissionAndTopic(
			systempermissions.ConvertXYPermission(utils.PermissionGroupTasksOverviewXYView, category),
			utils.SubscribedDynamicTopicGroupTasks+category,
			structs.SendSocketMessage{
				Cmd:  utils.SentCmdReloadingGroupTasks,
				Body: category,
			})

		grouptasks.ReloadCategoryGroups(category)

		logger.AddGroupTasksLog(rslogger.LogTypeInfo,
			"User %s has reloaded group tasks config of category %s",
			userId, category)

	case utils.ReceivedCmdTaskLocking:
		log.Info().Msgf("body %v", body)

		groupTaskId := body["groupTaskId"].(string)
		step := uint8(body["step"].(float64))
		category := body["category"].(string)

		// NOTE(review): lockedByUserId is taken from the client-supplied body
		// rather than from the connection — confirm this is intended.
		cache.AddLockedGroupTaskStep(structs.LockedGroupTaskSteps{
			LockedByUserId: body["lockedByUserId"].(string),
			GroupTaskId:    groupTaskId,
			Step:           step,
			LockedAt:       time.Now(),
			RememberId:     body["rememberId"].(string),
			Category:       category,
		})

		socketclients.BroadcastMessageToTopic(
			utils.SubscribedDynamicTopicGroupTasks+category,
			structs.SendSocketMessage{
				Cmd:  utils.SentCmdTaskLocked,
				Body: body,
			})

		cache.AddGroupTaskStepsInput(structs.GroupTaskStepsInput{
			GroupTaskId:   groupTaskId,
			Step:          step,
			ParameterName: body["parameterName"].(string),
			Value:         body["value"],
		})

	case utils.ReceivedCmdUpdateUserProfile:
		socketclients.UpdateUserProfile(data.Conn, body["changes"].(map[string]interface{}))

	case utils.ReceivedCmdAdminAreaCreateNewRole:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAdminAreaCreateNewRole)) {
			return
		}

		role := structs.Role{
			Id:           uuid.New().String(),
			DisplayName:  "New Role " + uuid.New().String(),
			Description:  "Role description",
			CreatedAt:    time.Now(),
			SortingOrder: systempermissions.GetRoleSortingOrder(),
		}

		database.DB.Create(&role)

		socketclients.BroadcastMessageToTopics(
			[]string{utils.SubscribedTopicAdminAreaRoles, utils.SubscribedTopicUsers},
			structs.SendSocketMessage{
				Cmd:  utils.SentCmdAdminAreaNewRoleCreated,
				Body: role,
			})

		logger.AddSystemLog(rslogger.LogTypeInfo,
			"User %s has created the role %s",
			userId, role.Id)

	case utils.ReceivedCmdAdminAreaUpdateRole:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAdminAreaUpdateRole)) {
			return
		}

		socketclients.AdminAreaUpdateRole(data.Conn, body)

	case utils.ReceivedCmdAdminAreaUpdateRoleSortingOrder:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAdminAreaMoveRoleUpDown)) {
			return
		}

		socketclients.AdminAreaMoveRoleToSortingOrder(data.Conn, body)

	case utils.ReceivedCmdAdminAreaDeleteRole:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAdminAreaDeleteRole)) {
			return
		}

		socketclients.AdminAreaDeleteRole(data.Conn, body)

	case utils.ReceivedCmdAllUsersUpdateUserRole:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAllUsersActionChangeRole)) {
			return
		}

		socketclients.UpdateUserRole(data.Conn, body["UserId"].(string), body["RoleId"].(string))

	case utils.ReceivedCmdAllUsersCreateNewUser:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAllUsersCreateNewUser)) {
			return
		}

		socketclients.AllUsersCreateNewUser(data.Conn, body)

	case utils.ReceivedCmdAllUsersDeleteUser:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAllUsersActionDeleteUser)) {
			return
		}

		socketclients.AllUsersDeleteUser(data.Conn, body["UserId"].(string))

	case utils.ReceivedCmdAllUsersUserDeactivation:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAllUsersActionUserDeactivation)) {
			return
		}

		socketclients.AllUsersUserDeactivation(data.Conn, body["UserId"].(string), body["Deactivation"].(bool))

	case utils.ReceivedCmdScannerNewScan:
		log.Info().Msgf("Received new scan: %v", body)

		// Forward the scan to the user's other browser tabs, skipping the tab
		// session that sent it.
		socketclients.SendMessageToUserExceptBrowserTabSession(
			userId,
			data.Conn.Locals("browserTabSession").(string),
			structs.SendSocketMessage{
				Cmd:  utils.SentCmdScannerNewScan,
				Body: body["Scan"],
			})

	case utils.ReceivedCmdHandleUserActionTaskStep:
		// NOTE(review): this is gated by the "reload group config" permission —
		// confirm that is the intended permission for user action steps.
		if !hubAllowed(sessionId, socketclients.HasXYPermission(userId, utils.PermissionGroupTasksOverviewXYReloadGroupConfig, body["category"].(string))) {
			return
		}

		grouptasks.HandleUserActionTaskStep(userId, body)

	case utils.ReceivedCmdCreateNewUserApiKey:
		socketclients.CreateNewUserApiKey(userId, body["Name"].(string))

	case utils.ReceivedCmdDeleteUserApiKey:
		socketclients.DeleteUserApiKey(userId, body["Id"].(string))

	case utils.ReceivedCmdGroupTasksInstallPythonPackages:
		category := body["category"].(string)

		if !hubAllowed(sessionId, socketclients.HasXYPermission(userId, utils.PermissionGroupTasksOverviewXYInstallPythonPackages, category)) {
			return
		}

		grouptasks.InstallPythonPackages(userId, category, body["groupId"].(string))

	case utils.ReceivedCmdGroupTasksInstallGlobalPythonPackages:
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionGroupTasksInstallGlobalPythonPackages)) {
			return
		}

		grouptasks.InstallGlobalPythonPackages(userId)

	case utils.ReceivedCmdSubscribeToTopic:
		log.Debug().Msgf("Received subscribe to topic: %v", body)

		// NOTE(review): the browser tab session is taken from the
		// client-supplied body rather than from the connection — confirm this
		// is intended.
		cache.SubscribeSocketClientToTopic(body["browserTabSession"].(string), body["topic"].(string))

	case utils.ReceivedCmdDeleteAllNotifications:
		notification.DeleteAllNotifications(userId)

	case utils.ReceivedCmdDeleteOneNotification:
		notification.DeleteOneNotification(userId, body["notificationId"].(string))

	case utils.ReceivedCmdAdminAreaManageCheckWhichCategoriesAreAvailable:
		// Stop on a failed permission check; previously the action below ran
		// even after the error message had been sent.
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAdminAreaManageCheckWhichCategoriesAreAvailable)) {
			return
		}

		if err := grouptasks.CheckWhichCategoriesAreAvailable(); err != nil {
			log.Error().Msgf("Check which categories are available failed: err %v", err)
			return
		}

		socketclients.HandleCheckWhichCategoriesAreAvailable()

	case utils.ReceivedCmdAdminAreaManageAddLogManagerServerConnection:
		// Stop on a failed permission check; previously the connection was
		// added even after the error message had been sent.
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAdminAreaManageAddLogManagerServerConnection)) {
			return
		}

		socketclients.AddLogManagerServerConnection(userId, body["DisplayName"].(string), body["Address"].(string))

	case utils.ReceivedCmdAdminAreaManageDeleteLogManagerServerConnection:
		// Stop on a failed permission check; previously the connection was
		// deleted even after the error message had been sent.
		if !hubAllowed(sessionId, socketclients.HasPermission(userId, utils.PermissionAdminAreaManageRemoveLogManagerServerConnection)) {
			return
		}

		socketclients.DeleteLogManagerServerConnection(userId, body["Id"].(string))

	default:
		log.Error().Msgf("Received unknown message: %v", receivedMessage)
	}
}

// hubHandleUnregister removes a closed connection from the client cache. When
// the connection carries a user id and a session id, it also records the
// user's last-online time and refreshes the user's session and connection
// state for other clients.
func hubHandleUnregister(connection *websocket.Conn) {
	cache.DeleteClientByConn(connection)

	userId, hasUserId := connection.Locals("userId").(string)
	sessionId, hasSessionId := connection.Locals("sessionId").(string)

	if !hasUserId || !hasSessionId {
		return
	}

	database.DB.Model(&structs.User{}).Where("id = ?", userId).Updates(structs.User{
		LastOnline: time.Now(),
	})

	socketclients.UpdateUserSessionsForUser(userId, sessionId)
	socketclients.UpdateConnectedUsers(userId)

	logger.AddSystemLog(rslogger.LogTypeInfo, "User %s has gone offline", userId)
}