From ec1653eee37d8e0fd2c261bddcc4fbc6404e88ff Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 9 Jun 2023 14:16:27 +0200 Subject: [PATCH] running tasks --- grouptasks/grouptasks.go | 323 --------------------------------------- 1 file changed, 323 deletions(-) delete mode 100644 grouptasks/grouptasks.go diff --git a/grouptasks/grouptasks.go b/grouptasks/grouptasks.go deleted file mode 100644 index 8d40974..0000000 --- a/grouptasks/grouptasks.go +++ /dev/null @@ -1,323 +0,0 @@ -package grouptasks - -import ( - "encoding/json" - "fmt" - "janex/admin-dashboard-backend/modules/cache" - "janex/admin-dashboard-backend/modules/database" - "janex/admin-dashboard-backend/modules/structs" - "janex/admin-dashboard-backend/modules/utils" - "janex/admin-dashboard-backend/socketclients" - "os" - "os/exec" - "time" - - "github.com/google/uuid" - "github.com/rs/zerolog/log" -) - -var root = "./groupTasks/groups/" - -func LoadGroups(category string) { - entries, err := os.ReadDir(root) - - if err != nil { - log.Error().Msg("Failed to read groups directory, error: " + err.Error()) - return - } - - if category != "" { - cache.RemoveAllCategoryGroupsByCategory(category) - } - - var updatedGroups []structs.Group - - for _, entry := range entries { - files, err := os.ReadDir(root + entry.Name()) - - if err != nil { - log.Error().Msg("Failed to read groups directory files, error: " + err.Error()) - return - } - - for _, file := range files { - if file.Name() == "index.json" { - content, err := os.ReadFile(root + entry.Name() + "/index.json") - - if err != nil { - log.Error().Msg("Failed to read file content, error: " + err.Error()) - return - } - - var group structs.Group - - json.Unmarshal(content, &group) - - group.Id = entry.Name() - - if category == "" || group.Category == category { - cache.AddCategoryGroup(group) - - updatedGroups = append(updatedGroups, group) - } - } - } - } - - if category != "" { - socketclients.BroadcastMessage(structs.SendSocketMessage{ - Cmd: utils.SentCmdGroupTasksReloaded, - Body: struct { - Category string - UpdatedGroups []structs.Group - }{ - Category: category, - UpdatedGroups: updatedGroups, - }, - }) - } -} - -const ( - RunGroupTaskStartTypeNormal = 0 - RunGroupTaskStartTypeTryAgain = 1 -) - -type RunGroupTaskArgs struct { - CreatorUserId string - StartType uint8 - GroupTaskId string - Category string - GroupId string - Step uint8 - TaskStepId string - GlobalInputs string - TaskInputs string -} - -type InputParameters struct { - ParameterName string `json:"parameterName"` - Value string `json:"value"` -} - -func RunGroupTask(args RunGroupTaskArgs) { - categoryGroup := GetCategoryGroupTaskByCategoryAndGroupId(args.Category, args.GroupId) - - groupTaskStep := structs.GroupTaskSteps{ - CreatorUserId: args.CreatorUserId, - GroupTasksId: args.GroupTaskId, - Step: args.Step, - Status: utils.GroupTasksStatusRunning, - Inputs: args.TaskInputs, - StartedAt: time.Now(), - } - - // task type - if args.StartType == RunGroupTaskStartTypeNormal { - groupTaskStep.Id = uuid.New().String() - - database.DB.Create(&groupTaskStep) - - socketclients.BroadcastMessage(structs.SendSocketMessage{ - Cmd: utils.SentCmdNewGroupTaskStep, - Body: groupTaskStep, - }) - } else if args.StartType == RunGroupTaskStartTypeTryAgain { - groupTaskStep.Id = args.TaskStepId - - updateGroupTaskSteps(groupTaskStep) - } - - // set group task to running - dbGroupTask := updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ - Status: utils.GroupTasksStatusRunning, - }) - - // global inputs - var globalInputParameters []InputParameters - - if len(args.GlobalInputs) > 0 { // global inputs given in args because the group task was just created - if err := json.Unmarshal([]byte(args.GlobalInputs), &globalInputParameters); err != nil { - log.Error().Msgf("err unmarshalling global inputs %s", err.Error()) - } - } else { // global inputs not given in args - fetch it from the database - if err := json.Unmarshal([]byte(dbGroupTask.GlobalInputs), &globalInputParameters); err != nil { - log.Error().Msgf("err unmarshalling global inputs %s", err.Error()) - } - } - - // task parameters - log.Debug().Msgf("script path %s", categoryGroup.Tasks[args.Step-1].ScriptPath) - commandArgs := []string{root + categoryGroup.Id + "/" + categoryGroup.Tasks[args.Step-1].ScriptPath} - - if len(categoryGroup.Tasks[args.Step-1].Parameters) != 0 && len(args.TaskInputs) == 0 { - updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ - Status: utils.GroupTasksStatusInputRequired, - }) - - groupTaskStep.Status = utils.GroupTasksStatusInputRequired - - updateGroupTaskSteps(groupTaskStep) - return - } else if len(args.TaskInputs) > 0 { - var taskParameterInputs []InputParameters - - if err := json.Unmarshal([]byte(args.TaskInputs), &taskParameterInputs); err != nil { - log.Error().Msgf("err unmarshalling task parameter inputs %s", err.Error()) - } - - for _, taskParameterInput := range taskParameterInputs { - commandArgs = append(commandArgs, taskParameterInput.Value) - } - } - - // execute script - cmd, err := exec.Command("python3", commandArgs...).Output() - - cmdLog := string(cmd) - - if err != nil { - if exitErr, ok := err.(*exec.ExitError); ok { - exitCode := exitErr.ExitCode() - log.Error().Msgf("exit code %d", exitCode) - cmdLog += fmt.Sprintf("\nExit code: %v", exitCode) - } - - log.Error().Msgf("error exec command %s", err.Error()) - groupTaskStep.Status = utils.GroupTasksStatusFailed - } else { - groupTaskStep.Status = utils.GroupTasksStatusFinished - } - - fmt.Println(cmdLog) - - groupTaskStep.Log = cmdLog - groupTaskStep.EndedAt = time.Now() - - updateGroupTaskSteps(groupTaskStep) - - if int(args.Step) < len(categoryGroup.Tasks) { - if groupTaskStep.Status == utils.GroupTasksStatusFailed { - // set group task to failed - updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ - Status: utils.GroupTasksStatusFailed, - }) - } else { - args.StartType = RunGroupTaskStartTypeNormal - args.Step = args.Step + 1 - - updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ - CurrentTasksStep: args.Step, - }) - - // clear task parameters, because otherwise the next task would have the parameters from the previous task - args.TaskInputs = "" - - RunGroupTask(args) - } - } else { - // set group task to finished - updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ - Status: utils.GroupTasksStatusFinished, - EndedAt: time.Now(), - }) - } -} - -// Updates group task and send it to websocket users -func updateGroupTask(groupTasksId string, updates structs.GroupTasks) structs.GroupTasks { - database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTasksId).Updates(updates) - - var dbGroupTask structs.GroupTasks - - database.DB.First(&dbGroupTask, "id = ?", groupTasksId) - - socketclients.BroadcastMessage(structs.SendSocketMessage{ - Cmd: utils.SentCmdUpdateGroupTask, - Body: dbGroupTask, - }) - - return dbGroupTask -} - -// Updates group task steps and send it to websocket users -func updateGroupTaskSteps(groupTaskStep structs.GroupTaskSteps) { - database.DB.Model(&structs.GroupTaskSteps{}).Where("id = ?", groupTaskStep.Id).Updates(groupTaskStep) - - socketclients.BroadcastMessage(structs.SendSocketMessage{ - Cmd: utils.SentCmdUpdateGroupTaskStep, - Body: groupTaskStep, - }) -} - -func GetAllGroupTasks() []structs.GroupTasks { - var groupTasks []structs.GroupTasks - - database.DB.Find(&groupTasks) - - return groupTasks -} - -func GetAllGroupTasksSteps() []structs.GroupTaskSteps { - var groupTaskStepsLogs []structs.GroupTaskSteps - - database.DB.Find(&groupTaskStepsLogs) - - lockedGroupTaskSteps := cache.GetLockedGroupTaskSteps() - groupTaskStepsInputs := cache.GetGroupTaskStepsInputs() - - for i, groupTaskStep := range groupTaskStepsLogs { - if groupTaskStep.Status == utils.GroupTasksStatusInputRequired { - groupTaskStepsLogs[i].Inputs = cache.GetGroupTaskStepsInputsValue(groupTaskStepsInputs, groupTaskStep.GroupTasksId, groupTaskStep.Step) - - for _, lockedGroupTaskStep := range lockedGroupTaskSteps { - if groupTaskStep.GroupTasksId == lockedGroupTaskStep.GroupTaskId { - groupTaskStepsLogs[i].LockedByUserId = lockedGroupTaskStep.LockedByUserId - } - } - } - } - - return groupTaskStepsLogs -} - -func GetCategoryGroupTaskByCategoryAndGroupId(category string, groupId string) structs.Group { - for _, categoryGroup := range cache.GetCategoryGroups() { - if categoryGroup.Category == category { - for _, group := range categoryGroup.Groups { - if group.Id == groupId { - return group - } - } - } - } - - return structs.Group{} -} - -func StartUnlockLockedGroupTaskStepsTicker() { - ticker := time.NewTicker(1 * time.Second) - - for range ticker.C { - for index, taskStep := range cache.GetLockedGroupTaskSteps() { - if time.Since(taskStep.LockedAt).Seconds() > 3 { - log.Debug().Msgf("Unlocked task step %v", index) - cache.RemoveLockedGroupTaskStep(index) - - socketclients.BroadcastMessage( - structs.SendSocketMessage{ - Cmd: utils.SentCmdTaskUnlocked, - Body: struct { - GroupTaskId string - Step uint8 - RememberId string - }{ - GroupTaskId: taskStep.GroupTaskId, - Step: taskStep.Step, - RememberId: taskStep.RememberId, - }, - }) - } - } - } -}