update robot and sse improvement

main
alex 2023-10-14 20:42:31 +02:00
parent c8ea1f6ee9
commit e13598dfda
16 changed files with 384 additions and 75 deletions

7
commit_and_push.sh Executable file
View File

@ -0,0 +1,7 @@
git add *
read -p "Commit message: " commit_message
git commit -m "$commit_message"
git push -u origin main

View File

@ -75,11 +75,10 @@ func main() {
// TODO: Remove this
cache.SetPermitJoin(true)
robot.LoadRobotsFromDatabase()
go robot.RobotPingHandler()
rcmlogger.AddSystemLog("Server started")
robot.LoadRobotsFromDatabase()
go robot.RobotPingHandler()
app.Listen(config.Cfg.Host + ":" + config.Cfg.Port)
}

View File

@ -22,8 +22,9 @@ func AddRobots(newRobots []structs.Robot) {
rMu.Lock()
defer rMu.Unlock()
for _, r := range newRobots {
robots[r.Id] = &r
for i := range newRobots {
r := &newRobots[i]
robots[r.Id] = r
}
}
@ -38,10 +39,23 @@ func GetAllRobots(query rspagination.PageQuery) structs.RobotsResponse {
rMu.RLock()
defer rMu.RUnlock()
var r []structs.Robot
var r []structs.APIRobot
for _, v := range robots {
r = append(r, *v)
r = append(r, structs.APIRobot{
Id: v.Id,
Type: v.Type,
Name: v.Name,
Status: v.Status,
Address: v.Address,
CurrentJobId: v.CurrentJobId,
CurrentJobName: v.CurrentJobName,
JobsWaitingCount: v.JobsWaitingCount,
JobsWaitingNameList: v.JobsWaitingNameList,
FirmwareVersion: v.FirmwareVersion,
ConnectedAt: v.ConnectedAt,
CreatedAt: v.CreatedAt,
})
}
start, end := rspagination.GetPage(len(r), query.Page, utils.RobotsPageLimit)
@ -78,9 +92,29 @@ func GetRobotByName(robotName string) *structs.Robot {
return nil
}
func GetRobotById(robotId string) *structs.Robot {
rMu.RLock()
defer rMu.RUnlock()
return robots[robotId]
}
func RemoveRobotById(robotId string) {
rMu.Lock()
defer rMu.Unlock()
delete(robots, robotId)
}
func IsRobotNameInList(robotName string) bool {
rMu.RLock()
defer rMu.RUnlock()
for _, r := range robots {
if r.Name == robotName {
return true
}
}
return false
}

32
modules/cache/sse.go vendored Normal file
View File

@ -0,0 +1,32 @@
package cache
import (
"jannex/robot-control-manager/modules/structs"
"sync"
"github.com/google/uuid"
)
var sseClients = make(map[uuid.UUID]structs.SSEClient)
var sMu sync.RWMutex
func AddSSEClient(client structs.SSEClient) {
sMu.Lock()
defer sMu.Unlock()
sseClients[uuid.New()] = client
}
func DeleteSSEClient(id uuid.UUID) {
sMu.Lock()
defer sMu.Unlock()
delete(sseClients, id)
}
func GetSSEClients() map[uuid.UUID]structs.SSEClient {
sMu.RLock()
defer sMu.RUnlock()
return sseClients
}

View File

@ -15,6 +15,7 @@ type Config struct {
Host string
Port string
LogManagerServerUrl string
SSEServerEnabled bool
MariaDB MariaDB
}
@ -41,6 +42,7 @@ func LoadConfig() {
Host: os.Getenv("HOST"),
Port: os.Getenv("PORT"),
LogManagerServerUrl: os.Getenv("LOG_MANAGER_SERVER_URL"),
SSEServerEnabled: os.Getenv("SSE_SERVER_ENABLED") == "true",
MariaDB: MariaDB{
Hostname: os.Getenv("MARIADB_HOSTNAME"),
Port: os.Getenv("MARIADB_PORT"),

View File

@ -2,6 +2,7 @@ package request
import (
"encoding/json"
"errors"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
@ -32,14 +33,17 @@ func Request(requestMethod string, url string, body any) error {
return err
}
code, body, errB := a.Bytes()
code, _, errB := a.Bytes()
if len(errB) != 0 {
log.Error().Msgf("Failed to parse bytes %v", errB)
return errB[0]
}
log.Info().Msgf("code %v body %v", code, body)
if code != 200 {
log.Error().Msgf("Request failed with code %v", code)
return errors.New("request failed")
}
return nil
}

View File

@ -1,6 +1,7 @@
package robot
import (
"encoding/json"
"jannex/robot-control-manager/modules/cache"
"jannex/robot-control-manager/modules/database"
"jannex/robot-control-manager/modules/logger"
@ -10,6 +11,7 @@ import (
"time"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)
func RobotPingHandler() {
@ -30,8 +32,6 @@ func RobotPingHandler() {
UpdateRobotStatus(robot, utils.RobotStatusOffline)
logger.AddSystemLog("Robot %s marked as offline because %v attempts have already been made to reach it", robot.Name, utils.RobotPingRetries)
//cache.RemoveRobotById(robot.Id)
continue
}
@ -79,5 +79,30 @@ func UpdateRobotStatus(robot *structs.Robot, status uint8) {
Where("id = ?", robot.Id).
Update("status", status)
logger.AddSystemLog("Robot %s status changed to %s", robot.Name, utils.GetRobotStatusString(status))
BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdUpdateRobotStatus,
Body: struct {
RobotId string
Status uint8
}{
RobotId: robot.Id,
Status: status,
},
})
}
func BroadcastSSEMessage(message structs.SSEMessage) {
marshaledMessage, err := json.Marshal(message)
if err != nil {
log.Error().Msgf("Error marshaling SSE message: %v", err)
return
}
for clientId, sseClient := range cache.GetSSEClients() {
sseClient.MessageChannel <- structs.SSEClientChannelMessage{
ClientId: clientId,
Message: marshaledMessage,
}
}
}

View File

@ -13,6 +13,7 @@ type Robot struct {
Status uint8
PingRetries uint8 `gorm:"-"`
Address string
FirmwareVersion string
CurrentJobId string
CurrentJobName string
JobMutex sync.Mutex `gorm:"-"`
@ -23,6 +24,21 @@ type Robot struct {
CreatedAt time.Time
}
type APIRobot struct {
Id string
Type uint8
Name string
Status uint8
Address string
CurrentJobId string
CurrentJobName string
JobsWaitingCount int
JobsWaitingNameList []string
FirmwareVersion string
ConnectedAt time.Time
CreatedAt time.Time
}
func (r *Robot) CountUpJobsWaiting() {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
@ -69,11 +85,12 @@ func (r *Robot) SetCurrentJobId(jobId string) {
}
type UnauthorizedRobot struct {
Id string
Type uint8
Address string
ConnectedAt time.Time
CreatedAt time.Time
Id string
Type uint8
Address string
FirmwareVersion string
ConnectedAt time.Time
CreatedAt time.Time
}
// swagger:model FirstRequestBody
@ -95,7 +112,7 @@ type StatusResponse struct {
// swagger:model RobotsResponse
type RobotsResponse struct {
Robots []Robot
Robots []APIRobot
TotalPages int
}
@ -117,3 +134,8 @@ type RobotFinishBody struct {
RobotName string
JobId string
}
type UpdateRobotBody struct {
RobotId string
Name string
}

View File

@ -0,0 +1,17 @@
package structs
import "github.com/google/uuid"
type SSEClient struct {
MessageChannel chan SSEClientChannelMessage
}
type SSEClientChannelMessage struct {
ClientId uuid.UUID
Message []byte
}
type SSEMessage struct {
Cmd int
Body any
}

View File

@ -5,6 +5,9 @@ const (
RobotPingHandlerInterval = 5 // seconds
RobotsPageLimit = 10
UnauthorizedRobotsPageLimit = 10
minRobotNameLength = "2"
maxRobotNameLength = "30"
)
const (
@ -20,30 +23,18 @@ const (
RobotTypeYeet = 2
)
func GetRobotTypeString(t uint8) string {
switch t {
case RobotTypeRex:
return "rex"
case RobotTypeYeet:
return "yeet"
default:
return "unknown"
}
}
const (
SSESentCmdUpdateRobotStatus = 1
SSESentCmdAddUnauthorizedRobot = 2
SSESentCmdAddRobot = 3
SSESentCmdRemoveUnauthorizedRobot = 4
SSESentCmdRemoveRobot = 5
SSESentCmdRobotUpdated = 6
)
func GetRobotStatusString(s uint8) string {
switch s {
case RobotStatusIdle:
return "idle"
case RobotStatusRunning:
return "running"
case RobotStatusConnecting:
return "connecting"
case RobotStatusError:
return "error"
case RobotStatusOffline:
return "offline"
default:
return "unknown"
var (
generalRules = map[string]string{
"Name": "required,min=" + minRobotNameLength + ",max=" + maxRobotNameLength,
"Type": "required,numeric",
}
}
)

29
modules/utils/utils.go Normal file
View File

@ -0,0 +1,29 @@
package utils
func GetRobotTypeString(t uint8) string {
switch t {
case RobotTypeRex:
return "rex"
case RobotTypeYeet:
return "yeet"
default:
return "unknown"
}
}
func GetRobotStatusString(s uint8) string {
switch s {
case RobotStatusIdle:
return "idle"
case RobotStatusRunning:
return "running"
case RobotStatusConnecting:
return "connecting"
case RobotStatusError:
return "error"
case RobotStatusOffline:
return "offline"
default:
return "unknown"
}
}

View File

@ -1,5 +1,12 @@
package utils
func ValidatorInit() {
import (
"jannex/robot-control-manager/modules/structs"
"git.ex.umbach.dev/Alex/roese-utils/rsvalidator"
)
func ValidatorInit() {
rsvalidator.Validate.RegisterStructValidationMapRules(generalRules,
structs.UpdateRobotBody{})
}

View File

@ -4,6 +4,7 @@ import (
"jannex/robot-control-manager/modules/cache"
"jannex/robot-control-manager/modules/database"
"jannex/robot-control-manager/modules/logger"
"jannex/robot-control-manager/modules/robot"
"jannex/robot-control-manager/modules/structs"
"jannex/robot-control-manager/modules/utils"
"time"
@ -60,28 +61,33 @@ func FirstRequest(c *fiber.Ctx) error {
now := time.Now()
newUnauthorizedRobot := structs.UnauthorizedRobot{
Id: body.Id,
Type: body.Type,
Address: c.IP(),
ConnectedAt: now,
CreatedAt: now,
Id: body.Id,
Type: body.Type,
Address: c.IP(),
FirmwareVersion: body.FirmwareVersion,
ConnectedAt: now,
CreatedAt: now,
}
cache.AddUnauthorizedRobot(&newUnauthorizedRobot)
logger.AddSystemLog("Unauthorized robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type))
robot.BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdAddUnauthorizedRobot,
Body: &newUnauthorizedRobot,
})
// TODO: send robot to sse
logger.AddSystemLog("Unauthorized robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type))
} else {
newRobot := structs.Robot{
Id: body.Id,
Type: body.Type,
Name: foundRobot.Name,
Status: utils.RobotStatusIdle,
Address: c.IP(),
CurrentJobId: "",
ConnectedAt: time.Now(),
CreatedAt: foundRobot.CreatedAt,
Id: body.Id,
Type: body.Type,
Name: foundRobot.Name,
Status: utils.RobotStatusIdle,
Address: c.IP(),
CurrentJobId: "",
FirmwareVersion: body.FirmwareVersion,
ConnectedAt: time.Now(),
CreatedAt: foundRobot.CreatedAt,
}
cache.AddRobot(&newRobot)
@ -89,13 +95,18 @@ func FirstRequest(c *fiber.Ctx) error {
database.DB.Model(&structs.Robot{}).
Where("id = ?", newRobot.Id).
Updates(structs.Robot{
Status: newRobot.Status,
Address: newRobot.Address,
Status: newRobot.Status,
Type: newRobot.Type,
Address: newRobot.Address,
FirmwareVersion: newRobot.FirmwareVersion,
})
logger.AddSystemLog("Robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type))
robot.BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdAddRobot,
Body: &newRobot,
})
// TODO: send robot to sse
logger.AddSystemLog("Robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type))
}
return c.JSON(structs.StatusResponse{Status: "ok"})
@ -143,13 +154,14 @@ func AuthorizeRobot(c *fiber.Ctx) error {
}
newRobot := structs.Robot{
Id: params.RobotId,
Type: unauthorizedRobot.Type,
Name: uuid.New().String(),
Address: unauthorizedRobot.Address,
Status: utils.RobotStatusIdle,
ConnectedAt: unauthorizedRobot.ConnectedAt,
CreatedAt: unauthorizedRobot.CreatedAt,
Id: params.RobotId,
Type: unauthorizedRobot.Type,
Name: uuid.New().String(),
Address: unauthorizedRobot.Address,
Status: utils.RobotStatusIdle,
FirmwareVersion: unauthorizedRobot.FirmwareVersion,
ConnectedAt: unauthorizedRobot.ConnectedAt,
CreatedAt: unauthorizedRobot.CreatedAt,
}
cache.AddRobot(&newRobot)
@ -157,6 +169,11 @@ func AuthorizeRobot(c *fiber.Ctx) error {
cache.RemoveUnauthorizedRobotById(params.RobotId)
robot.BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdAddRobot,
Body: &newRobot,
})
logger.AddSystemLog("Robot authorized with id %v and type %v", params.RobotId, utils.GetRobotTypeString(unauthorizedRobot.Type))
return c.SendStatus(fiber.StatusOK)
@ -192,6 +209,11 @@ func DeleteRobot(c *fiber.Ctx) error {
database.DB.Delete(&structs.Robot{}, "id = ?", params.RobotId)
cache.RemoveRobotById(params.RobotId)
robot.BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdRemoveRobot,
Body: params.RobotId,
})
logger.AddSystemLog("Robot deleted with id %v", params.RobotId)
return c.SendStatus(fiber.StatusOK)
@ -226,7 +248,69 @@ func DenyUnauthorizedRobot(c *fiber.Ctx) error {
cache.RemoveUnauthorizedRobotById(params.RobotId)
robot.BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdRemoveUnauthorizedRobot,
Body: params.RobotId,
})
logger.AddSystemLog("Unauthorized robot denied with id %v", params.RobotId)
return c.SendStatus(fiber.StatusOK)
}
func UpdateRobot(c *fiber.Ctx) error {
// swagger:operation PATCH /robot robot robotUpdate
// ---
// summary: Update robot.
// consumes:
// - application/json
// produces:
// - application/json
// parameters:
// - in: body
// name: body
// description: Update robot body.
// required: true
// schema:
// "$ref": "#/definitions/Robot"
// responses:
// "200":
// description: Robot updated
// "400":
// description: Invalid request body
// "422":
// description: Robot not found
var body structs.UpdateRobotBody
if err := rsutils.BodyParserHelper(c, &body); err != nil {
return c.SendStatus(fiber.StatusBadRequest)
}
if !cache.IsRobotInList(body.RobotId) {
logger.AddSystemLog("Robot with id %v not found", body.RobotId)
return c.SendStatus(fiber.StatusUnprocessableEntity)
}
foundRobot := cache.GetRobotById(body.RobotId)
if foundRobot == nil || cache.IsRobotNameInList(body.Name) {
return c.SendStatus(fiber.StatusUnprocessableEntity)
}
foundRobot.Name = body.Name
database.DB.Model(&structs.Robot{}).
Where("id = ?", foundRobot.Id).
Update("name", foundRobot.Name)
robot.BroadcastSSEMessage(structs.SSEMessage{
Cmd: utils.SSESentCmdRobotUpdated,
Body: body,
})
logger.AddSystemLog("Robot with id %v name changed to %v", body.RobotId, body.Name)
return c.SendStatus(fiber.StatusOK)
}

52
routers/api/v1/sse/sse.go Normal file
View File

@ -0,0 +1,52 @@
package sse
import (
"bufio"
"fmt"
"jannex/robot-control-manager/modules/cache"
"jannex/robot-control-manager/modules/config"
"jannex/robot-control-manager/modules/structs"
"github.com/gofiber/fiber/v2"
"github.com/valyala/fasthttp"
)
func SSE(c *fiber.Ctx) error {
if !config.Cfg.SSEServerEnabled {
return c.SendStatus(fiber.StatusNotFound)
}
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
var sseclient structs.SSEClient
sseclient.MessageChannel = make(chan structs.SSEClientChannelMessage)
cache.AddSSEClient(sseclient)
for message := range sseclient.MessageChannel {
fmt.Fprintf(w, "data: %s\n\n", message.Message)
err := w.Flush()
if err != nil {
// Refreshing page in web browser will establish a new
// SSE connection, but only (the last) one is alive, so
// dead connections must be closed here.
for id, sseClient := range cache.GetSSEClients() {
if id == message.ClientId {
close(sseClient.MessageChannel)
cache.DeleteSSEClient(id)
}
}
}
}
}))
return nil
}

View File

@ -5,6 +5,7 @@ import (
"jannex/robot-control-manager/routers/api/v1/permitjoin"
"jannex/robot-control-manager/routers/api/v1/robot"
"jannex/robot-control-manager/routers/api/v1/robots"
"jannex/robot-control-manager/routers/api/v1/sse"
"github.com/gofiber/fiber/v2"
)
@ -17,6 +18,7 @@ func SetupRoutes(app *fiber.App) {
r.Post("/authorize/:robotId", robot.AuthorizeRobot)
r.Delete("/:robotId", robot.DeleteRobot)
r.Delete("/deny/:robotId", robot.DenyUnauthorizedRobot)
r.Patch("/", robot.UpdateRobot)
rs := v1.Group("/robots")
rs.Get("/", robots.GetRobots)
@ -31,5 +33,7 @@ func SetupRoutes(app *fiber.App) {
pj := v1.Group("/permitjoin")
pj.Post("/:enabled", permitjoin.SetPermitJoin)
v1.Get("/sse", sse.SSE)
app.Static("/", "./public/")
}

View File

@ -8,15 +8,15 @@ robot_control_server_url = 'http://localhost:50055/v1'
class RexRobot:
def __init__(self):
self.id = "B24"
self.version = "0.0.1"
self.id = "B29"
self.firmwareVersion = "0.0.1"
self.currentJobId = ""
# connecting with robot server
print("connecting with robot server")
res = requests.api.post(robot_control_server_url + "/robot",
json={'id': self.id, 'type': 1, 'version': self.version})
json={'id': self.id, 'type': 1, 'firmwareVersion': self.firmwareVersion})
if res.status_code == 403:
print("permit join disabled")
@ -48,4 +48,4 @@ def ping():
return jsonify({'status': 'ok'})
if __name__ == '__main__':
app.run(debug=False)
app.run(debug=False, port=5000)