diff --git a/modules/cache/robots.go b/modules/cache/robots.go index eb74d42..13bf816 100644 --- a/modules/cache/robots.go +++ b/modules/cache/robots.go @@ -48,10 +48,9 @@ func GetAllRobots(query rspagination.PageQuery) structs.RobotsResponse { Name: v.Name, Status: v.Status, Address: v.Address, - CurrentJobId: v.CurrentJobId, CurrentJobName: v.CurrentJobName, JobsWaitingCount: v.JobsWaitingCount, - JobsWaitingNameList: v.JobsWaitingNameList, + JobsWaitingNameList: utils.GetJobsWaitingNameList(v.JobsWaitingNameList), FirmwareVersion: v.FirmwareVersion, ConnectedAt: v.ConnectedAt, CreatedAt: v.CreatedAt, diff --git a/modules/robot/robot.go b/modules/robot/robot.go index 21d051c..d203805 100644 --- a/modules/robot/robot.go +++ b/modules/robot/robot.go @@ -2,6 +2,7 @@ package robot import ( "encoding/json" + "fmt" "jannex/robot-control-manager/modules/cache" "jannex/robot-control-manager/modules/database" "jannex/robot-control-manager/modules/logger" @@ -91,6 +92,114 @@ func UpdateRobotStatus(robot *structs.Robot, status uint8) { }) } +func ProcessJobTask(r *structs.Robot, jobId string, jobName string) { + if r.CurrentJobId == "" { + UpdateRobotCurrentJob(r, jobId, jobName) + } + + if r.CurrentJobId != "" && r.CurrentJobId != jobId { + CountUpJobsWaiting(r, jobId, jobName) + } + + for r.CurrentJobId != "" && r.CurrentJobId != jobId { + // wait for current job to finish + fmt.Println("job is processing", r.CurrentJobId, jobId, r.JobsWaitingCount) + time.Sleep(2 * time.Second) + } + + RemoveJobFromJobsWaitingList(r, jobId) + UpdateRobotCurrentJob(r, jobId, jobName) + + fmt.Println("processing job", jobId) +} + +func UpdateRobotCurrentJob(r *structs.Robot, jobId string, jobName string) { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + if r.CurrentJobId == jobId { + return + } + + r.CurrentJobId = jobId + r.CurrentJobName = jobName + + database.DB.Model(&structs.Robot{}). + Where("id = ?", r.Id). + Updates(map[string]interface{}{"current_job_id": jobId, "current_job_name": jobName}) + + log.Info().Msgf("Robot %s current job id updated to %s", r.Name, jobId) + + BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdUpdateRobotCurrentJob, + Body: struct { + RobotId string + JobName string + JobsWaitingNameList []string + }{ + RobotId: r.Id, + JobName: jobName, + JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), + }, + }) +} + +func CountUpJobsWaiting(r *structs.Robot, jobId string, jobName string) { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + r.JobsWaitingCount++ + + r.JobsWaitingNameList = append(r.JobsWaitingNameList, structs.JobWaitingName{ + JobId: jobId, + Name: jobName, + }) + + BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount, + Body: struct { + RobotId string + JobsWaitingCount int + JobsWaitingNameList []string + }{ + RobotId: r.Id, + JobsWaitingCount: r.JobsWaitingCount, + JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), + }, + }) +} + +func RemoveJobFromJobsWaitingList(r *structs.Robot, jobId string) { + for i, j := range r.JobsWaitingNameList { + if j.JobId == jobId { + r.JobsWaitingNameList = append(r.JobsWaitingNameList[:i], r.JobsWaitingNameList[i+1:]...) + break + } + } +} + +func CountDownJobsWaiting(r *structs.Robot, jobId string) { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + if r.JobsWaitingCount > 0 { + r.JobsWaitingCount-- + } + + BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdUpdateRobotJobsWaitingCount, + Body: struct { + RobotId string + JobsWaitingCount int + JobsWaitingNameList []string + }{ + RobotId: r.Id, + JobsWaitingCount: r.JobsWaitingCount, + JobsWaitingNameList: utils.GetJobsWaitingNameList(r.JobsWaitingNameList), + }, + }) +} + func BroadcastSSEMessage(message structs.SSEMessage) { marshaledMessage, err := json.Marshal(message) @@ -106,3 +215,24 @@ func BroadcastSSEMessage(message structs.SSEMessage) { } } } + +/* +func AddJobNameToJobsWaitingList(r *structs.Robot, jobName string) { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + r.JobsWaitingNameList = append(r.JobsWaitingNameList, jobName) +} + +func RemoveJobNameFromJobsWaitingList(r *structs.Robot, jobName string) { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + for i, j := range r.JobsWaitingNameList { + if j == jobName { + r.JobsWaitingNameList = append(r.JobsWaitingNameList[:i], r.JobsWaitingNameList[i+1:]...) + break + } + } +} +*/ diff --git a/modules/structs/robot.go b/modules/structs/robot.go index d1b8a42..333485d 100644 --- a/modules/structs/robot.go +++ b/modules/structs/robot.go @@ -1,7 +1,6 @@ package structs import ( - "fmt" "sync" "time" ) @@ -16,21 +15,25 @@ type Robot struct { FirmwareVersion string CurrentJobId string CurrentJobName string - JobMutex sync.Mutex `gorm:"-"` - JobsWaitingCount int `gorm:"-"` - JobsWaitingNameList []string `gorm:"-"` - LastTaskAt time.Time `gorm:"-"` - ConnectedAt time.Time `gorm:"-"` + JobMutex sync.Mutex `gorm:"-"` + JobsWaitingCount int `gorm:"-"` + JobsWaitingNameList []JobWaitingName `gorm:"-"` + LastTaskAt time.Time `gorm:"-"` + ConnectedAt time.Time `gorm:"-"` CreatedAt time.Time } +type JobWaitingName struct { + JobId string + Name string +} + type APIRobot struct { Id string Type uint8 Name string Status uint8 Address string - CurrentJobId string CurrentJobName string JobsWaitingCount int JobsWaitingNameList []string @@ -39,13 +42,14 @@ type APIRobot struct { CreatedAt time.Time } +/* func (r *Robot) CountUpJobsWaiting() { r.JobMutex.Lock() defer r.JobMutex.Unlock() r.JobsWaitingCount++ -} - +} */ +/* func (r *Robot) CountDownJobsWaiting() { r.JobMutex.Lock() defer r.JobMutex.Unlock() @@ -54,12 +58,13 @@ func (r *Robot) CountDownJobsWaiting() { r.JobsWaitingCount-- } } - +*/ +/* func (r *Robot) ProcessJobTask(jobId string) { fmt.Println("wait for job", jobId) if r.CurrentJobId == "" { - r.SetCurrentJobId(jobId) + robot.UpdateRobotCurrentJobId(r, jobId) } if r.CurrentJobId != "" && r.CurrentJobId != jobId { @@ -72,17 +77,18 @@ func (r *Robot) ProcessJobTask(jobId string) { time.Sleep(2 * time.Second) } - r.SetCurrentJobId(jobId) + robot.UpdateRobotCurrentJobId(r, jobId) fmt.Println("processing job", jobId) -} +} */ +/* func (r *Robot) SetCurrentJobId(jobId string) { r.JobMutex.Lock() defer r.JobMutex.Unlock() r.CurrentJobId = jobId -} +} */ type UnauthorizedRobot struct { Id string diff --git a/modules/utils/globals.go b/modules/utils/globals.go index 8da7d89..fa9efc9 100644 --- a/modules/utils/globals.go +++ b/modules/utils/globals.go @@ -24,12 +24,14 @@ const ( ) const ( - SSESentCmdUpdateRobotStatus = 1 - SSESentCmdAddUnauthorizedRobot = 2 - SSESentCmdAddRobot = 3 - SSESentCmdRemoveUnauthorizedRobot = 4 - SSESentCmdRemoveRobot = 5 - SSESentCmdRobotUpdated = 6 + SSESentCmdUpdateRobotStatus = 1 + SSESentCmdAddUnauthorizedRobot = 2 + SSESentCmdAddRobot = 3 + SSESentCmdRemoveUnauthorizedRobot = 4 + SSESentCmdRemoveRobot = 5 + SSESentCmdRobotUpdated = 6 + SSESentCmdUpdateRobotCurrentJob = 7 + SSESentCmdUpdateRobotJobsWaitingCount = 8 ) var ( diff --git a/modules/utils/utils.go b/modules/utils/utils.go index 7b0a2e2..3f1fb6c 100644 --- a/modules/utils/utils.go +++ b/modules/utils/utils.go @@ -1,5 +1,7 @@ package utils +import "jannex/robot-control-manager/modules/structs" + func GetRobotTypeString(t uint8) string { switch t { case RobotTypeRex: @@ -27,3 +29,13 @@ func GetRobotStatusString(s uint8) string { return "unknown" } } + +func GetJobsWaitingNameList(list []structs.JobWaitingName) []string { + var jobsWaitingNameList []string + + for _, j := range list { + jobsWaitingNameList = append(jobsWaitingNameList, j.Name) + } + + return jobsWaitingNameList +} diff --git a/routers/api/v1/control/control.go b/routers/api/v1/control/control.go index 719f7d5..30556d7 100644 --- a/routers/api/v1/control/control.go +++ b/routers/api/v1/control/control.go @@ -3,7 +3,9 @@ package control import ( "jannex/robot-control-manager/modules/cache" "jannex/robot-control-manager/modules/request" + "jannex/robot-control-manager/modules/robot" "jannex/robot-control-manager/modules/structs" + "jannex/robot-control-manager/modules/utils" "git.ex.umbach.dev/Alex/roese-utils/rsutils" "github.com/gofiber/fiber/v2" @@ -47,18 +49,21 @@ func ControlRex(c *fiber.Ctx) error { fmt.Printf("y-value: %d\n", *body.Task.Y) } */ - robot := cache.GetRobotByName(body.RobotName) + r := cache.GetRobotByName(body.RobotName) - if robot == nil { + if r == nil { return c.SendStatus(fiber.StatusUnprocessableEntity) } - robot.ProcessJobTask(body.JobId) + robot.ProcessJobTask(r, body.JobId, body.JobName) + robot.UpdateRobotStatus(r, utils.RobotStatusRunning) - if err := request.Request(fiber.MethodPost, robot.Address+":5000/api/v1/control", body.Task); err != nil { + if err := request.Request(fiber.MethodPost, r.Address+":5000/api/v1/control", body.Task); err != nil { return c.JSON(structs.StatusResponse{Status: "err"}) } + robot.UpdateRobotStatus(r, utils.RobotStatusIdle) + return c.JSON(structs.StatusResponse{Status: "ok"}) } @@ -86,13 +91,14 @@ func FinishControlRex(c *fiber.Ctx) error { return c.SendStatus(fiber.StatusBadRequest) } - robot := cache.GetRobotByName(body.RobotName) + r := cache.GetRobotByName(body.RobotName) - if robot == nil || robot.CurrentJobId != body.JobId { + if r == nil || r.CurrentJobId != body.JobId { return c.SendStatus(fiber.StatusUnprocessableEntity) } - robot.SetCurrentJobId("") + robot.CountDownJobsWaiting(r, body.JobId) + robot.UpdateRobotCurrentJob(r, "", "") return c.SendStatus(fiber.StatusOK) } diff --git a/testclient/testrobot.py b/testclient/testrobot.py index a7a5f5f..4c8e943 100644 --- a/testclient/testrobot.py +++ b/testclient/testrobot.py @@ -35,7 +35,7 @@ def hello(): print("controlling robot", body) - time.sleep(15) + time.sleep(3) print("robot controlled")