support for required inputs to continue to the next task step
parent
af43378d70
commit
18b7d63363
|
@ -67,17 +67,17 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type RunGroupTaskArgs struct {
|
type RunGroupTaskArgs struct {
|
||||||
StartType uint8
|
StartType uint8
|
||||||
GroupTaskId string
|
GroupTaskId string
|
||||||
Category string
|
Category string
|
||||||
GroupId string
|
GroupId string
|
||||||
Step uint8
|
Step uint8
|
||||||
TaskStepId string
|
TaskStepId string
|
||||||
GlobalInputs string
|
GlobalInputs string
|
||||||
Parameters string
|
TaskParameters string
|
||||||
}
|
}
|
||||||
|
|
||||||
type GlobalInputParameters struct {
|
type InputParameters struct {
|
||||||
ParameterName string `json:"parameterName"`
|
ParameterName string `json:"parameterName"`
|
||||||
Value string `json:"value"`
|
Value string `json:"value"`
|
||||||
}
|
}
|
||||||
|
@ -97,6 +97,7 @@ func RunGroupTask(args RunGroupTaskArgs) {
|
||||||
StartedAt: time.Now(),
|
StartedAt: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// task type
|
||||||
if args.StartType == RunGroupTaskStartTypeNormal {
|
if args.StartType == RunGroupTaskStartTypeNormal {
|
||||||
groupTaskStep.Id = uuid.New().String()
|
groupTaskStep.Id = uuid.New().String()
|
||||||
|
|
||||||
|
@ -110,36 +111,15 @@ func RunGroupTask(args RunGroupTaskArgs) {
|
||||||
groupTaskStep.Id = args.TaskStepId
|
groupTaskStep.Id = args.TaskStepId
|
||||||
|
|
||||||
updateGroupTaskSteps(groupTaskStep)
|
updateGroupTaskSteps(groupTaskStep)
|
||||||
|
|
||||||
/*
|
|
||||||
database.DB.Model(&structs.GroupTaskSteps{}).Where("id = ?", groupTaskStep.Id).Updates(groupTaskStep)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTaskStep,
|
|
||||||
Body: groupTaskStep,
|
|
||||||
}) */
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// set group task to running
|
// set group task to running
|
||||||
dbGroupTask := updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
dbGroupTask := updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
||||||
Status: structs.GroupTasksStatusRunning,
|
Status: structs.GroupTasksStatusRunning,
|
||||||
})
|
})
|
||||||
/*
|
|
||||||
database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTaskStep.GroupTasksId).Updates(structs.GroupTasks{
|
|
||||||
Status: structs.GroupTasksStatusRunning,
|
|
||||||
})
|
|
||||||
|
|
||||||
var dbGroupTask structs.GroupTasks
|
// global inputs
|
||||||
|
var globalInputParameters []InputParameters
|
||||||
database.DB.First(&dbGroupTask, "id = ?", groupTaskStep.GroupTasksId)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTask,
|
|
||||||
Body: dbGroupTask,
|
|
||||||
})*/
|
|
||||||
|
|
||||||
// check task parameters
|
|
||||||
var globalInputParameters []GlobalInputParameters
|
|
||||||
|
|
||||||
if len(args.GlobalInputs) > 0 { // global inputs given in args because the group task was just created
|
if len(args.GlobalInputs) > 0 { // global inputs given in args because the group task was just created
|
||||||
log.Info().Msgf("global inputs given %s", args.GlobalInputs)
|
log.Info().Msgf("global inputs given %s", args.GlobalInputs)
|
||||||
|
@ -155,10 +135,13 @@ func RunGroupTask(args RunGroupTaskArgs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// task parameters
|
||||||
log.Info().Msgf("unmarshalled global inputs %s", globalInputParameters)
|
log.Info().Msgf("unmarshalled global inputs %s", globalInputParameters)
|
||||||
log.Info().Msgf("task parameters %s", categoryGroup.Tasks[args.Step-1].Parameters)
|
log.Info().Msgf("task parameters %s", categoryGroup.Tasks[args.Step-1].Parameters)
|
||||||
|
|
||||||
if len(categoryGroup.Tasks[args.Step-1].Parameters) != 0 && len(args.Parameters) == 0 {
|
commandArgs := []string{root + categoryGroup.Id + "/" + categoryGroup.Tasks[args.Step-1].ScriptPath}
|
||||||
|
|
||||||
|
if len(categoryGroup.Tasks[args.Step-1].Parameters) != 0 && len(args.TaskParameters) == 0 {
|
||||||
log.Error().Msg("task parameters not specified")
|
log.Error().Msg("task parameters not specified")
|
||||||
|
|
||||||
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
||||||
|
@ -169,10 +152,25 @@ func RunGroupTask(args RunGroupTaskArgs) {
|
||||||
|
|
||||||
updateGroupTaskSteps(groupTaskStep)
|
updateGroupTaskSteps(groupTaskStep)
|
||||||
return
|
return
|
||||||
|
} else {
|
||||||
|
var taskParameterInputs []InputParameters
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(args.TaskParameters), &taskParameterInputs); err != nil {
|
||||||
|
log.Error().Msgf("err unmarshalling task parameter inputs %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Msgf("task parameters %s", taskParameterInputs)
|
||||||
|
|
||||||
|
for _, taskParameterInput := range taskParameterInputs {
|
||||||
|
//commandArgs = append(commandArgs, "--"+taskParameterInput.ParameterName)
|
||||||
|
commandArgs = append(commandArgs, taskParameterInput.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Msgf("task parameters %s", commandArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// execute script
|
// execute script
|
||||||
cmd, err := exec.Command("python3", root+categoryGroup.Id+"/"+categoryGroup.Tasks[args.Step-1].ScriptPath).Output()
|
cmd, err := exec.Command("python3", commandArgs...).Output()
|
||||||
|
|
||||||
cmdLog := string(cmd)
|
cmdLog := string(cmd)
|
||||||
|
|
||||||
|
@ -196,32 +194,11 @@ func RunGroupTask(args RunGroupTaskArgs) {
|
||||||
|
|
||||||
updateGroupTaskSteps(groupTaskStep)
|
updateGroupTaskSteps(groupTaskStep)
|
||||||
|
|
||||||
/*
|
|
||||||
database.DB.Model(&structs.GroupTaskSteps{}).Where("id = ?", groupTaskStep.Id).Updates(groupTaskStep)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTaskStep,
|
|
||||||
Body: groupTaskStep,
|
|
||||||
}) */
|
|
||||||
|
|
||||||
log.Info().Msgf("run next task")
|
log.Info().Msgf("run next task")
|
||||||
|
|
||||||
if int(args.Step) < len(categoryGroup.Tasks) {
|
if int(args.Step) < len(categoryGroup.Tasks) {
|
||||||
if groupTaskStep.Status == structs.GroupTasksStatusFailed {
|
if groupTaskStep.Status == structs.GroupTasksStatusFailed {
|
||||||
// set group task to failed
|
// set group task to failed
|
||||||
/*database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTaskStep.GroupTasksId).Updates(structs.GroupTasks{
|
|
||||||
Status: structs.GroupTasksStatusFailed,
|
|
||||||
})
|
|
||||||
|
|
||||||
var dbGroupTask structs.GroupTasks
|
|
||||||
|
|
||||||
database.DB.First(&dbGroupTask, "id = ?", groupTaskStep.GroupTasksId)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTask,
|
|
||||||
Body: dbGroupTask,
|
|
||||||
}) */
|
|
||||||
|
|
||||||
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
||||||
Status: structs.GroupTasksStatusFailed,
|
Status: structs.GroupTasksStatusFailed,
|
||||||
})
|
})
|
||||||
|
@ -233,20 +210,11 @@ func RunGroupTask(args RunGroupTaskArgs) {
|
||||||
CurrentTasksStep: args.Step,
|
CurrentTasksStep: args.Step,
|
||||||
})
|
})
|
||||||
|
|
||||||
/*database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTaskStep.GroupTasksId).Updates(structs.GroupTasks{
|
|
||||||
CurrentTasksStep: args.Step,
|
|
||||||
})
|
|
||||||
|
|
||||||
var dbGroupTask structs.GroupTasks
|
|
||||||
|
|
||||||
database.DB.First(&dbGroupTask, "id = ?", groupTaskStep.GroupTasksId)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTask,
|
|
||||||
Body: dbGroupTask,
|
|
||||||
})*/
|
|
||||||
|
|
||||||
log.Debug().Msgf("RUN NEXT TASK %s", groupTaskStep)
|
log.Debug().Msgf("RUN NEXT TASK %s", groupTaskStep)
|
||||||
|
|
||||||
|
// clear task parameters, because otherwise the next task would have the parameters from the previous task
|
||||||
|
args.TaskParameters = ""
|
||||||
|
|
||||||
RunGroupTask(args)
|
RunGroupTask(args)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -256,21 +224,6 @@ func RunGroupTask(args RunGroupTaskArgs) {
|
||||||
EndedAt: time.Now(),
|
EndedAt: time.Now(),
|
||||||
})
|
})
|
||||||
|
|
||||||
/*
|
|
||||||
database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTaskStep.GroupTasksId).Updates(structs.GroupTasks{
|
|
||||||
Status: structs.GroupTasksStatusFinished,
|
|
||||||
EndedAt: time.Now(),
|
|
||||||
})
|
|
||||||
|
|
||||||
var dbGroupTask structs.GroupTasks
|
|
||||||
|
|
||||||
database.DB.First(&dbGroupTask, "id = ?", groupTaskStep.GroupTasksId)
|
|
||||||
|
|
||||||
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
||||||
Cmd: utils.SentCmdUpdateGroupTask,
|
|
||||||
Body: dbGroupTask,
|
|
||||||
})*/
|
|
||||||
|
|
||||||
log.Info().Msg("SET TO FINISHED")
|
log.Info().Msg("SET TO FINISHED")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ const (
|
||||||
const (
|
const (
|
||||||
ReceivedCmdStartGroupTasks = 1
|
ReceivedCmdStartGroupTasks = 1
|
||||||
ReceivedCmdTaskFailedTryAgainRunTaskStep = 2
|
ReceivedCmdTaskFailedTryAgainRunTaskStep = 2
|
||||||
|
ReceivedCmdTaskContinueTaskStep = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -132,6 +132,35 @@ func RunHub() {
|
||||||
Step: uint8(receivedMessage.Body["step"].(float64)),
|
Step: uint8(receivedMessage.Body["step"].(float64)),
|
||||||
TaskStepId: receivedMessage.Body["taskStepId"].(string),
|
TaskStepId: receivedMessage.Body["taskStepId"].(string),
|
||||||
})
|
})
|
||||||
|
break
|
||||||
|
case utils.ReceivedCmdTaskContinueTaskStep:
|
||||||
|
log.Info().Msgf("task continue task %s", receivedMessage.Body)
|
||||||
|
|
||||||
|
taskInputs := receivedMessage.Body["taskInputs"]
|
||||||
|
|
||||||
|
var taskInputsJsonString string
|
||||||
|
|
||||||
|
if taskInputs != nil {
|
||||||
|
jsonString, err := json.Marshal(taskInputs)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Msgf("Failed to marshal task inputs %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
taskInputsJsonString = string(jsonString)
|
||||||
|
}
|
||||||
|
|
||||||
|
go grouptasks.RunGroupTask(grouptasks.RunGroupTaskArgs{
|
||||||
|
StartType: grouptasks.RunGroupTaskStartTypeTryAgain,
|
||||||
|
GroupTaskId: receivedMessage.Body["groupTaskId"].(string),
|
||||||
|
Category: receivedMessage.Body["category"].(string),
|
||||||
|
GroupId: receivedMessage.Body["groupId"].(string),
|
||||||
|
Step: uint8(receivedMessage.Body["step"].(float64)),
|
||||||
|
TaskStepId: receivedMessage.Body["taskStepId"].(string),
|
||||||
|
TaskParameters: taskInputsJsonString,
|
||||||
|
})
|
||||||
|
|
||||||
break
|
break
|
||||||
default:
|
default:
|
||||||
log.Error().Msgf("Received unknown message: %s", receivedMessage)
|
log.Error().Msgf("Received unknown message: %s", receivedMessage)
|
||||||
|
|
Loading…
Reference in New Issue