From e13598dfda5f7d9cfd6973d36dc770f986f64b72 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 14 Oct 2023 20:42:31 +0200 Subject: [PATCH] update robot and sse improvement --- commit_and_push.sh | 7 ++ main.go | 7 +- modules/cache/robots.go | 42 ++++++++++- modules/cache/sse.go | 32 ++++++++ modules/config/config.go | 2 + modules/request/request.go | 8 +- modules/robot/robot.go | 31 +++++++- modules/structs/robot.go | 34 +++++++-- modules/structs/sseclient.go | 17 +++++ modules/utils/globals.go | 41 ++++------ modules/utils/utils.go | 29 ++++++++ modules/utils/validator.go | 9 ++- routers/api/v1/robot/robot.go | 136 +++++++++++++++++++++++++++------- routers/api/v1/sse/sse.go | 52 +++++++++++++ routers/router/router.go | 4 + testclient/testrobot.py | 8 +- 16 files changed, 384 insertions(+), 75 deletions(-) create mode 100755 commit_and_push.sh create mode 100644 modules/cache/sse.go create mode 100644 modules/structs/sseclient.go create mode 100644 modules/utils/utils.go create mode 100644 routers/api/v1/sse/sse.go diff --git a/commit_and_push.sh b/commit_and_push.sh new file mode 100755 index 0000000..554786f --- /dev/null +++ b/commit_and_push.sh @@ -0,0 +1,7 @@ +git add * + +read -p "Commit message: " commit_message + +git commit -m "$commit_message" + +git push -u origin main \ No newline at end of file diff --git a/main.go b/main.go index 1dd86a8..a584c5f 100644 --- a/main.go +++ b/main.go @@ -75,11 +75,10 @@ func main() { // TODO: Remove this cache.SetPermitJoin(true) - robot.LoadRobotsFromDatabase() - - go robot.RobotPingHandler() - rcmlogger.AddSystemLog("Server started") + robot.LoadRobotsFromDatabase() + go robot.RobotPingHandler() + app.Listen(config.Cfg.Host + ":" + config.Cfg.Port) } diff --git a/modules/cache/robots.go b/modules/cache/robots.go index 97ff6fc..eb74d42 100644 --- a/modules/cache/robots.go +++ b/modules/cache/robots.go @@ -22,8 +22,9 @@ func AddRobots(newRobots []structs.Robot) { rMu.Lock() defer rMu.Unlock() - for _, r := range newRobots { - robots[r.Id] = &r + for i := range newRobots { + r := &newRobots[i] + robots[r.Id] = r } } @@ -38,10 +39,23 @@ func GetAllRobots(query rspagination.PageQuery) structs.RobotsResponse { rMu.RLock() defer rMu.RUnlock() - var r []structs.Robot + var r []structs.APIRobot for _, v := range robots { - r = append(r, *v) + r = append(r, structs.APIRobot{ + Id: v.Id, + Type: v.Type, + Name: v.Name, + Status: v.Status, + Address: v.Address, + CurrentJobId: v.CurrentJobId, + CurrentJobName: v.CurrentJobName, + JobsWaitingCount: v.JobsWaitingCount, + JobsWaitingNameList: v.JobsWaitingNameList, + FirmwareVersion: v.FirmwareVersion, + ConnectedAt: v.ConnectedAt, + CreatedAt: v.CreatedAt, + }) } start, end := rspagination.GetPage(len(r), query.Page, utils.RobotsPageLimit) @@ -78,9 +92,29 @@ func GetRobotByName(robotName string) *structs.Robot { return nil } +func GetRobotById(robotId string) *structs.Robot { + rMu.RLock() + defer rMu.RUnlock() + + return robots[robotId] +} + func RemoveRobotById(robotId string) { rMu.Lock() defer rMu.Unlock() delete(robots, robotId) } + +func IsRobotNameInList(robotName string) bool { + rMu.RLock() + defer rMu.RUnlock() + + for _, r := range robots { + if r.Name == robotName { + return true + } + } + + return false +} diff --git a/modules/cache/sse.go b/modules/cache/sse.go new file mode 100644 index 0000000..3cba8b1 --- /dev/null +++ b/modules/cache/sse.go @@ -0,0 +1,32 @@ +package cache + +import ( + "jannex/robot-control-manager/modules/structs" + "sync" + + "github.com/google/uuid" +) + +var sseClients = make(map[uuid.UUID]structs.SSEClient) +var sMu sync.RWMutex + +func AddSSEClient(client structs.SSEClient) { + sMu.Lock() + defer sMu.Unlock() + + sseClients[uuid.New()] = client +} + +func DeleteSSEClient(id uuid.UUID) { + sMu.Lock() + defer sMu.Unlock() + + delete(sseClients, id) +} + +func GetSSEClients() map[uuid.UUID]structs.SSEClient { + sMu.RLock() + defer sMu.RUnlock() + + return sseClients +} diff --git a/modules/config/config.go b/modules/config/config.go index 222dfb1..6627e8b 100644 --- a/modules/config/config.go +++ b/modules/config/config.go @@ -15,6 +15,7 @@ type Config struct { Host string Port string LogManagerServerUrl string + SSEServerEnabled bool MariaDB MariaDB } @@ -41,6 +42,7 @@ func LoadConfig() { Host: os.Getenv("HOST"), Port: os.Getenv("PORT"), LogManagerServerUrl: os.Getenv("LOG_MANAGER_SERVER_URL"), + SSEServerEnabled: os.Getenv("SSE_SERVER_ENABLED") == "true", MariaDB: MariaDB{ Hostname: os.Getenv("MARIADB_HOSTNAME"), Port: os.Getenv("MARIADB_PORT"), diff --git a/modules/request/request.go b/modules/request/request.go index 764da83..662b082 100644 --- a/modules/request/request.go +++ b/modules/request/request.go @@ -2,6 +2,7 @@ package request import ( "encoding/json" + "errors" "github.com/gofiber/fiber/v2" "github.com/rs/zerolog/log" @@ -32,14 +33,17 @@ func Request(requestMethod string, url string, body any) error { return err } - code, body, errB := a.Bytes() + code, _, errB := a.Bytes() if len(errB) != 0 { log.Error().Msgf("Failed to parse bytes %v", errB) return errB[0] } - log.Info().Msgf("code %v body %v", code, body) + if code != 200 { + log.Error().Msgf("Request failed with code %v", code) + return errors.New("request failed") + } return nil } diff --git a/modules/robot/robot.go b/modules/robot/robot.go index 0606197..21d051c 100644 --- a/modules/robot/robot.go +++ b/modules/robot/robot.go @@ -1,6 +1,7 @@ package robot import ( + "encoding/json" "jannex/robot-control-manager/modules/cache" "jannex/robot-control-manager/modules/database" "jannex/robot-control-manager/modules/logger" @@ -10,6 +11,7 @@ import ( "time" "github.com/gofiber/fiber/v2" + "github.com/rs/zerolog/log" ) func RobotPingHandler() { @@ -30,8 +32,6 @@ func RobotPingHandler() { UpdateRobotStatus(robot, utils.RobotStatusOffline) logger.AddSystemLog("Robot %s marked as offline because %v attempts have already been made to reach it", robot.Name, utils.RobotPingRetries) - - //cache.RemoveRobotById(robot.Id) continue } @@ -79,5 +79,30 @@ func UpdateRobotStatus(robot *structs.Robot, status uint8) { Where("id = ?", robot.Id). Update("status", status) - logger.AddSystemLog("Robot %s status changed to %s", robot.Name, utils.GetRobotStatusString(status)) + BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdUpdateRobotStatus, + Body: struct { + RobotId string + Status uint8 + }{ + RobotId: robot.Id, + Status: status, + }, + }) +} + +func BroadcastSSEMessage(message structs.SSEMessage) { + marshaledMessage, err := json.Marshal(message) + + if err != nil { + log.Error().Msgf("Error marshaling SSE message: %v", err) + return + } + + for clientId, sseClient := range cache.GetSSEClients() { + sseClient.MessageChannel <- structs.SSEClientChannelMessage{ + ClientId: clientId, + Message: marshaledMessage, + } + } } diff --git a/modules/structs/robot.go b/modules/structs/robot.go index 873f7ea..d1b8a42 100644 --- a/modules/structs/robot.go +++ b/modules/structs/robot.go @@ -13,6 +13,7 @@ type Robot struct { Status uint8 PingRetries uint8 `gorm:"-"` Address string + FirmwareVersion string CurrentJobId string CurrentJobName string JobMutex sync.Mutex `gorm:"-"` @@ -23,6 +24,21 @@ type Robot struct { CreatedAt time.Time } +type APIRobot struct { + Id string + Type uint8 + Name string + Status uint8 + Address string + CurrentJobId string + CurrentJobName string + JobsWaitingCount int + JobsWaitingNameList []string + FirmwareVersion string + ConnectedAt time.Time + CreatedAt time.Time +} + func (r *Robot) CountUpJobsWaiting() { r.JobMutex.Lock() defer r.JobMutex.Unlock() @@ -69,11 +85,12 @@ func (r *Robot) SetCurrentJobId(jobId string) { } type UnauthorizedRobot struct { - Id string - Type uint8 - Address string - ConnectedAt time.Time - CreatedAt time.Time + Id string + Type uint8 + Address string + FirmwareVersion string + ConnectedAt time.Time + CreatedAt time.Time } // swagger:model FirstRequestBody @@ -95,7 +112,7 @@ type StatusResponse struct { // swagger:model RobotsResponse type RobotsResponse struct { - Robots []Robot + Robots []APIRobot TotalPages int } @@ -117,3 +134,8 @@ type RobotFinishBody struct { RobotName string JobId string } + +type UpdateRobotBody struct { + RobotId string + Name string +} diff --git a/modules/structs/sseclient.go b/modules/structs/sseclient.go new file mode 100644 index 0000000..9ce9e1a --- /dev/null +++ b/modules/structs/sseclient.go @@ -0,0 +1,17 @@ +package structs + +import "github.com/google/uuid" + +type SSEClient struct { + MessageChannel chan SSEClientChannelMessage +} + +type SSEClientChannelMessage struct { + ClientId uuid.UUID + Message []byte +} + +type SSEMessage struct { + Cmd int + Body any +} diff --git a/modules/utils/globals.go b/modules/utils/globals.go index c6685e8..8da7d89 100644 --- a/modules/utils/globals.go +++ b/modules/utils/globals.go @@ -5,6 +5,9 @@ const ( RobotPingHandlerInterval = 5 // seconds RobotsPageLimit = 10 UnauthorizedRobotsPageLimit = 10 + + minRobotNameLength = "2" + maxRobotNameLength = "30" ) const ( @@ -20,30 +23,18 @@ const ( RobotTypeYeet = 2 ) -func GetRobotTypeString(t uint8) string { - switch t { - case RobotTypeRex: - return "rex" - case RobotTypeYeet: - return "yeet" - default: - return "unknown" - } -} +const ( + SSESentCmdUpdateRobotStatus = 1 + SSESentCmdAddUnauthorizedRobot = 2 + SSESentCmdAddRobot = 3 + SSESentCmdRemoveUnauthorizedRobot = 4 + SSESentCmdRemoveRobot = 5 + SSESentCmdRobotUpdated = 6 +) -func GetRobotStatusString(s uint8) string { - switch s { - case RobotStatusIdle: - return "idle" - case RobotStatusRunning: - return "running" - case RobotStatusConnecting: - return "connecting" - case RobotStatusError: - return "error" - case RobotStatusOffline: - return "offline" - default: - return "unknown" +var ( + generalRules = map[string]string{ + "Name": "required,min=" + minRobotNameLength + ",max=" + maxRobotNameLength, + "Type": "required,numeric", } -} +) diff --git a/modules/utils/utils.go b/modules/utils/utils.go new file mode 100644 index 0000000..7b0a2e2 --- /dev/null +++ b/modules/utils/utils.go @@ -0,0 +1,29 @@ +package utils + +func GetRobotTypeString(t uint8) string { + switch t { + case RobotTypeRex: + return "rex" + case RobotTypeYeet: + return "yeet" + default: + return "unknown" + } +} + +func GetRobotStatusString(s uint8) string { + switch s { + case RobotStatusIdle: + return "idle" + case RobotStatusRunning: + return "running" + case RobotStatusConnecting: + return "connecting" + case RobotStatusError: + return "error" + case RobotStatusOffline: + return "offline" + default: + return "unknown" + } +} diff --git a/modules/utils/validator.go b/modules/utils/validator.go index 7ba25d0..1ce054a 100644 --- a/modules/utils/validator.go +++ b/modules/utils/validator.go @@ -1,5 +1,12 @@ package utils -func ValidatorInit() { +import ( + "jannex/robot-control-manager/modules/structs" + "git.ex.umbach.dev/Alex/roese-utils/rsvalidator" +) + +func ValidatorInit() { + rsvalidator.Validate.RegisterStructValidationMapRules(generalRules, + structs.UpdateRobotBody{}) } diff --git a/routers/api/v1/robot/robot.go b/routers/api/v1/robot/robot.go index 3e65a4a..53d1560 100644 --- a/routers/api/v1/robot/robot.go +++ b/routers/api/v1/robot/robot.go @@ -4,6 +4,7 @@ import ( "jannex/robot-control-manager/modules/cache" "jannex/robot-control-manager/modules/database" "jannex/robot-control-manager/modules/logger" + "jannex/robot-control-manager/modules/robot" "jannex/robot-control-manager/modules/structs" "jannex/robot-control-manager/modules/utils" "time" @@ -60,28 +61,33 @@ func FirstRequest(c *fiber.Ctx) error { now := time.Now() newUnauthorizedRobot := structs.UnauthorizedRobot{ - Id: body.Id, - Type: body.Type, - Address: c.IP(), - ConnectedAt: now, - CreatedAt: now, + Id: body.Id, + Type: body.Type, + Address: c.IP(), + FirmwareVersion: body.FirmwareVersion, + ConnectedAt: now, + CreatedAt: now, } cache.AddUnauthorizedRobot(&newUnauthorizedRobot) - logger.AddSystemLog("Unauthorized robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type)) + robot.BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdAddUnauthorizedRobot, + Body: &newUnauthorizedRobot, + }) - // TODO: send robot to sse + logger.AddSystemLog("Unauthorized robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type)) } else { newRobot := structs.Robot{ - Id: body.Id, - Type: body.Type, - Name: foundRobot.Name, - Status: utils.RobotStatusIdle, - Address: c.IP(), - CurrentJobId: "", - ConnectedAt: time.Now(), - CreatedAt: foundRobot.CreatedAt, + Id: body.Id, + Type: body.Type, + Name: foundRobot.Name, + Status: utils.RobotStatusIdle, + Address: c.IP(), + CurrentJobId: "", + FirmwareVersion: body.FirmwareVersion, + ConnectedAt: time.Now(), + CreatedAt: foundRobot.CreatedAt, } cache.AddRobot(&newRobot) @@ -89,13 +95,18 @@ func FirstRequest(c *fiber.Ctx) error { database.DB.Model(&structs.Robot{}). Where("id = ?", newRobot.Id). Updates(structs.Robot{ - Status: newRobot.Status, - Address: newRobot.Address, + Status: newRobot.Status, + Type: newRobot.Type, + Address: newRobot.Address, + FirmwareVersion: newRobot.FirmwareVersion, }) - logger.AddSystemLog("Robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type)) + robot.BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdAddRobot, + Body: &newRobot, + }) - // TODO: send robot to sse + logger.AddSystemLog("Robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type)) } return c.JSON(structs.StatusResponse{Status: "ok"}) @@ -143,13 +154,14 @@ func AuthorizeRobot(c *fiber.Ctx) error { } newRobot := structs.Robot{ - Id: params.RobotId, - Type: unauthorizedRobot.Type, - Name: uuid.New().String(), - Address: unauthorizedRobot.Address, - Status: utils.RobotStatusIdle, - ConnectedAt: unauthorizedRobot.ConnectedAt, - CreatedAt: unauthorizedRobot.CreatedAt, + Id: params.RobotId, + Type: unauthorizedRobot.Type, + Name: uuid.New().String(), + Address: unauthorizedRobot.Address, + Status: utils.RobotStatusIdle, + FirmwareVersion: unauthorizedRobot.FirmwareVersion, + ConnectedAt: unauthorizedRobot.ConnectedAt, + CreatedAt: unauthorizedRobot.CreatedAt, } cache.AddRobot(&newRobot) @@ -157,6 +169,11 @@ func AuthorizeRobot(c *fiber.Ctx) error { cache.RemoveUnauthorizedRobotById(params.RobotId) + robot.BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdAddRobot, + Body: &newRobot, + }) + logger.AddSystemLog("Robot authorized with id %v and type %v", params.RobotId, utils.GetRobotTypeString(unauthorizedRobot.Type)) return c.SendStatus(fiber.StatusOK) @@ -192,6 +209,11 @@ func DeleteRobot(c *fiber.Ctx) error { database.DB.Delete(&structs.Robot{}, "id = ?", params.RobotId) cache.RemoveRobotById(params.RobotId) + robot.BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdRemoveRobot, + Body: params.RobotId, + }) + logger.AddSystemLog("Robot deleted with id %v", params.RobotId) return c.SendStatus(fiber.StatusOK) @@ -226,7 +248,69 @@ func DenyUnauthorizedRobot(c *fiber.Ctx) error { cache.RemoveUnauthorizedRobotById(params.RobotId) + robot.BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdRemoveUnauthorizedRobot, + Body: params.RobotId, + }) + logger.AddSystemLog("Unauthorized robot denied with id %v", params.RobotId) return c.SendStatus(fiber.StatusOK) } + +func UpdateRobot(c *fiber.Ctx) error { + // swagger:operation PATCH /robot robot robotUpdate + // --- + // summary: Update robot. + // consumes: + // - application/json + // produces: + // - application/json + // parameters: + // - in: body + // name: body + // description: Update robot body. + // required: true + // schema: + // "$ref": "#/definitions/Robot" + // responses: + // "200": + // description: Robot updated + // "400": + // description: Invalid request body + // "422": + // description: Robot not found + + var body structs.UpdateRobotBody + + if err := rsutils.BodyParserHelper(c, &body); err != nil { + return c.SendStatus(fiber.StatusBadRequest) + } + + if !cache.IsRobotInList(body.RobotId) { + logger.AddSystemLog("Robot with id %v not found", body.RobotId) + + return c.SendStatus(fiber.StatusUnprocessableEntity) + } + + foundRobot := cache.GetRobotById(body.RobotId) + + if foundRobot == nil || cache.IsRobotNameInList(body.Name) { + return c.SendStatus(fiber.StatusUnprocessableEntity) + } + + foundRobot.Name = body.Name + + database.DB.Model(&structs.Robot{}). + Where("id = ?", foundRobot.Id). + Update("name", foundRobot.Name) + + robot.BroadcastSSEMessage(structs.SSEMessage{ + Cmd: utils.SSESentCmdRobotUpdated, + Body: body, + }) + + logger.AddSystemLog("Robot with id %v name changed to %v", body.RobotId, body.Name) + + return c.SendStatus(fiber.StatusOK) +} diff --git a/routers/api/v1/sse/sse.go b/routers/api/v1/sse/sse.go new file mode 100644 index 0000000..25c61a5 --- /dev/null +++ b/routers/api/v1/sse/sse.go @@ -0,0 +1,52 @@ +package sse + +import ( + "bufio" + "fmt" + "jannex/robot-control-manager/modules/cache" + "jannex/robot-control-manager/modules/config" + "jannex/robot-control-manager/modules/structs" + + "github.com/gofiber/fiber/v2" + "github.com/valyala/fasthttp" +) + +func SSE(c *fiber.Ctx) error { + if !config.Cfg.SSEServerEnabled { + return c.SendStatus(fiber.StatusNotFound) + } + + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + c.Set("Transfer-Encoding", "chunked") + + c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { + var sseclient structs.SSEClient + + sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage) + + cache.AddSSEClient(sseclient) + + for message := range sseclient.MessageChannel { + fmt.Fprintf(w, "data: %s\n\n", message.Message) + + err := w.Flush() + + if err != nil { + // Refreshing page in web browser will establish a new + // SSE connection, but only (the last) one is alive, so + // dead connections must be closed here. + + for id, sseClient := range cache.GetSSEClients() { + if id == message.ClientId { + close(sseClient.MessageChannel) + cache.DeleteSSEClient(id) + } + } + } + } + })) + + return nil +} diff --git a/routers/router/router.go b/routers/router/router.go index c6fe93c..84faa04 100644 --- a/routers/router/router.go +++ b/routers/router/router.go @@ -5,6 +5,7 @@ import ( "jannex/robot-control-manager/routers/api/v1/permitjoin" "jannex/robot-control-manager/routers/api/v1/robot" "jannex/robot-control-manager/routers/api/v1/robots" + "jannex/robot-control-manager/routers/api/v1/sse" "github.com/gofiber/fiber/v2" ) @@ -17,6 +18,7 @@ func SetupRoutes(app *fiber.App) { r.Post("/authorize/:robotId", robot.AuthorizeRobot) r.Delete("/:robotId", robot.DeleteRobot) r.Delete("/deny/:robotId", robot.DenyUnauthorizedRobot) + r.Patch("/", robot.UpdateRobot) rs := v1.Group("/robots") rs.Get("/", robots.GetRobots) @@ -31,5 +33,7 @@ func SetupRoutes(app *fiber.App) { pj := v1.Group("/permitjoin") pj.Post("/:enabled", permitjoin.SetPermitJoin) + v1.Get("/sse", sse.SSE) + app.Static("/", "./public/") } diff --git a/testclient/testrobot.py b/testclient/testrobot.py index 59612be..a7a5f5f 100644 --- a/testclient/testrobot.py +++ b/testclient/testrobot.py @@ -8,15 +8,15 @@ robot_control_server_url = 'http://localhost:50055/v1' class RexRobot: def __init__(self): - self.id = "B24" - self.version = "0.0.1" + self.id = "B29" + self.firmwareVersion = "0.0.1" self.currentJobId = "" # connecting with robot server print("connecting with robot server") res = requests.api.post(robot_control_server_url + "/robot", - json={'id': self.id, 'type': 1, 'version': self.version}) + json={'id': self.id, 'type': 1, 'firmwareVersion': self.firmwareVersion}) if res.status_code == 403: print("permit join disabled") @@ -48,4 +48,4 @@ def ping(): return jsonify({'status': 'ok'}) if __name__ == '__main__': - app.run(debug=False) \ No newline at end of file + app.run(debug=False, port=5000) \ No newline at end of file