package robot import ( "encoding/json" "fmt" "jannex/robot-control-manager/modules/cache" "jannex/robot-control-manager/modules/database" "jannex/robot-control-manager/modules/logger" "jannex/robot-control-manager/modules/request" "jannex/robot-control-manager/modules/structs" "jannex/robot-control-manager/modules/utils" "time" "git.ex.umbach.dev/Alex/roese-utils/rslogger" "github.com/gofiber/fiber/v2" "github.com/rs/zerolog/log" ) func RobotPingHandler() { ticker := time.NewTicker(utils.RobotPingHandlerInterval * time.Second) for range ticker.C { for _, robot := range cache.GetRobots() { if robot.Status == utils.RobotStatusOffline { continue } err := request.Request(fiber.MethodGet, robot.Address+":5000/api/v1/ping", nil) if err != nil { logger.AddSystemLog(rslogger.LogTypeWarning, "Robot ping request failed: %v", err) logger.AddRobotLog(rslogger.LogTypeWarning, robot.Type, robot.Id, "Robot ping request failed: %v", err) if robot.PingRetries == utils.RobotPingRetries { UpdateRobotStatus(robot, utils.RobotStatusOffline) logger.AddSystemLog(rslogger.LogTypeWarning, "Robot %s marked as offline because %v attempts have already been made to reach it", robot.Name, utils.RobotPingRetries) logger.AddRobotLog(rslogger.LogTypeWarning, robot.Type, robot.Id, "Robot marked as offline because %v attempts have already been made to reach it", utils.RobotPingRetries) continue } robot.PingRetries++ logger.AddSystemLog(rslogger.LogTypeWarning, "Trying to ping robot %s for %v times failed", robot.Name, robot.PingRetries) logger.AddRobotLog(rslogger.LogTypeWarning, robot.Type, robot.Id, "Trying to ping robot for %v times failed", robot.PingRetries) if robot.Status != utils.RobotStatusConnecting { UpdateRobotStatus(robot, utils.RobotStatusConnecting) } continue } if robot.Status == utils.RobotStatusConnecting { if robot.CurrentJobId != "" { UpdateRobotStatus(robot, utils.RobotStatusWaiting) } else { UpdateRobotStatus(robot, utils.RobotStatusIdle) } robot.ConnectedAt = time.Now() BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdRobotUpdated, Body: struct { RobotId string ConnectedAt time.Time }{ RobotId: robot.Id, ConnectedAt: robot.ConnectedAt, }, }) logger.AddRobotLog(rslogger.LogTypeInfo, robot.Type, robot.Id, "Robot connection established by ping") } if robot.PingRetries != 0 { robot.PingRetries = 0 } } } } // This function will free up the job of a robot if the last task // of a job is older than the given duration (job not finished by requester) func RobotFreeUpJobHandler() { ticker := time.NewTicker(utils.RobotFreeUpJobInterval) for range ticker.C { for _, robot := range cache.GetRobots() { if robot.Status == utils.RobotStatusOffline || robot.CurrentJobId == "" || robot.LastTaskAt.IsZero() || time.Since(robot.LastTaskAt) < utils.RobotFreeUpJobInterval || robot.Status == utils.RobotStatusError && robot.Status != utils.RobotStatusWaiting { continue } logger.AddSystemLog(rslogger.LogTypeError, "Robot %s job with id %s and name %s freed up because it was not correctly finished", robot.Name, robot.CurrentJobId, robot.CurrentJobName) logger.AddRobotLog(rslogger.LogTypeError, robot.Type, robot.Id, "Job with id %s and name %s freed up because it was not correctly finished", robot.CurrentJobId, robot.CurrentJobName) FreeUpJob(robot, utils.RobotStatusError) } } } func FreeUpJob(robot *structs.Robot, newStatus uint8) { UpdateRobotCurrentJob(robot, "", "") UpdateRobotStatus(robot, newStatus) CountDownJobsWaiting(robot) } func LoadRobotsFromDatabase() { var robots []structs.Robot database.DB.Find(&robots) cache.AddRobots(robots) for _, robot := range cache.GetRobots() { if robot.Status == utils.RobotStatusOffline { continue } UpdateRobotStatus(robot, utils.RobotStatusConnecting) } } func UpdateRobotStatus(robot *structs.Robot, status uint8) { robot.Status = status database.DB.Model(&structs.Robot{}). Where("id = ?", robot.Id). Update("status", status) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotStatus, Body: struct { RobotId string Status uint8 }{ RobotId: robot.Id, Status: status, }, }) logger.AddRobotLog(rslogger.LogTypeInfo, robot.Type, robot.Id, "Robot status updated to %s", utils.GetRobotStatusString(status)) } func ProcessJobTask(r *structs.Robot, jobId string, jobName string) { if r.CurrentJobId == "" { UpdateRobotCurrentJob(r, jobId, jobName) } if r.CurrentJobId != "" && r.CurrentJobId != jobId { CountUpJobsWaiting(r, jobId, jobName) } for r.CurrentJobId != "" && r.CurrentJobId != jobId && r.Status != utils.RobotStatusWaiting && r.Status != utils.RobotStatusError { // wait for current job to finish if r.Status == utils.RobotStatusOffline { logger.AddRobotLog(rslogger.LogTypeError, r.Type, r.Id, "Job with id %s and name %s canceled", jobId, jobName) return } fmt.Println("job is processing", r.CurrentJobId, jobId, r.JobsWaitingCount) time.Sleep(2 * time.Second) } RemoveJobFromJobsWaitingList(r, jobId) UpdateRobotCurrentJob(r, jobId, jobName) fmt.Println("processing job", jobId) } func UpdateRobotCurrentJob(r *structs.Robot, jobId string, jobName string) { r.JobMutex.Lock() defer r.JobMutex.Unlock() /* if r.CurrentJobId == jobId { log.Info().Msgf("Robot %s current job id already set to %s", r.Name, jobId) return } */ r.CurrentJobId = jobId r.CurrentJobName = jobName database.DB.Model(&structs.Robot{}). Where("id = ?", r.Id). Updates(map[string]interface{}{"current_job_id": jobId, "current_job_name": jobName}) log.Info().Msgf("Robot %s current job id updated to %s", r.Name, jobId) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotCurrentJob, Body: struct { RobotId string JobName string JobsWaitingNameList []string }{ RobotId: r.Id, JobName: jobName, JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), }, }) } func CountUpJobsWaiting(r *structs.Robot, jobId string, jobName string) { r.JobMutex.Lock() defer r.JobMutex.Unlock() r.JobsWaitingCount++ r.JobsWaitingNameList = append(r.JobsWaitingNameList, structs.JobWaitingName{ JobId: jobId, Name: jobName, }) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount, Body: struct { RobotId string JobsWaitingCount int JobsWaitingNameList []string }{ RobotId: r.Id, JobsWaitingCount: r.JobsWaitingCount, JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), }, }) } func RemoveJobFromJobsWaitingList(r *structs.Robot, jobId string) { for i, j := range r.JobsWaitingNameList { if j.JobId == jobId { r.JobsWaitingNameList = append(r.JobsWaitingNameList[:i], r.JobsWaitingNameList[i+1:]...) break } } } func CountDownJobsWaiting(r *structs.Robot) { r.JobMutex.Lock() defer r.JobMutex.Unlock() if r.JobsWaitingCount > 0 { r.JobsWaitingCount-- } BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount, Body: struct { RobotId string JobsWaitingCount int JobsWaitingNameList []string }{ RobotId: r.Id, JobsWaitingCount: r.JobsWaitingCount, JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), }, }) } func BroadcastSSEMessage(message structs.SSEMessage) { marshaledMessage, err := json.Marshal(message) if err != nil { log.Error().Msgf("Error marshaling SSE message: %v", err) return } for clientId, sseClient := range cache.GetSSEClients() { sseClient.MessageChannel <- structs.SSEClientChannelMessage{ ClientId: clientId, Message: marshaledMessage, } } } var PermitJoinTimer *time.Timer func PermitJoinAutoDisableHandler() { if PermitJoinTimer != nil { PermitJoinTimer.Stop() } PermitJoinTimer = time.NewTimer(utils.RobotsPermitJoinAutoDisable * time.Second) <-PermitJoinTimer.C cache.SetPermitJoin(false) BroadcastSSEMessage(structs.SSEMessage{ Cmd: utils.SSESentCmdPermitJoinUpdated, Body: 0, }) logger.AddSystemLog(rslogger.LogTypeInfo, "Permit join disabled by timer") }