From 6f9831191623e589e57cde2563f3d868cd491486 Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 12 Oct 2023 22:20:41 +0200 Subject: [PATCH] robot handling --- main.go | 9 ++- modules/cache/robots.go | 9 +++ modules/logger/logger.go | 4 -- modules/request/request.go | 45 +++++++++++++ modules/robot/robot.go | 83 ++++++++++++++++++++++++ modules/structs/robot.go | 80 +++++++++++++++++++---- modules/utils/globals.go | 35 ++++++++-- routers/api/v1/control/control.go | 85 ++++++------------------- routers/api/v1/permitjoin/permitjoin.go | 7 -- routers/api/v1/robot/robot.go | 19 +++--- routers/router/router.go | 4 +- testclient/testrobot.py | 7 +- 12 files changed, 278 insertions(+), 109 deletions(-) create mode 100644 modules/request/request.go create mode 100644 modules/robot/robot.go diff --git a/main.go b/main.go index c785335..9ea161d 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "jannex/robot-control-manager/modules/cache" "jannex/robot-control-manager/modules/config" "jannex/robot-control-manager/modules/database" + "jannex/robot-control-manager/modules/robot" "jannex/robot-control-manager/modules/utils" "jannex/robot-control-manager/routers/router" "os" @@ -47,7 +48,7 @@ MARIADB_PASSWORD=db_password MARIADB_DATABASE_NAME=db_database_name`) config.LoadConfig() - rslogger.InitLogger(config.Cfg.Debug, config.Cfg.ColorizedOutput, config.Cfg.LogManagerServerUrl) + rslogger.InitLogger(false, config.Cfg.ColorizedOutput, config.Cfg.LogManagerServerUrl) if os.Getenv("DOCKER") != "" { fmt.Println("Waiting for mariadb docker") @@ -74,6 +75,12 @@ func main() { // TODO: Remove this cache.SetPermitJoin(true) + robot.LoadRobotsFromDatabase() + + fmt.Println("loaded robots", cache.GetAllRobots()) + + go robot.RobotPingHandler() + rcmlogger.AddSystemLog("Server started") app.Listen(config.Cfg.Host + ":" + config.Cfg.Port) diff --git a/modules/cache/robots.go b/modules/cache/robots.go index 78e3ed3..18454b0 100644 --- a/modules/cache/robots.go +++ b/modules/cache/robots.go @@ -15,6 +15,15 @@ func AddRobot(robot *structs.Robot) { robots[robot.Id] = robot } +func AddRobots(newRobots []structs.Robot) { + rMu.Lock() + defer rMu.Unlock() + + for _, r := range newRobots { + robots[r.Id] = &r + } +} + func GetRobots() map[string]*structs.Robot { rMu.RLock() defer rMu.RUnlock() diff --git a/modules/logger/logger.go b/modules/logger/logger.go index b9eac45..417e38a 100644 --- a/modules/logger/logger.go +++ b/modules/logger/logger.go @@ -2,16 +2,12 @@ package logger import ( "fmt" - "jannex/robot-control-manager/modules/config" "git.ex.umbach.dev/Alex/roese-utils/rslogger" "github.com/gofiber/fiber/v2" - "github.com/rs/zerolog/log" ) func AddSystemLog(format string, v ...any) { - log.Info().Msgf("serverurl %v", config.Cfg.LogManagerServerUrl) - go rslogger.LogManagerRequestClient(fiber.MethodPost, rslogger.LogManagerRequestBody{ Type: "robot-control-manager", Logs: []string{"I " + rslogger.GetTime() + fmt.Sprintf(format, v...)}}) diff --git a/modules/request/request.go b/modules/request/request.go new file mode 100644 index 0000000..764da83 --- /dev/null +++ b/modules/request/request.go @@ -0,0 +1,45 @@ +package request + +import ( + "encoding/json" + + "github.com/gofiber/fiber/v2" + "github.com/rs/zerolog/log" +) + +func Request(requestMethod string, url string, body any) error { + a := fiber.AcquireAgent() + + req := a.Request() + req.Header.SetMethod(requestMethod) + + addr := "http://" + url + + req.SetRequestURI(addr) + req.Header.SetContentType("application/json") + + reqestBodyBytes, err := json.Marshal(body) + + if err != nil { + log.Error().Msgf("Failed to marshal request body, err: %s", err) + return err + } + + req.SetBody(reqestBodyBytes) + + if err := a.Parse(); err != nil { + log.Error().Msgf("Failed to parse request, err: %s", err) + return err + } + + code, body, errB := a.Bytes() + + if len(errB) != 0 { + log.Error().Msgf("Failed to parse bytes %v", errB) + return errB[0] + } + + log.Info().Msgf("code %v body %v", code, body) + + return nil +} diff --git a/modules/robot/robot.go b/modules/robot/robot.go new file mode 100644 index 0000000..64f7830 --- /dev/null +++ b/modules/robot/robot.go @@ -0,0 +1,83 @@ +package robot + +import ( + "jannex/robot-control-manager/modules/cache" + "jannex/robot-control-manager/modules/database" + "jannex/robot-control-manager/modules/logger" + "jannex/robot-control-manager/modules/request" + "jannex/robot-control-manager/modules/structs" + "jannex/robot-control-manager/modules/utils" + "time" + + "github.com/gofiber/fiber/v2" +) + +func RobotPingHandler() { + ticker := time.NewTicker(utils.RobotPingHandlerInterval * time.Second) + + for range ticker.C { + for _, robot := range cache.GetRobots() { + if robot.Status == utils.RobotStatusOffline { + continue + } + + err := request.Request(fiber.MethodGet, robot.Address+":5000/api/v1/ping", nil) + + if err != nil { + logger.AddSystemLog("Robot ping request failed: %v", err) + + if robot.PingRetries == utils.RobotPingRetries { + UpdateRobotStatus(robot, utils.RobotStatusOffline) + + logger.AddSystemLog("Robot %s marked as offline because %v attempts have already been made to reach it", robot.Name, utils.RobotPingRetries) + + cache.RemoveRobotById(robot.Id) + continue + } + + robot.PingRetries++ + + logger.AddSystemLog("Trying to ping robot %s for %v times failed", robot.Name, robot.PingRetries) + + if robot.Status != utils.RobotStatusConnecting { + UpdateRobotStatus(robot, utils.RobotStatusConnecting) + } + continue + } + + if robot.Status == utils.RobotStatusConnecting { + UpdateRobotStatus(robot, utils.RobotStatusIdle) + } + + if robot.PingRetries != 0 { + robot.PingRetries = 0 + } + } + } +} + +func LoadRobotsFromDatabase() { + var robots []structs.Robot + + database.DB.Find(&robots) + + cache.AddRobots(robots) + + for _, robot := range cache.GetRobots() { + if robot.Status == utils.RobotStatusOffline { + continue + } + + UpdateRobotStatus(robot, utils.RobotStatusConnecting) + } +} + +func UpdateRobotStatus(robot *structs.Robot, status uint8) { + robot.Status = status + + database.DB.Model(&structs.Robot{}). + Where("id = ?", robot.Id). + Update("status", status) + + logger.AddSystemLog("Robot %s status changed to %s", robot.Name, utils.GetRobotStatusString(status)) +} diff --git a/modules/structs/robot.go b/modules/structs/robot.go index 5f9801f..05e9545 100644 --- a/modules/structs/robot.go +++ b/modules/structs/robot.go @@ -1,18 +1,71 @@ package structs -import "time" +import ( + "fmt" + "sync" + "time" +) type Robot struct { - Id string - Type uint8 - Name string - Status uint8 - Address string `gorm:"-"` - CurrentJobId string - CurrentTask string - LastTaskAt time.Time `gorm:"-"` - ConnectedAt time.Time `gorm:"-"` - CreatedAt time.Time + Id string + Type uint8 + Name string + Status uint8 + PingRetries uint8 `gorm:"-"` + Address string + CurrentJobId string + CurrentJobName string + JobMutex sync.Mutex `gorm:"-"` + JobsWaitingCount int `gorm:"-"` + JobsWaitingNameList []string `gorm:"-"` + LastTaskAt time.Time `gorm:"-"` + ConnectedAt time.Time `gorm:"-"` + CreatedAt time.Time +} + +func (r *Robot) CountUpJobsWaiting() { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + r.JobsWaitingCount++ +} + +func (r *Robot) CountDownJobsWaiting() { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + if r.JobsWaitingCount > 0 { + r.JobsWaitingCount-- + } +} + +func (r *Robot) ProcessJobTask(jobId string) { + fmt.Println("wait for job", jobId) + + if r.CurrentJobId == "" { + r.SetCurrentJobId(jobId) + } + + if r.CurrentJobId != "" && r.CurrentJobId != jobId { + r.CountUpJobsWaiting() + } + + for r.CurrentJobId != "" && r.CurrentJobId != jobId { + // wait for current job to finish + fmt.Println("job is processing", r.CurrentJobId, jobId, r.JobsWaitingCount) + time.Sleep(2 * time.Second) + } + + r.SetCurrentJobId(jobId) + + fmt.Println("processing job", jobId) +} + +func (r *Robot) SetCurrentJobId(jobId string) { + r.JobMutex.Lock() + defer r.JobMutex.Unlock() + + r.CurrentJobId = jobId } type UnauthorizedRobot struct { @@ -53,3 +106,8 @@ type RobotIdParams struct { type RobotNameParams struct { RobotName string } + +type RobotFinishBody struct { + RobotName string + JobId string +} diff --git a/modules/utils/globals.go b/modules/utils/globals.go index 9b1e6a7..21734fc 100644 --- a/modules/utils/globals.go +++ b/modules/utils/globals.go @@ -1,15 +1,21 @@ package utils const ( - RobotStatusIdle uint8 = iota - RobotStatusRunning - RobotStatusError - RobotStatusOffline + RobotPingRetries = 3 + RobotPingHandlerInterval = 5 // seconds ) const ( - RobotTypeRex uint8 = iota - RobotTypeYeet + RobotStatusIdle = 1 + RobotStatusRunning = 2 + RobotStatusConnecting = 3 + RobotStatusError = 4 + RobotStatusOffline = 5 +) + +const ( + RobotTypeRex = 1 + RobotTypeYeet = 2 ) func GetRobotTypeString(t uint8) string { @@ -22,3 +28,20 @@ func GetRobotTypeString(t uint8) string { return "unknown" } } + +func GetRobotStatusString(s uint8) string { + switch s { + case RobotStatusIdle: + return "idle" + case RobotStatusRunning: + return "running" + case RobotStatusConnecting: + return "connecting" + case RobotStatusError: + return "error" + case RobotStatusOffline: + return "offline" + default: + return "unknown" + } +} diff --git a/routers/api/v1/control/control.go b/routers/api/v1/control/control.go index c29f9a2..719f7d5 100644 --- a/routers/api/v1/control/control.go +++ b/routers/api/v1/control/control.go @@ -1,18 +1,16 @@ package control import ( - "encoding/json" - "fmt" "jannex/robot-control-manager/modules/cache" + "jannex/robot-control-manager/modules/request" "jannex/robot-control-manager/modules/structs" "git.ex.umbach.dev/Alex/roese-utils/rsutils" "github.com/gofiber/fiber/v2" - "github.com/rs/zerolog/log" ) func ControlRex(c *fiber.Ctx) error { - // swagger:operation POST /control/0 control controlRex + // swagger:operation POST /control/1 control controlRex // --- // summary: Control Rex. // description: | @@ -36,77 +34,36 @@ func ControlRex(c *fiber.Ctx) error { // "422": // description: Robot not found - log.Info().Msgf("body %v", string(c.Body())) - var body structs.ControlBody if err := rsutils.BodyParserHelper(c, &body); err != nil { return c.SendStatus(fiber.StatusBadRequest) } - log.Info().Msgf("ControlRex: %v", body) - - if body.Task.Y == nil { - fmt.Println("y not sent") - } else { - fmt.Printf("y-value: %d\n", *body.Task.Y) - } + /* + if body.Task.Y == nil { + fmt.Println("y not sent") + } else { + fmt.Printf("y-value: %d\n", *body.Task.Y) + } */ robot := cache.GetRobotByName(body.RobotName) - if robot.Id == "" { + if robot == nil { return c.SendStatus(fiber.StatusUnprocessableEntity) } - log.Info().Msgf("robot %v", robot) + robot.ProcessJobTask(body.JobId) - // wait until the robot finished the job - - for robot.CurrentJobId != "" { - robot.CurrentJobId = body.JobId - log.Debug().Msgf("robot is free %v", body.JobId) - - controlRexRequest(robot.Address, body) - break + if err := request.Request(fiber.MethodPost, robot.Address+":5000/api/v1/control", body.Task); err != nil { + return c.JSON(structs.StatusResponse{Status: "err"}) } - return c.SendStatus(fiber.StatusOK) -} - -func controlRexRequest(address string, body any) { - a := fiber.AcquireAgent() - - // TODO: port only for testing - remove it - url := "http://" + address + ":5000" + "/api/v1/control" - - req := a.Request() - req.Header.SetMethod(fiber.MethodPost) - req.SetRequestURI(url) - req.Header.SetContentType("application/json") - - log.Info().Msgf("url %s", url) - - reqestBodyBytes, err := json.Marshal(body) - - if err != nil { - log.Error().Msgf("Failed to marshal request body, err: %s", err) - return - } - - req.SetBody(reqestBodyBytes) - - if err := a.Parse(); err != nil { - log.Error().Msgf("Failed to parse request, err: %s", err) - return - } - - code, body, _ := a.Bytes() - - log.Info().Msgf("code %v body %v", code, body) + return c.JSON(structs.StatusResponse{Status: "ok"}) } func FinishControlRex(c *fiber.Ctx) error { - // swagger:operation POST /control/0/finish/{robotName} control finishControlRex + // swagger:operation POST /control/1/finish control finishControlRex // --- // summary: Finish control Rex. // description: | @@ -123,23 +80,19 @@ func FinishControlRex(c *fiber.Ctx) error { // "422": // description: Robot not found - var params structs.RobotNameParams + var body structs.RobotFinishBody - if err := rsutils.ParamsParserHelper(c, ¶ms); err != nil { + if err := rsutils.BodyParserHelper(c, &body); err != nil { return c.SendStatus(fiber.StatusBadRequest) } - log.Debug().Msgf("before finish control robot %v", cache.GetRobotByName(params.RobotName).CurrentJobId) + robot := cache.GetRobotByName(body.RobotName) - robot := cache.GetRobotByName(params.RobotName) - - if robot.Id == "" { + if robot == nil || robot.CurrentJobId != body.JobId { return c.SendStatus(fiber.StatusUnprocessableEntity) } - robot.CurrentJobId = "" - - log.Debug().Msgf("finish control robot %v", cache.GetRobotByName(params.RobotName).CurrentJobId) + robot.SetCurrentJobId("") return c.SendStatus(fiber.StatusOK) } diff --git a/routers/api/v1/permitjoin/permitjoin.go b/routers/api/v1/permitjoin/permitjoin.go index 0ffa04b..70f5d61 100644 --- a/routers/api/v1/permitjoin/permitjoin.go +++ b/routers/api/v1/permitjoin/permitjoin.go @@ -7,7 +7,6 @@ import ( "git.ex.umbach.dev/Alex/roese-utils/rsutils" "github.com/gofiber/fiber/v2" - "github.com/rs/zerolog/log" ) func SetPermitJoin(c *fiber.Ctx) error { @@ -38,15 +37,9 @@ func SetPermitJoin(c *fiber.Ctx) error { if params.Enabled == 0 { cache.SetPermitJoin(false) - - log.Debug().Msgf("Permit join disabled") - logger.AddSystemLog("Permit join disabled") } else { cache.SetPermitJoin(true) - - log.Debug().Msgf("Permit join enabled") - logger.AddSystemLog("Permit join enabled") } diff --git a/routers/api/v1/robot/robot.go b/routers/api/v1/robot/robot.go index 62655d9..3e65a4a 100644 --- a/routers/api/v1/robot/robot.go +++ b/routers/api/v1/robot/robot.go @@ -11,7 +11,6 @@ import ( "git.ex.umbach.dev/Alex/roese-utils/rsutils" "github.com/gofiber/fiber/v2" "github.com/google/uuid" - "github.com/rs/zerolog/log" ) func FirstRequest(c *fiber.Ctx) error { @@ -52,8 +51,6 @@ func FirstRequest(c *fiber.Ctx) error { database.DB.First(&foundRobot, "id = ?", body.Id) if !cache.IsPermitJoinEnabled() && foundRobot.Id == "" { - log.Info().Msgf("Permit join is enabled") - logger.AddSystemLog("Unauthorized robot tried to connect with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type)) return c.SendStatus(fiber.StatusForbidden) @@ -72,8 +69,6 @@ func FirstRequest(c *fiber.Ctx) error { cache.AddUnauthorizedRobot(&newUnauthorizedRobot) - log.Debug().Msgf("Added unauthorized robot %s (%v)", body.Id, body.Type) - logger.AddSystemLog("Unauthorized robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type)) // TODO: send robot to sse @@ -85,22 +80,24 @@ func FirstRequest(c *fiber.Ctx) error { Status: utils.RobotStatusIdle, Address: c.IP(), CurrentJobId: "", - CurrentTask: "", ConnectedAt: time.Now(), CreatedAt: foundRobot.CreatedAt, } cache.AddRobot(&newRobot) - log.Debug().Msgf("Added robot %s (%v)", body.Id, body.Type) + database.DB.Model(&structs.Robot{}). + Where("id = ?", newRobot.Id). + Updates(structs.Robot{ + Status: newRobot.Status, + Address: newRobot.Address, + }) logger.AddSystemLog("Robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type)) // TODO: send robot to sse } - log.Debug().Msgf("robots %v", cache.GetUnauthorizedRobots()) - return c.JSON(structs.StatusResponse{Status: "ok"}) } @@ -134,6 +131,8 @@ func AuthorizeRobot(c *fiber.Ctx) error { } if !cache.IsUnauthorizedRobotInList(params.RobotId) { + logger.AddSystemLog("Unauthorized robot with id %v not found", params.RobotId) + return c.SendStatus(fiber.StatusUnprocessableEntity) } @@ -158,8 +157,6 @@ func AuthorizeRobot(c *fiber.Ctx) error { cache.RemoveUnauthorizedRobotById(params.RobotId) - log.Info().Msgf("Authorize robot %s", params.RobotId) - logger.AddSystemLog("Robot authorized with id %v and type %v", params.RobotId, utils.GetRobotTypeString(unauthorizedRobot.Type)) return c.SendStatus(fiber.StatusOK) diff --git a/routers/router/router.go b/routers/router/router.go index 72d0ca0..8bfaed3 100644 --- a/routers/router/router.go +++ b/routers/router/router.go @@ -22,8 +22,8 @@ func SetupRoutes(app *fiber.App) { rs.Get("/", robots.GetRobots) c := v1.Group("/control") - c.Post("/0", control.ControlRex) - c.Post("/0/finish/:robotName", control.FinishControlRex) + c.Post("/1", control.ControlRex) + c.Post("/1/finish", control.FinishControlRex) pj := v1.Group("/permitjoin") pj.Post("/:enabled", permitjoin.SetPermitJoin) diff --git a/testclient/testrobot.py b/testclient/testrobot.py index 14fdca5..59612be 100644 --- a/testclient/testrobot.py +++ b/testclient/testrobot.py @@ -16,7 +16,7 @@ class RexRobot: print("connecting with robot server") res = requests.api.post(robot_control_server_url + "/robot", - json={'id': self.id, 'type': 0, 'version': self.version}) + json={'id': self.id, 'type': 1, 'version': self.version}) if res.status_code == 403: print("permit join disabled") @@ -42,5 +42,10 @@ def hello(): return jsonify({'status': 'ok'}) +@app.route('/api/v1/ping', methods=['GET']) +def ping(): + print("pong") + return jsonify({'status': 'ok'}) + if __name__ == '__main__': app.run(debug=False) \ No newline at end of file