package grouptasks import ( "encoding/json" "errors" "fmt" "io/ioutil" "janex/admin-dashboard-backend/modules/cache" "janex/admin-dashboard-backend/modules/database" "janex/admin-dashboard-backend/modules/structs" "janex/admin-dashboard-backend/modules/utils" "janex/admin-dashboard-backend/socketclients" "os" "os/exec" "path/filepath" "strings" "time" "github.com/google/uuid" "github.com/rs/zerolog/log" ) var ( rootPath = "./groupTasks/" groupsPath = rootPath + "groups/" runningTasksPath = rootPath + "runningTasks/" publicPath = "./public/grouptasks/" ) func LoadGroups(category string) { entries, err := os.ReadDir(groupsPath) if err != nil { log.Error().Msgf("Failed to read groups directory, error: " + err.Error()) return } if category != "" { cache.RemoveAllCategoryGroupsByCategory(category) } var updatedGroups []structs.Group for _, entry := range entries { files, err := os.ReadDir(groupsPath + entry.Name()) if err != nil { log.Error().Msg("Failed to read groups directory files, error: " + err.Error()) return } for _, file := range files { if file.Name() == "index.json" { content, err := os.ReadFile(groupsPath + entry.Name() + "/index.json") if err != nil { log.Error().Msg("Failed to read file content, error: " + err.Error()) return } var group structs.Group json.Unmarshal(content, &group) group.Id = entry.Name() if category == "" || group.Category == category { cache.AddCategoryGroup(group) updatedGroups = append(updatedGroups, group) } } } } if category != "" { socketclients.BroadcastMessage(structs.SendSocketMessage{ Cmd: utils.SentCmdGroupTasksReloaded, Body: struct { Category string UpdatedGroups []structs.Group }{ Category: category, UpdatedGroups: updatedGroups, }, }) } } const ( RunGroupTaskStartTypeNormal = 0 RunGroupTaskStartTypeTryAgain = 1 ) type RunGroupTaskArgs struct { CreatorUserId string StartType uint8 GroupTaskId string Category string GroupId string Step uint8 TaskStepId string GlobalInputs string TaskInputs string } type InputParameters struct { ParameterName string `json:"parameterName"` Value string `json:"value"` } type FoundFile struct { Path string FileName string } func RunGroupTask(args RunGroupTaskArgs) { categoryGroup := GetCategoryGroupTaskByCategoryAndGroupId(args.Category, args.GroupId) groupTaskStep := structs.GroupTaskSteps{ CreatorUserId: args.CreatorUserId, GroupTasksId: args.GroupTaskId, Step: args.Step, Status: utils.GroupTasksStatusRunning, Inputs: args.TaskInputs, StartedAt: time.Now(), } // task type if args.StartType == RunGroupTaskStartTypeNormal { groupTaskStep.Id = uuid.New().String() database.DB.Create(&groupTaskStep) socketclients.BroadcastMessage(structs.SendSocketMessage{ Cmd: utils.SentCmdNewGroupTaskStep, Body: groupTaskStep, }) } else if args.StartType == RunGroupTaskStartTypeTryAgain { groupTaskStep.Id = args.TaskStepId updateGroupTaskSteps(groupTaskStep) } // set group task to running dbGroupTask := updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ Status: utils.GroupTasksStatusRunning, }) // global inputs var globalInputParameters []InputParameters if len(args.GlobalInputs) > 0 { // global inputs given in args because the group task was just created if err := json.Unmarshal([]byte(args.GlobalInputs), &globalInputParameters); err != nil { log.Error().Msgf("err unmarshalling global inputs %s", err.Error()) } } else { // global inputs not given in args - fetch it from the database if err := json.Unmarshal([]byte(dbGroupTask.GlobalInputs), &globalInputParameters); err != nil { log.Error().Msgf("err unmarshalling global inputs %s", err.Error()) } } // task parameters log.Debug().Msgf("script path %s", categoryGroup.Tasks[args.Step-1].ScriptPath) //commandArgs := []string{root + categoryGroup.Id + "/" + categoryGroup.Tasks[args.Step-1].ScriptPath} commandArgs := []string{categoryGroup.Tasks[args.Step-1].ScriptPath} if len(categoryGroup.Tasks[args.Step-1].Parameters) != 0 && len(args.TaskInputs) == 0 { updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ Status: utils.GroupTasksStatusInputRequired, }) groupTaskStep.Status = utils.GroupTasksStatusInputRequired updateGroupTaskSteps(groupTaskStep) return } else if len(args.TaskInputs) > 0 { var taskParameterInputs []InputParameters if err := json.Unmarshal([]byte(args.TaskInputs), &taskParameterInputs); err != nil { log.Error().Msgf("err unmarshalling task parameter inputs %s", err.Error()) } for _, taskParameterInput := range taskParameterInputs { commandArgs = append(commandArgs, taskParameterInput.Value) } } log.Debug().Msgf("runningTasksPath %s", runningTasksPath) // create running task folder if _, err := os.Stat(runningTasksPath + groupTaskStep.GroupTasksId); errors.Is(err, os.ErrNotExist) { if err := os.Mkdir(runningTasksPath+groupTaskStep.GroupTasksId, os.ModePerm); err != nil { log.Error().Msgf("Error creating running tasks folder %s", err.Error()) } // copy scripts to group tasks folder err := filepath.Walk(groupsPath+categoryGroup.Id, func(path string, info os.FileInfo, err error) error { if err != nil { log.Error().Msgf("Error walk %s", err.Error()) return err } if !info.IsDir() && filepath.Ext(path) == ".py" { log.Info().Msgf("path %s info %s", path, info.Name()) bytesRead, err := ioutil.ReadFile(path) if err != nil { log.Error().Msgf("Error reading file %s", err.Error()) } err = ioutil.WriteFile(runningTasksPath+groupTaskStep.GroupTasksId+"/"+info.Name(), bytesRead, 0644) if err != nil { log.Error().Msgf("Error writing file %s", err.Error()) } } return nil }) if err != nil { log.Error().Msgf("Failed to walk through task folder %s", err.Error()) } } // execute command cmd := exec.Command("python3", commandArgs...) //cmd.Dir = "/home/alex/Documents/coding/projects/janex/admin-dashboard/backend/groupTasks/groups/production1/" // path needs to be set here as the python scripts will use the path as base path to create files for example cmd.Dir = runningTasksPath + groupTaskStep.GroupTasksId + "/" out, err := cmd.CombinedOutput() // execute script //cmd, err := exec.Command("python3", commandArgs...).Output() //cmdLog := string(cmd) cmdLog := string(out) if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { exitCode := exitErr.ExitCode() log.Error().Msgf("exit code %d", exitCode) cmdLog += fmt.Sprintf("\nExit code: %v", exitCode) } log.Error().Msgf("error exec command %s", err.Error()) groupTaskStep.Status = utils.GroupTasksStatusFailed } else { groupTaskStep.Status = utils.GroupTasksStatusFinished } fmt.Println(cmdLog) groupTaskStep.Log = cmdLog groupTaskStep.EndedAt = time.Now() // looking for files which are created by the scripts var foundFiles []FoundFile if err = filepath.Walk(runningTasksPath+groupTaskStep.GroupTasksId+"/", func(path string, info os.FileInfo, err error) error { if err != nil { log.Error().Msgf("Error walk %s", err.Error()) return err } if !info.IsDir() && filepath.Ext(path) != ".py" && !strings.Contains(path, "/oldFiles/") { log.Info().Msgf("found path %s info %s", path, info.Name()) foundFiles = append(foundFiles, FoundFile{Path: path, FileName: info.Name()}) } return nil }); err != nil { log.Error().Msgf("Failed look for created files %s", err.Error()) } if len(foundFiles) > 0 { // creating folder for the old files which can be used by the python scripts for the next steps if _, err := os.Stat(runningTasksPath + groupTaskStep.GroupTasksId + "/oldFiles"); errors.Is(err, os.ErrNotExist) { if err := os.Mkdir(runningTasksPath+groupTaskStep.GroupTasksId+"/oldFiles", os.ModePerm); err != nil { log.Error().Msgf("Error creating old files folder %s", err.Error()) } } // copy files to public directory to access it via web ui if _, err := os.Stat(publicPath + groupTaskStep.GroupTasksId); errors.Is(err, os.ErrNotExist) { if err := os.Mkdir(publicPath+groupTaskStep.GroupTasksId, os.ModePerm); err != nil { log.Error().Msgf("Error creating task public folder %s", err.Error()) } } var taskFiles []structs.GroupTaskStepFile for _, foundFile := range foundFiles { log.Info().Msgf("found file %s", foundFile) bytesRead, err := ioutil.ReadFile(foundFile.Path) if err != nil { log.Error().Msgf("Error reading file %s", err.Error()) } err = ioutil.WriteFile(runningTasksPath+groupTaskStep.GroupTasksId+"/oldFiles/"+foundFile.FileName, bytesRead, 0644) if err != nil { log.Error().Msgf("Error writing file %s", err.Error()) } systemFileName := uuid.New().String() + filepath.Ext(foundFile.FileName) log.Debug().Msgf("systemFileName: %s", systemFileName) err = ioutil.WriteFile(publicPath+groupTaskStep.GroupTasksId+"/"+systemFileName, bytesRead, 0644) if err != nil { log.Error().Msgf("Error writing file %s", err.Error()) } err = os.Remove(runningTasksPath + groupTaskStep.GroupTasksId + "/" + foundFile.FileName) if err != nil { log.Error().Msgf("Failed to delete created file by task %s", err.Error()) } taskFiles = append(taskFiles, structs.GroupTaskStepFile{ OriginalFileName: foundFile.FileName, SystemFileName: systemFileName, }) } marshaledTaskFiles, err := json.Marshal(taskFiles) if err != nil { log.Error().Msgf("Failed to marshal task files %s", err.Error()) } else { groupTaskStep.Files = string(marshaledTaskFiles) } } // update group task step updateGroupTaskSteps(groupTaskStep) if int(args.Step) < len(categoryGroup.Tasks) { if groupTaskStep.Status == utils.GroupTasksStatusFailed { // set group task to failed updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ Status: utils.GroupTasksStatusFailed, }) } else { args.StartType = RunGroupTaskStartTypeNormal args.Step = args.Step + 1 updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ CurrentTasksStep: args.Step, }) // clear task parameters, because otherwise the next task would have the parameters from the previous task args.TaskInputs = "" RunGroupTask(args) } } else { // set group task to finished updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{ Status: utils.GroupTasksStatusFinished, EndedAt: time.Now(), }) err = os.RemoveAll(runningTasksPath + groupTaskStep.GroupTasksId + "/") if err != nil { log.Error().Msgf("Failed to delete running task folder %s", err.Error()) } } } // Updates group task and send it to websocket users func updateGroupTask(groupTasksId string, updates structs.GroupTasks) structs.GroupTasks { database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTasksId).Updates(updates) var dbGroupTask structs.GroupTasks database.DB.First(&dbGroupTask, "id = ?", groupTasksId) socketclients.BroadcastMessage(structs.SendSocketMessage{ Cmd: utils.SentCmdUpdateGroupTask, Body: dbGroupTask, }) return dbGroupTask } // Updates group task steps and send it to websocket users func updateGroupTaskSteps(groupTaskStep structs.GroupTaskSteps) { database.DB.Model(&structs.GroupTaskSteps{}).Where("id = ?", groupTaskStep.Id).Updates(groupTaskStep) socketclients.BroadcastMessage(structs.SendSocketMessage{ Cmd: utils.SentCmdUpdateGroupTaskStep, Body: groupTaskStep, }) } func GetAllGroupTasks() []structs.GroupTasks { var groupTasks []structs.GroupTasks database.DB.Find(&groupTasks) return groupTasks } func GetAllGroupTasksSteps() []structs.GroupTaskSteps { var groupTaskStepsLogs []structs.GroupTaskSteps database.DB.Find(&groupTaskStepsLogs) lockedGroupTaskSteps := cache.GetLockedGroupTaskSteps() groupTaskStepsInputs := cache.GetGroupTaskStepsInputs() for i, groupTaskStep := range groupTaskStepsLogs { if groupTaskStep.Status == utils.GroupTasksStatusInputRequired { groupTaskStepsLogs[i].Inputs = cache.GetGroupTaskStepsInputsValue(groupTaskStepsInputs, groupTaskStep.GroupTasksId, groupTaskStep.Step) for _, lockedGroupTaskStep := range lockedGroupTaskSteps { if groupTaskStep.GroupTasksId == lockedGroupTaskStep.GroupTaskId { groupTaskStepsLogs[i].LockedByUserId = lockedGroupTaskStep.LockedByUserId } } } } return groupTaskStepsLogs } func GetCategoryGroupTaskByCategoryAndGroupId(category string, groupId string) structs.Group { for _, categoryGroup := range cache.GetCategoryGroups() { if categoryGroup.Category == category { for _, group := range categoryGroup.Groups { if group.Id == groupId { return group } } } } return structs.Group{} } func StartUnlockLockedGroupTaskStepsTicker() { ticker := time.NewTicker(1 * time.Second) for range ticker.C { for index, taskStep := range cache.GetLockedGroupTaskSteps() { if time.Since(taskStep.LockedAt).Seconds() > 3 { log.Debug().Msgf("Unlocked task step %v", index) cache.RemoveLockedGroupTaskStep(index) socketclients.BroadcastMessage( structs.SendSocketMessage{ Cmd: utils.SentCmdTaskUnlocked, Body: struct { GroupTaskId string Step uint8 RememberId string }{ GroupTaskId: taskStep.GroupTaskId, Step: taskStep.Step, RememberId: taskStep.RememberId, }, }) } } } }