running tasks
parent
046b2e0bfd
commit
ec1653eee3
|
@ -1,323 +0,0 @@
|
||||||
package grouptasks
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"janex/admin-dashboard-backend/modules/cache"
|
|
||||||
"janex/admin-dashboard-backend/modules/database"
|
|
||||||
"janex/admin-dashboard-backend/modules/structs"
|
|
||||||
"janex/admin-dashboard-backend/modules/utils"
|
|
||||||
"janex/admin-dashboard-backend/socketclients"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
var root = "./groupTasks/groups/"
|
|
||||||
|
|
||||||
func LoadGroups(category string) {
|
|
||||||
entries, err := os.ReadDir(root)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Msg("Failed to read groups directory, error: " + err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if category != "" {
|
|
||||||
cache.RemoveAllCategoryGroupsByCategory(category)
|
|
||||||
}
|
|
||||||
|
|
||||||
var updatedGroups []structs.Group
|
|
||||||
|
|
||||||
for _, entry := range entries {
|
|
||||||
files, err := os.ReadDir(root + entry.Name())
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Msg("Failed to read groups directory files, error: " + err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, file := range files {
|
|
||||||
if file.Name() == "index.json" {
|
|
||||||
content, err := os.ReadFile(root + entry.Name() + "/index.json")
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Msg("Failed to read file content, error: " + err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var group structs.Group
|
|
||||||
|
|
||||||
json.Unmarshal(content, &group)
|
|
||||||
|
|
||||||
group.Id = entry.Name()
|
|
||||||
|
|
||||||
if category == "" || group.Category == category {
|
|
||||||
cache.AddCategoryGroup(group)
|
|
||||||
|
|
||||||
updatedGroups = append(updatedGroups, group)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if category != "" {
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdGroupTasksReloaded,
|
|
||||||
Body: struct {
|
|
||||||
Category string
|
|
||||||
UpdatedGroups []structs.Group
|
|
||||||
}{
|
|
||||||
Category: category,
|
|
||||||
UpdatedGroups: updatedGroups,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
RunGroupTaskStartTypeNormal = 0
|
|
||||||
RunGroupTaskStartTypeTryAgain = 1
|
|
||||||
)
|
|
||||||
|
|
||||||
type RunGroupTaskArgs struct {
|
|
||||||
CreatorUserId string
|
|
||||||
StartType uint8
|
|
||||||
GroupTaskId string
|
|
||||||
Category string
|
|
||||||
GroupId string
|
|
||||||
Step uint8
|
|
||||||
TaskStepId string
|
|
||||||
GlobalInputs string
|
|
||||||
TaskInputs string
|
|
||||||
}
|
|
||||||
|
|
||||||
type InputParameters struct {
|
|
||||||
ParameterName string `json:"parameterName"`
|
|
||||||
Value string `json:"value"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunGroupTask(args RunGroupTaskArgs) {
|
|
||||||
categoryGroup := GetCategoryGroupTaskByCategoryAndGroupId(args.Category, args.GroupId)
|
|
||||||
|
|
||||||
groupTaskStep := structs.GroupTaskSteps{
|
|
||||||
CreatorUserId: args.CreatorUserId,
|
|
||||||
GroupTasksId: args.GroupTaskId,
|
|
||||||
Step: args.Step,
|
|
||||||
Status: utils.GroupTasksStatusRunning,
|
|
||||||
Inputs: args.TaskInputs,
|
|
||||||
StartedAt: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// task type
|
|
||||||
if args.StartType == RunGroupTaskStartTypeNormal {
|
|
||||||
groupTaskStep.Id = uuid.New().String()
|
|
||||||
|
|
||||||
database.DB.Create(&groupTaskStep)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdNewGroupTaskStep,
|
|
||||||
Body: groupTaskStep,
|
|
||||||
})
|
|
||||||
} else if args.StartType == RunGroupTaskStartTypeTryAgain {
|
|
||||||
groupTaskStep.Id = args.TaskStepId
|
|
||||||
|
|
||||||
updateGroupTaskSteps(groupTaskStep)
|
|
||||||
}
|
|
||||||
|
|
||||||
// set group task to running
|
|
||||||
dbGroupTask := updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
||||||
Status: utils.GroupTasksStatusRunning,
|
|
||||||
})
|
|
||||||
|
|
||||||
// global inputs
|
|
||||||
var globalInputParameters []InputParameters
|
|
||||||
|
|
||||||
if len(args.GlobalInputs) > 0 { // global inputs given in args because the group task was just created
|
|
||||||
if err := json.Unmarshal([]byte(args.GlobalInputs), &globalInputParameters); err != nil {
|
|
||||||
log.Error().Msgf("err unmarshalling global inputs %s", err.Error())
|
|
||||||
}
|
|
||||||
} else { // global inputs not given in args - fetch it from the database
|
|
||||||
if err := json.Unmarshal([]byte(dbGroupTask.GlobalInputs), &globalInputParameters); err != nil {
|
|
||||||
log.Error().Msgf("err unmarshalling global inputs %s", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// task parameters
|
|
||||||
log.Debug().Msgf("script path %s", categoryGroup.Tasks[args.Step-1].ScriptPath)
|
|
||||||
commandArgs := []string{root + categoryGroup.Id + "/" + categoryGroup.Tasks[args.Step-1].ScriptPath}
|
|
||||||
|
|
||||||
if len(categoryGroup.Tasks[args.Step-1].Parameters) != 0 && len(args.TaskInputs) == 0 {
|
|
||||||
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
||||||
Status: utils.GroupTasksStatusInputRequired,
|
|
||||||
})
|
|
||||||
|
|
||||||
groupTaskStep.Status = utils.GroupTasksStatusInputRequired
|
|
||||||
|
|
||||||
updateGroupTaskSteps(groupTaskStep)
|
|
||||||
return
|
|
||||||
} else if len(args.TaskInputs) > 0 {
|
|
||||||
var taskParameterInputs []InputParameters
|
|
||||||
|
|
||||||
if err := json.Unmarshal([]byte(args.TaskInputs), &taskParameterInputs); err != nil {
|
|
||||||
log.Error().Msgf("err unmarshalling task parameter inputs %s", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, taskParameterInput := range taskParameterInputs {
|
|
||||||
commandArgs = append(commandArgs, taskParameterInput.Value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// execute script
|
|
||||||
cmd, err := exec.Command("python3", commandArgs...).Output()
|
|
||||||
|
|
||||||
cmdLog := string(cmd)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
||||||
exitCode := exitErr.ExitCode()
|
|
||||||
log.Error().Msgf("exit code %d", exitCode)
|
|
||||||
cmdLog += fmt.Sprintf("\nExit code: %v", exitCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Error().Msgf("error exec command %s", err.Error())
|
|
||||||
groupTaskStep.Status = utils.GroupTasksStatusFailed
|
|
||||||
} else {
|
|
||||||
groupTaskStep.Status = utils.GroupTasksStatusFinished
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println(cmdLog)
|
|
||||||
|
|
||||||
groupTaskStep.Log = cmdLog
|
|
||||||
groupTaskStep.EndedAt = time.Now()
|
|
||||||
|
|
||||||
updateGroupTaskSteps(groupTaskStep)
|
|
||||||
|
|
||||||
if int(args.Step) < len(categoryGroup.Tasks) {
|
|
||||||
if groupTaskStep.Status == utils.GroupTasksStatusFailed {
|
|
||||||
// set group task to failed
|
|
||||||
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
||||||
Status: utils.GroupTasksStatusFailed,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
args.StartType = RunGroupTaskStartTypeNormal
|
|
||||||
args.Step = args.Step + 1
|
|
||||||
|
|
||||||
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
||||||
CurrentTasksStep: args.Step,
|
|
||||||
})
|
|
||||||
|
|
||||||
// clear task parameters, because otherwise the next task would have the parameters from the previous task
|
|
||||||
args.TaskInputs = ""
|
|
||||||
|
|
||||||
RunGroupTask(args)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// set group task to finished
|
|
||||||
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
||||||
Status: utils.GroupTasksStatusFinished,
|
|
||||||
EndedAt: time.Now(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Updates group task and send it to websocket users
|
|
||||||
func updateGroupTask(groupTasksId string, updates structs.GroupTasks) structs.GroupTasks {
|
|
||||||
database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTasksId).Updates(updates)
|
|
||||||
|
|
||||||
var dbGroupTask structs.GroupTasks
|
|
||||||
|
|
||||||
database.DB.First(&dbGroupTask, "id = ?", groupTasksId)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTask,
|
|
||||||
Body: dbGroupTask,
|
|
||||||
})
|
|
||||||
|
|
||||||
return dbGroupTask
|
|
||||||
}
|
|
||||||
|
|
||||||
// Updates group task steps and send it to websocket users
|
|
||||||
func updateGroupTaskSteps(groupTaskStep structs.GroupTaskSteps) {
|
|
||||||
database.DB.Model(&structs.GroupTaskSteps{}).Where("id = ?", groupTaskStep.Id).Updates(groupTaskStep)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTaskStep,
|
|
||||||
Body: groupTaskStep,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetAllGroupTasks() []structs.GroupTasks {
|
|
||||||
var groupTasks []structs.GroupTasks
|
|
||||||
|
|
||||||
database.DB.Find(&groupTasks)
|
|
||||||
|
|
||||||
return groupTasks
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetAllGroupTasksSteps() []structs.GroupTaskSteps {
|
|
||||||
var groupTaskStepsLogs []structs.GroupTaskSteps
|
|
||||||
|
|
||||||
database.DB.Find(&groupTaskStepsLogs)
|
|
||||||
|
|
||||||
lockedGroupTaskSteps := cache.GetLockedGroupTaskSteps()
|
|
||||||
groupTaskStepsInputs := cache.GetGroupTaskStepsInputs()
|
|
||||||
|
|
||||||
for i, groupTaskStep := range groupTaskStepsLogs {
|
|
||||||
if groupTaskStep.Status == utils.GroupTasksStatusInputRequired {
|
|
||||||
groupTaskStepsLogs[i].Inputs = cache.GetGroupTaskStepsInputsValue(groupTaskStepsInputs, groupTaskStep.GroupTasksId, groupTaskStep.Step)
|
|
||||||
|
|
||||||
for _, lockedGroupTaskStep := range lockedGroupTaskSteps {
|
|
||||||
if groupTaskStep.GroupTasksId == lockedGroupTaskStep.GroupTaskId {
|
|
||||||
groupTaskStepsLogs[i].LockedByUserId = lockedGroupTaskStep.LockedByUserId
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return groupTaskStepsLogs
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetCategoryGroupTaskByCategoryAndGroupId(category string, groupId string) structs.Group {
|
|
||||||
for _, categoryGroup := range cache.GetCategoryGroups() {
|
|
||||||
if categoryGroup.Category == category {
|
|
||||||
for _, group := range categoryGroup.Groups {
|
|
||||||
if group.Id == groupId {
|
|
||||||
return group
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return structs.Group{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func StartUnlockLockedGroupTaskStepsTicker() {
|
|
||||||
ticker := time.NewTicker(1 * time.Second)
|
|
||||||
|
|
||||||
for range ticker.C {
|
|
||||||
for index, taskStep := range cache.GetLockedGroupTaskSteps() {
|
|
||||||
if time.Since(taskStep.LockedAt).Seconds() > 3 {
|
|
||||||
log.Debug().Msgf("Unlocked task step %v", index)
|
|
||||||
cache.RemoveLockedGroupTaskStep(index)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(
|
|
||||||
structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdTaskUnlocked,
|
|
||||||
Body: struct {
|
|
||||||
GroupTaskId string
|
|
||||||
Step uint8
|
|
||||||
RememberId string
|
|
||||||
}{
|
|
||||||
GroupTaskId: taskStep.GroupTaskId,
|
|
||||||
Step: taskStep.Step,
|
|
||||||
RememberId: taskStep.RememberId,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue