robot-control-manager/modules/robot/robot.go

239 lines
5.7 KiB
Go

package robot
import (
"encoding/json"
"fmt"
"jannex/robot-control-manager/modules/cache"
"jannex/robot-control-manager/modules/database"
"jannex/robot-control-manager/modules/logger"
"jannex/robot-control-manager/modules/request"
"jannex/robot-control-manager/modules/structs"
"jannex/robot-control-manager/modules/utils"
"time"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)
func RobotPingHandler() {
ticker := time.NewTicker(utils.RobotPingHandlerInterval * time.Second)
for range ticker.C {
for _, robot := range cache.GetRobots() {
if robot.Status == utils.RobotStatusOffline {
continue
}
err := request.Request(fiber.MethodGet, robot.Address+":5000/api/v1/ping", nil)
if err != nil {
logger.AddSystemLog("Robot ping request failed: %v", err)
if robot.PingRetries == utils.RobotPingRetries {
UpdateRobotStatus(robot, utils.RobotStatusOffline)
logger.AddSystemLog("Robot %s marked as offline because %v attempts have already been made to reach it", robot.Name, utils.RobotPingRetries)
continue
}
robot.PingRetries++
logger.AddSystemLog("Trying to ping robot %s for %v times failed", robot.Name, robot.PingRetries)
if robot.Status != utils.RobotStatusConnecting {
UpdateRobotStatus(robot, utils.RobotStatusConnecting)
}
continue
}
if robot.Status == utils.RobotStatusConnecting {
UpdateRobotStatus(robot, utils.RobotStatusIdle)
}
if robot.PingRetries != 0 {
robot.PingRetries = 0
}
}
}
}
func LoadRobotsFromDatabase() {
var robots []structs.Robot
database.DB.Find(&robots)
cache.AddRobots(robots)
for _, robot := range cache.GetRobots() {
if robot.Status == utils.RobotStatusOffline {
continue
}
UpdateRobotStatus(robot, utils.RobotStatusConnecting)
}
}
func UpdateRobotStatus(robot *structs.Robot, status uint8) {
robot.Status = status
database.DB.Model(&structs.Robot{}).
Where("id = ?", robot.Id).
Update("status", status)
BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdUpdateRobotStatus,
Body: struct {
RobotId string
Status uint8
}{
RobotId: robot.Id,
Status: status,
},
})
}
func ProcessJobTask(r *structs.Robot, jobId string, jobName string) {
if r.CurrentJobId == "" {
UpdateRobotCurrentJob(r, jobId, jobName)
}
if r.CurrentJobId != "" && r.CurrentJobId != jobId {
CountUpJobsWaiting(r, jobId, jobName)
}
for r.CurrentJobId != "" && r.CurrentJobId != jobId {
// wait for current job to finish
fmt.Println("job is processing", r.CurrentJobId, jobId, r.JobsWaitingCount)
time.Sleep(2 * time.Second)
}
RemoveJobFromJobsWaitingList(r, jobId)
UpdateRobotCurrentJob(r, jobId, jobName)
fmt.Println("processing job", jobId)
}
func UpdateRobotCurrentJob(r *structs.Robot, jobId string, jobName string) {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
if r.CurrentJobId == jobId {
return
}
r.CurrentJobId = jobId
r.CurrentJobName = jobName
database.DB.Model(&structs.Robot{}).
Where("id = ?", r.Id).
Updates(map[string]interface{}{"current_job_id": jobId, "current_job_name": jobName})
log.Info().Msgf("Robot %s current job id updated to %s", r.Name, jobId)
BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdUpdateRobotCurrentJob,
Body: struct {
RobotId string
JobName string
JobsWaitingNameList []string
}{
RobotId: r.Id,
JobName: jobName,
JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList),
},
})
}
func CountUpJobsWaiting(r *structs.Robot, jobId string, jobName string) {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
r.JobsWaitingCount++
r.JobsWaitingNameList = append(r.JobsWaitingNameList, structs.JobWaitingName{
JobId: jobId,
Name: jobName,
})
BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount,
Body: struct {
RobotId string
JobsWaitingCount int
JobsWaitingNameList []string
}{
RobotId: r.Id,
JobsWaitingCount: r.JobsWaitingCount,
JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList),
},
})
}
func RemoveJobFromJobsWaitingList(r *structs.Robot, jobId string) {
for i, j := range r.JobsWaitingNameList {
if j.JobId == jobId {
r.JobsWaitingNameList = append(r.JobsWaitingNameList[:i], r.JobsWaitingNameList[i+1:]...)
break
}
}
}
func CountDownJobsWaiting(r *structs.Robot, jobId string) {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
if r.JobsWaitingCount > 0 {
r.JobsWaitingCount--
}
BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount,
Body: struct {
RobotId string
JobsWaitingCount int
JobsWaitingNameList []string
}{
RobotId: r.Id,
JobsWaitingCount: r.JobsWaitingCount,
JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList),
},
})
}
func BroadcastSSEMessage(message structs.SSEMessage) {
marshaledMessage, err := json.Marshal(message)
if err != nil {
log.Error().Msgf("Error marshaling SSE message: %v", err)
return
}
for clientId, sseClient := range cache.GetSSEClients() {
sseClient.MessageChannel <- structs.SSEClientChannelMessage{
ClientId: clientId,
Message: marshaledMessage,
}
}
}
/*
func AddJobNameToJobsWaitingList(r *structs.Robot, jobName string) {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
r.JobsWaitingNameList = append(r.JobsWaitingNameList, jobName)
}
func RemoveJobNameFromJobsWaitingList(r *structs.Robot, jobName string) {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
for i, j := range r.JobsWaitingNameList {
if j == jobName {
r.JobsWaitingNameList = append(r.JobsWaitingNameList[:i], r.JobsWaitingNameList[i+1:]...)
break
}
}
}
*/