package socketserver import ( "encoding/json" "fmt" "janex/admin-dashboard-backend/grouptasks" "janex/admin-dashboard-backend/modules/cache" "janex/admin-dashboard-backend/modules/database" "janex/admin-dashboard-backend/modules/structs" "janex/admin-dashboard-backend/modules/utils" "janex/admin-dashboard-backend/socketclients" "time" "github.com/gofiber/websocket/v2" "github.com/google/uuid" "github.com/rs/zerolog/log" ) var register = make(chan *structs.SocketClient) var broadcast = make(chan structs.SocketMessage) var unregister = make(chan *websocket.Conn) func RunHub() { for { select { case newSocketClient := <-register: userId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("userId")) sessionId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("sessionId")) // close connection instantly if sessionId is empty if sessionId == "" { newSocketClient.SendUnauthorizedCloseMessage() continue } newSocketClient.SessionId = sessionId newSocketClient.UserId = userId cache.AddSocketClient(sessionId, newSocketClient) log.Debug().Msgf("clients: %d", len(cache.GetSocketClients())) log.Debug().Msgf("REGISTER CLIENT: %s", sessionId) var user structs.User database.DB.First(&user, "id = ?", userId) newSocketClient.SendMessage(structs.SendSocketMessage{ Cmd: utils.SentCmdInitUserSocketConnection, Body: structs.InitUserSocketConnection{ User: structs.UserData{ Username: user.Username, Email: user.Email, Sessions: socketclients.GetUserSessions(userId), }, CategoryGroups: cache.GetCategoryGroupsSorted(), GroupTasks: grouptasks.GetAllGroupTasks(), GroupTasksSteps: grouptasks.GetAllGroupTasksSteps(), }, }) socketclients.UpdateConnectedUsers() socketclients.UpdateUserSessionsForUser(userId, sessionId) case data := <-broadcast: var receivedMessage structs.ReceivedMessage if err := json.Unmarshal(data.Msg, &receivedMessage); err != nil { log.Error().Msgf("Failed to unmarshal received msg, err: %s", err) continue } log.Info().Msgf("Received message: %s", receivedMessage, receivedMessage.Cmd) switch receivedMessage.Cmd { case utils.ReceivedCmdStartGroupTasks: log.Debug().Msgf("received start group tasks %s", receivedMessage.Body) category := receivedMessage.Body["category"].(string) groupId := receivedMessage.Body["id"].(string) globalInputsJsonString := utils.MarshalJson(receivedMessage.Body["globalInputs"]) groupTaskId := uuid.New().String() groupTasks := &structs.GroupTasks{ Id: groupTaskId, Category: category, GroupId: groupId, GroupName: receivedMessage.Body["groupName"].(string), CurrentTasksStep: 1, NumberOfSteps: uint8(receivedMessage.Body["numberOfSteps"].(float64)), Status: structs.GroupTasksStatusRunning, GlobalInputs: globalInputsJsonString, StartedAt: time.Now(), RememberId: receivedMessage.Body["rememberId"].(string), } database.DB.Create(groupTasks) socketclients.BroadcastMessage(structs.SendSocketMessage{ Cmd: utils.SentCmdNewGroupTaskStarted, Body: groupTasks, }) go grouptasks.RunGroupTask(grouptasks.RunGroupTaskArgs{ StartType: grouptasks.RunGroupTaskStartTypeNormal, GroupTaskId: groupTaskId, Category: category, GroupId: groupId, Step: 1, TaskStepId: "", GlobalInputs: globalInputsJsonString, }) break case utils.ReceivedCmdTaskFailedTryAgainRunTaskStep: go grouptasks.RunGroupTask(grouptasks.RunGroupTaskArgs{ StartType: grouptasks.RunGroupTaskStartTypeTryAgain, GroupTaskId: receivedMessage.Body["groupTaskId"].(string), Category: receivedMessage.Body["category"].(string), GroupId: receivedMessage.Body["groupId"].(string), Step: uint8(receivedMessage.Body["step"].(float64)), TaskStepId: receivedMessage.Body["taskStepId"].(string), }) break case utils.ReceivedCmdTaskContinueTaskStep: go grouptasks.RunGroupTask(grouptasks.RunGroupTaskArgs{ StartType: grouptasks.RunGroupTaskStartTypeTryAgain, GroupTaskId: receivedMessage.Body["groupTaskId"].(string), Category: receivedMessage.Body["category"].(string), GroupId: receivedMessage.Body["groupId"].(string), Step: uint8(receivedMessage.Body["step"].(float64)), TaskStepId: receivedMessage.Body["taskStepId"].(string), TaskInputs: utils.MarshalJson(receivedMessage.Body["taskInputs"]), }) break case utils.ReceivedCmdReloadGroupTasks: category := receivedMessage.Body["category"].(string) socketclients.BroadcastMessage(structs.SendSocketMessage{ Cmd: utils.SentCmdReloadingGroupTasks, Body: category, }) grouptasks.LoadGroups(category) break default: log.Error().Msgf("Received unknown message: %s", receivedMessage) break } case connection := <-unregister: cache.DeleteClientByConn(connection) socketclients.UpdateUserSessionsForUser(connection.Locals("userId").(string), connection.Locals("sessionId").(string)) socketclients.UpdateConnectedUsers() } } }