package robot import ( "encoding/json" "fmt" "jannex/robot-control-manager/modules/cache" "jannex/robot-control-manager/modules/database" "jannex/robot-control-manager/modules/logger" "jannex/robot-control-manager/modules/request" "jannex/robot-control-manager/modules/structs" "jannex/robot-control-manager/modules/utils" "time" "github.com/gofiber/fiber/v2" "github.com/rs/zerolog/log" ) func RobotPingHandler() { ticker := time.NewTicker(utils.RobotPingHandlerInterval * time.Second) for range ticker.C { for _, robot := range cache.GetRobots() { if robot.Status == utils.RobotStatusOffline { continue } err := request.Request(fiber.MethodGet, robot.Address+":5000/api/v1/ping", nil) if err != nil { logger.AddSystemLog("Robot ping request failed: %v", err) if robot.PingRetries == utils.RobotPingRetries { UpdateRobotStatus(robot, utils.RobotStatusOffline) logger.AddSystemLog("Robot %s marked as offline because %v attempts have already been made to reach it", robot.Name, utils.RobotPingRetries) continue } robot.PingRetries++ logger.AddSystemLog("Trying to ping robot %s for %v times failed", robot.Name, robot.PingRetries) if robot.Status != utils.RobotStatusConnecting { UpdateRobotStatus(robot, utils.RobotStatusConnecting) } continue } if robot.Status == utils.RobotStatusConnecting { UpdateRobotStatus(robot, utils.RobotStatusIdle) } if robot.PingRetries != 0 { robot.PingRetries = 0 } } } } func LoadRobotsFromDatabase() { var robots []structs.Robot database.DB.Find(&robots) cache.AddRobots(robots) for _, robot := range cache.GetRobots() { if robot.Status == utils.RobotStatusOffline { continue } UpdateRobotStatus(robot, utils.RobotStatusConnecting) } } func UpdateRobotStatus(robot *structs.Robot, status uint8) { robot.Status = status database.DB.Model(&structs.Robot{}). Where("id = ?", robot.Id). Update("status", status) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotStatus, Body: struct { RobotId string Status uint8 }{ RobotId: robot.Id, Status: status, }, }) } func ProcessJobTask(r *structs.Robot, jobId string, jobName string) { if r.CurrentJobId == "" { UpdateRobotCurrentJob(r, jobId, jobName) } if r.CurrentJobId != "" && r.CurrentJobId != jobId { CountUpJobsWaiting(r, jobId, jobName) } for r.CurrentJobId != "" && r.CurrentJobId != jobId { // wait for current job to finish if r.Status == utils.RobotStatusOffline { log.Info().Msgf("Robot %s offline. Job with id %s and name %s canceled", r.Name, jobId, jobName) logger.AddRobotLog(r.Type, r.Id, "Job with id %s and name %s canceled", jobId, jobName) return } fmt.Println("job is processing", r.CurrentJobId, jobId, r.JobsWaitingCount) time.Sleep(2 * time.Second) } RemoveJobFromJobsWaitingList(r, jobId) UpdateRobotCurrentJob(r, jobId, jobName) fmt.Println("processing job", jobId) } func UpdateRobotCurrentJob(r *structs.Robot, jobId string, jobName string) { r.JobMutex.Lock() defer r.JobMutex.Unlock() if r.CurrentJobId == jobId { return } r.CurrentJobId = jobId r.CurrentJobName = jobName database.DB.Model(&structs.Robot{}). Where("id = ?", r.Id). Updates(map[string]interface{}{"current_job_id": jobId, "current_job_name": jobName}) log.Info().Msgf("Robot %s current job id updated to %s", r.Name, jobId) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotCurrentJob, Body: struct { RobotId string JobName string JobsWaitingNameList []string }{ RobotId: r.Id, JobName: jobName, JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), }, }) } func CountUpJobsWaiting(r *structs.Robot, jobId string, jobName string) { r.JobMutex.Lock() defer r.JobMutex.Unlock() r.JobsWaitingCount++ r.JobsWaitingNameList = append(r.JobsWaitingNameList, structs.JobWaitingName{ JobId: jobId, Name: jobName, }) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount, Body: struct { RobotId string JobsWaitingCount int JobsWaitingNameList []string }{ RobotId: r.Id, JobsWaitingCount: r.JobsWaitingCount, JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), }, }) } func RemoveJobFromJobsWaitingList(r *structs.Robot, jobId string) { for i, j := range r.JobsWaitingNameList { if j.JobId == jobId { r.JobsWaitingNameList = append(r.JobsWaitingNameList[:i], r.JobsWaitingNameList[i+1:]...) break } } } func CountDownJobsWaiting(r *structs.Robot, jobId string) { r.JobMutex.Lock() defer r.JobMutex.Unlock() if r.JobsWaitingCount > 0 { r.JobsWaitingCount-- } BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount, Body: struct { RobotId string JobsWaitingCount int JobsWaitingNameList []string }{ RobotId: r.Id, JobsWaitingCount: r.JobsWaitingCount, JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), }, }) } func BroadcastSSEMessage(message structs.SSEMessage) { marshaledMessage, err := json.Marshal(message) if err != nil { log.Error().Msgf("Error marshaling SSE message: %v", err) return } for clientId, sseClient := range cache.GetSSEClients() { sseClient.MessageChannel <- structs.SSEClientChannelMessage{ ClientId: clientId, Message: marshaledMessage, } } } var PermitJoinTimer *time.Timer func PermitJoinAutoDisableHandler() { if PermitJoinTimer != nil { PermitJoinTimer.Stop() } PermitJoinTimer = time.NewTimer(utils.RobotsPermitJoinAutoDisable * time.Second) <-PermitJoinTimer.C cache.SetPermitJoin(false) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdPermitJoinUpdated, Body: 0, }) logger.AddSystemLog("Permit join disabled by timer") }