robot handling

main
alex 2023-10-12 22:20:41 +02:00
parent 38caf71144
commit 6f98311916
12 changed files with 278 additions and 109 deletions

View File

@ -19,6 +19,7 @@ import (
"jannex/robot-control-manager/modules/cache"
"jannex/robot-control-manager/modules/config"
"jannex/robot-control-manager/modules/database"
"jannex/robot-control-manager/modules/robot"
"jannex/robot-control-manager/modules/utils"
"jannex/robot-control-manager/routers/router"
"os"
@ -47,7 +48,7 @@ MARIADB_PASSWORD=db_password
MARIADB_DATABASE_NAME=db_database_name`)
config.LoadConfig()
rslogger.InitLogger(config.Cfg.Debug, config.Cfg.ColorizedOutput, config.Cfg.LogManagerServerUrl)
rslogger.InitLogger(false, config.Cfg.ColorizedOutput, config.Cfg.LogManagerServerUrl)
if os.Getenv("DOCKER") != "" {
fmt.Println("Waiting for mariadb docker")
@ -74,6 +75,12 @@ func main() {
// TODO: Remove this
cache.SetPermitJoin(true)
robot.LoadRobotsFromDatabase()
fmt.Println("loaded robots", cache.GetAllRobots())
go robot.RobotPingHandler()
rcmlogger.AddSystemLog("Server started")
app.Listen(config.Cfg.Host + ":" + config.Cfg.Port)

View File

@ -15,6 +15,15 @@ func AddRobot(robot *structs.Robot) {
robots[robot.Id] = robot
}
func AddRobots(newRobots []structs.Robot) {
rMu.Lock()
defer rMu.Unlock()
for _, r := range newRobots {
robots[r.Id] = &r
}
}
func GetRobots() map[string]*structs.Robot {
rMu.RLock()
defer rMu.RUnlock()

View File

@ -2,16 +2,12 @@ package logger
import (
"fmt"
"jannex/robot-control-manager/modules/config"
"git.ex.umbach.dev/Alex/roese-utils/rslogger"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)
func AddSystemLog(format string, v ...any) {
log.Info().Msgf("serverurl %v", config.Cfg.LogManagerServerUrl)
go rslogger.LogManagerRequestClient(fiber.MethodPost, rslogger.LogManagerRequestBody{
Type: "robot-control-manager",
Logs: []string{"I " + rslogger.GetTime() + fmt.Sprintf(format, v...)}})

View File

@ -0,0 +1,45 @@
package request
import (
"encoding/json"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)
func Request(requestMethod string, url string, body any) error {
a := fiber.AcquireAgent()
req := a.Request()
req.Header.SetMethod(requestMethod)
addr := "http://" + url
req.SetRequestURI(addr)
req.Header.SetContentType("application/json")
reqestBodyBytes, err := json.Marshal(body)
if err != nil {
log.Error().Msgf("Failed to marshal request body, err: %s", err)
return err
}
req.SetBody(reqestBodyBytes)
if err := a.Parse(); err != nil {
log.Error().Msgf("Failed to parse request, err: %s", err)
return err
}
code, body, errB := a.Bytes()
if len(errB) != 0 {
log.Error().Msgf("Failed to parse bytes %v", errB)
return errB[0]
}
log.Info().Msgf("code %v body %v", code, body)
return nil
}

83
modules/robot/robot.go Normal file
View File

@ -0,0 +1,83 @@
package robot
import (
"jannex/robot-control-manager/modules/cache"
"jannex/robot-control-manager/modules/database"
"jannex/robot-control-manager/modules/logger"
"jannex/robot-control-manager/modules/request"
"jannex/robot-control-manager/modules/structs"
"jannex/robot-control-manager/modules/utils"
"time"
"github.com/gofiber/fiber/v2"
)
func RobotPingHandler() {
ticker := time.NewTicker(utils.RobotPingHandlerInterval * time.Second)
for range ticker.C {
for _, robot := range cache.GetRobots() {
if robot.Status == utils.RobotStatusOffline {
continue
}
err := request.Request(fiber.MethodGet, robot.Address+":5000/api/v1/ping", nil)
if err != nil {
logger.AddSystemLog("Robot ping request failed: %v", err)
if robot.PingRetries == utils.RobotPingRetries {
UpdateRobotStatus(robot, utils.RobotStatusOffline)
logger.AddSystemLog("Robot %s marked as offline because %v attempts have already been made to reach it", robot.Name, utils.RobotPingRetries)
cache.RemoveRobotById(robot.Id)
continue
}
robot.PingRetries++
logger.AddSystemLog("Trying to ping robot %s for %v times failed", robot.Name, robot.PingRetries)
if robot.Status != utils.RobotStatusConnecting {
UpdateRobotStatus(robot, utils.RobotStatusConnecting)
}
continue
}
if robot.Status == utils.RobotStatusConnecting {
UpdateRobotStatus(robot, utils.RobotStatusIdle)
}
if robot.PingRetries != 0 {
robot.PingRetries = 0
}
}
}
}
func LoadRobotsFromDatabase() {
var robots []structs.Robot
database.DB.Find(&robots)
cache.AddRobots(robots)
for _, robot := range cache.GetRobots() {
if robot.Status == utils.RobotStatusOffline {
continue
}
UpdateRobotStatus(robot, utils.RobotStatusConnecting)
}
}
func UpdateRobotStatus(robot *structs.Robot, status uint8) {
robot.Status = status
database.DB.Model(&structs.Robot{}).
Where("id = ?", robot.Id).
Update("status", status)
logger.AddSystemLog("Robot %s status changed to %s", robot.Name, utils.GetRobotStatusString(status))
}

View File

@ -1,20 +1,73 @@
package structs
import "time"
import (
"fmt"
"sync"
"time"
)
type Robot struct {
Id string
Type uint8
Name string
Status uint8
Address string `gorm:"-"`
PingRetries uint8 `gorm:"-"`
Address string
CurrentJobId string
CurrentTask string
CurrentJobName string
JobMutex sync.Mutex `gorm:"-"`
JobsWaitingCount int `gorm:"-"`
JobsWaitingNameList []string `gorm:"-"`
LastTaskAt time.Time `gorm:"-"`
ConnectedAt time.Time `gorm:"-"`
CreatedAt time.Time
}
func (r *Robot) CountUpJobsWaiting() {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
r.JobsWaitingCount++
}
func (r *Robot) CountDownJobsWaiting() {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
if r.JobsWaitingCount > 0 {
r.JobsWaitingCount--
}
}
func (r *Robot) ProcessJobTask(jobId string) {
fmt.Println("wait for job", jobId)
if r.CurrentJobId == "" {
r.SetCurrentJobId(jobId)
}
if r.CurrentJobId != "" && r.CurrentJobId != jobId {
r.CountUpJobsWaiting()
}
for r.CurrentJobId != "" && r.CurrentJobId != jobId {
// wait for current job to finish
fmt.Println("job is processing", r.CurrentJobId, jobId, r.JobsWaitingCount)
time.Sleep(2 * time.Second)
}
r.SetCurrentJobId(jobId)
fmt.Println("processing job", jobId)
}
func (r *Robot) SetCurrentJobId(jobId string) {
r.JobMutex.Lock()
defer r.JobMutex.Unlock()
r.CurrentJobId = jobId
}
type UnauthorizedRobot struct {
Id string
Type uint8
@ -53,3 +106,8 @@ type RobotIdParams struct {
type RobotNameParams struct {
RobotName string
}
type RobotFinishBody struct {
RobotName string
JobId string
}

View File

@ -1,15 +1,21 @@
package utils
const (
RobotStatusIdle uint8 = iota
RobotStatusRunning
RobotStatusError
RobotStatusOffline
RobotPingRetries = 3
RobotPingHandlerInterval = 5 // seconds
)
const (
RobotTypeRex uint8 = iota
RobotTypeYeet
RobotStatusIdle = 1
RobotStatusRunning = 2
RobotStatusConnecting = 3
RobotStatusError = 4
RobotStatusOffline = 5
)
const (
RobotTypeRex = 1
RobotTypeYeet = 2
)
func GetRobotTypeString(t uint8) string {
@ -22,3 +28,20 @@ func GetRobotTypeString(t uint8) string {
return "unknown"
}
}
func GetRobotStatusString(s uint8) string {
switch s {
case RobotStatusIdle:
return "idle"
case RobotStatusRunning:
return "running"
case RobotStatusConnecting:
return "connecting"
case RobotStatusError:
return "error"
case RobotStatusOffline:
return "offline"
default:
return "unknown"
}
}

View File

@ -1,18 +1,16 @@
package control
import (
"encoding/json"
"fmt"
"jannex/robot-control-manager/modules/cache"
"jannex/robot-control-manager/modules/request"
"jannex/robot-control-manager/modules/structs"
"git.ex.umbach.dev/Alex/roese-utils/rsutils"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)
func ControlRex(c *fiber.Ctx) error {
// swagger:operation POST /control/0 control controlRex
// swagger:operation POST /control/1 control controlRex
// ---
// summary: Control Rex.
// description: |
@ -36,77 +34,36 @@ func ControlRex(c *fiber.Ctx) error {
// "422":
// description: Robot not found
log.Info().Msgf("body %v", string(c.Body()))
var body structs.ControlBody
if err := rsutils.BodyParserHelper(c, &body); err != nil {
return c.SendStatus(fiber.StatusBadRequest)
}
log.Info().Msgf("ControlRex: %v", body)
/*
if body.Task.Y == nil {
fmt.Println("y not sent")
} else {
fmt.Printf("y-value: %d\n", *body.Task.Y)
}
} */
robot := cache.GetRobotByName(body.RobotName)
if robot.Id == "" {
if robot == nil {
return c.SendStatus(fiber.StatusUnprocessableEntity)
}
log.Info().Msgf("robot %v", robot)
robot.ProcessJobTask(body.JobId)
// wait until the robot finished the job
for robot.CurrentJobId != "" {
robot.CurrentJobId = body.JobId
log.Debug().Msgf("robot is free %v", body.JobId)
controlRexRequest(robot.Address, body)
break
if err := request.Request(fiber.MethodPost, robot.Address+":5000/api/v1/control", body.Task); err != nil {
return c.JSON(structs.StatusResponse{Status: "err"})
}
return c.SendStatus(fiber.StatusOK)
}
func controlRexRequest(address string, body any) {
a := fiber.AcquireAgent()
// TODO: port only for testing - remove it
url := "http://" + address + ":5000" + "/api/v1/control"
req := a.Request()
req.Header.SetMethod(fiber.MethodPost)
req.SetRequestURI(url)
req.Header.SetContentType("application/json")
log.Info().Msgf("url %s", url)
reqestBodyBytes, err := json.Marshal(body)
if err != nil {
log.Error().Msgf("Failed to marshal request body, err: %s", err)
return
}
req.SetBody(reqestBodyBytes)
if err := a.Parse(); err != nil {
log.Error().Msgf("Failed to parse request, err: %s", err)
return
}
code, body, _ := a.Bytes()
log.Info().Msgf("code %v body %v", code, body)
return c.JSON(structs.StatusResponse{Status: "ok"})
}
func FinishControlRex(c *fiber.Ctx) error {
// swagger:operation POST /control/0/finish/{robotName} control finishControlRex
// swagger:operation POST /control/1/finish control finishControlRex
// ---
// summary: Finish control Rex.
// description: |
@ -123,23 +80,19 @@ func FinishControlRex(c *fiber.Ctx) error {
// "422":
// description: Robot not found
var params structs.RobotNameParams
var body structs.RobotFinishBody
if err := rsutils.ParamsParserHelper(c, &params); err != nil {
if err := rsutils.BodyParserHelper(c, &body); err != nil {
return c.SendStatus(fiber.StatusBadRequest)
}
log.Debug().Msgf("before finish control robot %v", cache.GetRobotByName(params.RobotName).CurrentJobId)
robot := cache.GetRobotByName(body.RobotName)
robot := cache.GetRobotByName(params.RobotName)
if robot.Id == "" {
if robot == nil || robot.CurrentJobId != body.JobId {
return c.SendStatus(fiber.StatusUnprocessableEntity)
}
robot.CurrentJobId = ""
log.Debug().Msgf("finish control robot %v", cache.GetRobotByName(params.RobotName).CurrentJobId)
robot.SetCurrentJobId("")
return c.SendStatus(fiber.StatusOK)
}

View File

@ -7,7 +7,6 @@ import (
"git.ex.umbach.dev/Alex/roese-utils/rsutils"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)
func SetPermitJoin(c *fiber.Ctx) error {
@ -38,15 +37,9 @@ func SetPermitJoin(c *fiber.Ctx) error {
if params.Enabled == 0 {
cache.SetPermitJoin(false)
log.Debug().Msgf("Permit join disabled")
logger.AddSystemLog("Permit join disabled")
} else {
cache.SetPermitJoin(true)
log.Debug().Msgf("Permit join enabled")
logger.AddSystemLog("Permit join enabled")
}

View File

@ -11,7 +11,6 @@ import (
"git.ex.umbach.dev/Alex/roese-utils/rsutils"
"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
func FirstRequest(c *fiber.Ctx) error {
@ -52,8 +51,6 @@ func FirstRequest(c *fiber.Ctx) error {
database.DB.First(&foundRobot, "id = ?", body.Id)
if !cache.IsPermitJoinEnabled() && foundRobot.Id == "" {
log.Info().Msgf("Permit join is enabled")
logger.AddSystemLog("Unauthorized robot tried to connect with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type))
return c.SendStatus(fiber.StatusForbidden)
@ -72,8 +69,6 @@ func FirstRequest(c *fiber.Ctx) error {
cache.AddUnauthorizedRobot(&newUnauthorizedRobot)
log.Debug().Msgf("Added unauthorized robot %s (%v)", body.Id, body.Type)
logger.AddSystemLog("Unauthorized robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type))
// TODO: send robot to sse
@ -85,22 +80,24 @@ func FirstRequest(c *fiber.Ctx) error {
Status: utils.RobotStatusIdle,
Address: c.IP(),
CurrentJobId: "",
CurrentTask: "",
ConnectedAt: time.Now(),
CreatedAt: foundRobot.CreatedAt,
}
cache.AddRobot(&newRobot)
log.Debug().Msgf("Added robot %s (%v)", body.Id, body.Type)
database.DB.Model(&structs.Robot{}).
Where("id = ?", newRobot.Id).
Updates(structs.Robot{
Status: newRobot.Status,
Address: newRobot.Address,
})
logger.AddSystemLog("Robot connected with id %v and type %v", body.Id, utils.GetRobotTypeString(body.Type))
// TODO: send robot to sse
}
log.Debug().Msgf("robots %v", cache.GetUnauthorizedRobots())
return c.JSON(structs.StatusResponse{Status: "ok"})
}
@ -134,6 +131,8 @@ func AuthorizeRobot(c *fiber.Ctx) error {
}
if !cache.IsUnauthorizedRobotInList(params.RobotId) {
logger.AddSystemLog("Unauthorized robot with id %v not found", params.RobotId)
return c.SendStatus(fiber.StatusUnprocessableEntity)
}
@ -158,8 +157,6 @@ func AuthorizeRobot(c *fiber.Ctx) error {
cache.RemoveUnauthorizedRobotById(params.RobotId)
log.Info().Msgf("Authorize robot %s", params.RobotId)
logger.AddSystemLog("Robot authorized with id %v and type %v", params.RobotId, utils.GetRobotTypeString(unauthorizedRobot.Type))
return c.SendStatus(fiber.StatusOK)

View File

@ -22,8 +22,8 @@ func SetupRoutes(app *fiber.App) {
rs.Get("/", robots.GetRobots)
c := v1.Group("/control")
c.Post("/0", control.ControlRex)
c.Post("/0/finish/:robotName", control.FinishControlRex)
c.Post("/1", control.ControlRex)
c.Post("/1/finish", control.FinishControlRex)
pj := v1.Group("/permitjoin")
pj.Post("/:enabled", permitjoin.SetPermitJoin)

View File

@ -16,7 +16,7 @@ class RexRobot:
print("connecting with robot server")
res = requests.api.post(robot_control_server_url + "/robot",
json={'id': self.id, 'type': 0, 'version': self.version})
json={'id': self.id, 'type': 1, 'version': self.version})
if res.status_code == 403:
print("permit join disabled")
@ -42,5 +42,10 @@ def hello():
return jsonify({'status': 'ok'})
@app.route('/api/v1/ping', methods=['GET'])
def ping():
print("pong")
return jsonify({'status': 'ok'})
if __name__ == '__main__':
app.run(debug=False)