166 lines
5.3 KiB
Go
166 lines
5.3 KiB
Go
package socketserver
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"janex/admin-dashboard-backend/grouptasks"
|
|
"janex/admin-dashboard-backend/modules/cache"
|
|
"janex/admin-dashboard-backend/modules/database"
|
|
"janex/admin-dashboard-backend/modules/structs"
|
|
"janex/admin-dashboard-backend/modules/utils"
|
|
"janex/admin-dashboard-backend/socketclients"
|
|
"time"
|
|
|
|
"github.com/gofiber/websocket/v2"
|
|
"github.com/google/uuid"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
var register = make(chan *structs.SocketClient)
|
|
var broadcast = make(chan structs.SocketMessage)
|
|
var unregister = make(chan *websocket.Conn)
|
|
|
|
func RunHub() {
|
|
for {
|
|
select {
|
|
case newSocketClient := <-register:
|
|
userId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("userId"))
|
|
sessionId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("sessionId"))
|
|
|
|
// close connection instantly if sessionId is empty
|
|
if sessionId == "<nil>" {
|
|
newSocketClient.SendUnauthorizedCloseMessage()
|
|
continue
|
|
}
|
|
|
|
newSocketClient.SessionId = sessionId
|
|
newSocketClient.UserId = userId
|
|
|
|
cache.AddSocketClient(sessionId, newSocketClient)
|
|
|
|
log.Debug().Msgf("clients: %d", len(cache.GetSocketClients()))
|
|
log.Debug().Msgf("REGISTER CLIENT: %s", sessionId)
|
|
|
|
var user structs.User
|
|
|
|
database.DB.First(&user, "id = ?", userId)
|
|
|
|
newSocketClient.SendMessage(structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdInitUserSocketConnection,
|
|
Body: structs.InitUserSocketConnection{
|
|
User: structs.UserData{
|
|
Id: user.Id,
|
|
Username: user.Username,
|
|
Email: user.Email,
|
|
Sessions: socketclients.GetUserSessions(userId),
|
|
},
|
|
CategoryGroups: cache.GetCategoryGroupsSorted(),
|
|
GroupTasks: grouptasks.GetAllGroupTasks(),
|
|
GroupTasksSteps: grouptasks.GetAllGroupTasksSteps(),
|
|
AllUsers: socketclients.GetAllUsers(),
|
|
},
|
|
})
|
|
|
|
socketclients.UpdateConnectedUsers(userId)
|
|
socketclients.UpdateUserSessionsForUser(userId, sessionId)
|
|
|
|
case data := <-broadcast:
|
|
var receivedMessage structs.ReceivedMessage
|
|
|
|
if err := json.Unmarshal(data.Msg, &receivedMessage); err != nil {
|
|
log.Error().Msgf("Failed to unmarshal received msg, err: %s", err)
|
|
continue
|
|
}
|
|
|
|
log.Info().Msgf("Received message: %s", receivedMessage, receivedMessage.Cmd)
|
|
|
|
switch receivedMessage.Cmd {
|
|
case utils.ReceivedCmdStartGroupTasks:
|
|
log.Debug().Msgf("received start group tasks %s", receivedMessage.Body)
|
|
|
|
category := receivedMessage.Body["category"].(string)
|
|
groupId := receivedMessage.Body["id"].(string)
|
|
|
|
globalInputsJsonString := utils.MarshalJson(receivedMessage.Body["globalInputs"])
|
|
|
|
groupTaskId := uuid.New().String()
|
|
|
|
groupTasks := &structs.GroupTasks{
|
|
Id: groupTaskId,
|
|
Category: category,
|
|
GroupId: groupId,
|
|
GroupName: receivedMessage.Body["groupName"].(string),
|
|
CurrentTasksStep: 1,
|
|
NumberOfSteps: uint8(receivedMessage.Body["numberOfSteps"].(float64)),
|
|
Status: structs.GroupTasksStatusRunning,
|
|
GlobalInputs: globalInputsJsonString,
|
|
StartedAt: time.Now(),
|
|
RememberId: receivedMessage.Body["rememberId"].(string),
|
|
}
|
|
|
|
database.DB.Create(groupTasks)
|
|
|
|
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdNewGroupTaskStarted,
|
|
Body: groupTasks,
|
|
})
|
|
|
|
go grouptasks.RunGroupTask(grouptasks.RunGroupTaskArgs{
|
|
StartType: grouptasks.RunGroupTaskStartTypeNormal,
|
|
GroupTaskId: groupTaskId,
|
|
Category: category,
|
|
GroupId: groupId,
|
|
Step: 1,
|
|
TaskStepId: "",
|
|
GlobalInputs: globalInputsJsonString,
|
|
})
|
|
break
|
|
case utils.ReceivedCmdTaskFailedTryAgainRunTaskStep:
|
|
go grouptasks.RunGroupTask(grouptasks.RunGroupTaskArgs{
|
|
StartType: grouptasks.RunGroupTaskStartTypeTryAgain,
|
|
GroupTaskId: receivedMessage.Body["groupTaskId"].(string),
|
|
Category: receivedMessage.Body["category"].(string),
|
|
GroupId: receivedMessage.Body["groupId"].(string),
|
|
Step: uint8(receivedMessage.Body["step"].(float64)),
|
|
TaskStepId: receivedMessage.Body["taskStepId"].(string),
|
|
})
|
|
break
|
|
case utils.ReceivedCmdTaskContinueTaskStep:
|
|
go grouptasks.RunGroupTask(grouptasks.RunGroupTaskArgs{
|
|
StartType: grouptasks.RunGroupTaskStartTypeTryAgain,
|
|
GroupTaskId: receivedMessage.Body["groupTaskId"].(string),
|
|
Category: receivedMessage.Body["category"].(string),
|
|
GroupId: receivedMessage.Body["groupId"].(string),
|
|
Step: uint8(receivedMessage.Body["step"].(float64)),
|
|
TaskStepId: receivedMessage.Body["taskStepId"].(string),
|
|
TaskInputs: utils.MarshalJson(receivedMessage.Body["taskInputs"]),
|
|
})
|
|
|
|
break
|
|
case utils.ReceivedCmdReloadGroupTasks:
|
|
category := receivedMessage.Body["category"].(string)
|
|
|
|
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdReloadingGroupTasks,
|
|
Body: category,
|
|
})
|
|
|
|
grouptasks.LoadGroups(category)
|
|
|
|
break
|
|
|
|
default:
|
|
log.Error().Msgf("Received unknown message: %s", receivedMessage)
|
|
break
|
|
}
|
|
|
|
case connection := <-unregister:
|
|
cache.DeleteClientByConn(connection)
|
|
|
|
socketclients.UpdateUserSessionsForUser(connection.Locals("userId").(string), connection.Locals("sessionId").(string))
|
|
|
|
socketclients.UpdateConnectedUsers(connection.Locals("userId").(string))
|
|
}
|
|
}
|
|
}
|