470 lines
13 KiB
Go
470 lines
13 KiB
Go
package grouptasks
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"janex/admin-dashboard-backend/modules/cache"
|
|
"janex/admin-dashboard-backend/modules/database"
|
|
"janex/admin-dashboard-backend/modules/structs"
|
|
"janex/admin-dashboard-backend/modules/systempermissions"
|
|
"janex/admin-dashboard-backend/modules/utils"
|
|
"janex/admin-dashboard-backend/socketclients"
|
|
llog "log"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// Filesystem locations used by the group task runner. All of them are
// relative to the working directory of the process and end with a slash
// because the rest of the package appends names to them directly.
var (
	// rootPath is the base folder of everything related to group tasks.
	rootPath = "./groupTasks/"

	// groupsPath holds one sub-folder per group; LoadGroups reads the
	// index.json of each of them.
	groupsPath = rootPath + "groups/"

	// runningTasksPath holds one working folder per running group task.
	runningTasksPath = rootPath + "runningTasks/"

	// publicPath receives the files created by task scripts, one sub-folder
	// per group task.
	publicPath = "./public/grouptasks/"
)
|
|
|
|
func LoadGroups(category string) {
|
|
entries, err := os.ReadDir(groupsPath)
|
|
|
|
if err != nil {
|
|
llog.Fatal(err)
|
|
return
|
|
}
|
|
|
|
if category != "" {
|
|
cache.RemoveAllCategoryGroupsByCategory(category)
|
|
}
|
|
|
|
var updatedGroups []structs.Group
|
|
|
|
for _, entry := range entries {
|
|
files, err := os.ReadDir(groupsPath + entry.Name())
|
|
|
|
if err != nil {
|
|
log.Error().Msg("Failed to read groups directory files, error: " + err.Error())
|
|
return
|
|
}
|
|
|
|
for _, file := range files {
|
|
if file.Name() == "index.json" {
|
|
content, err := os.ReadFile(groupsPath + entry.Name() + "/index.json")
|
|
|
|
if err != nil {
|
|
log.Error().Msg("Failed to read file content, error: " + err.Error())
|
|
return
|
|
}
|
|
|
|
var group structs.Group
|
|
|
|
json.Unmarshal(content, &group)
|
|
|
|
group.Id = entry.Name()
|
|
|
|
if category == "" || group.Category == category {
|
|
cache.AddCategoryGroup(group)
|
|
|
|
updatedGroups = append(updatedGroups, group)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if category != "" {
|
|
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdGroupTasksReloaded,
|
|
Body: struct {
|
|
Category string
|
|
UpdatedGroups []structs.Group
|
|
}{
|
|
Category: category,
|
|
UpdatedGroups: updatedGroups,
|
|
},
|
|
})
|
|
}
|
|
|
|
systempermissions.AddDynamicGroupTasksPermissions()
|
|
}
|
|
|
|
// Start types accepted in RunGroupTaskArgs.StartType.
const (
	// RunGroupTaskStartTypeNormal makes RunGroupTask create a new step record.
	RunGroupTaskStartTypeNormal = iota
	// RunGroupTaskStartTypeTryAgain makes RunGroupTask reuse the step record
	// identified by RunGroupTaskArgs.TaskStepId.
	RunGroupTaskStartTypeTryAgain
)
|
|
|
|
// RunGroupTaskArgs bundles everything RunGroupTask needs to run one step of a
// group task.
type RunGroupTaskArgs struct {
	// CreatorUserId is copied onto the step record.
	CreatorUserId string
	// StartType is RunGroupTaskStartTypeNormal or RunGroupTaskStartTypeTryAgain.
	StartType uint8
	// GroupTaskId identifies the group task record and names its working
	// folder below runningTasksPath.
	GroupTaskId string
	// Category and GroupId select the group definition from the cache.
	Category string
	GroupId  string
	// Step is the 1-based position of the task inside the group's task list.
	Step uint8
	// TaskStepId is the id of the existing step record; only used for
	// RunGroupTaskStartTypeTryAgain.
	TaskStepId string
	// GlobalInputs is a JSON array of InputParameters. When empty, the value
	// stored on the group task record is used instead.
	GlobalInputs string
	// TaskInputs is a JSON array of InputParameters whose values are passed
	// to the script as command line arguments.
	TaskInputs string
}
|
|
|
|
// InputParameters is a single name/value pair as found in the JSON encoded
// global inputs and task inputs of a group task.
type InputParameters struct {
	// ParameterName is the name of the parameter.
	ParameterName string `json:"parameterName"`
	// Value is the value supplied for the parameter.
	Value string `json:"value"`
}
|
|
|
|
// FoundFile describes a file discovered in the working folder of a group task
// after a script has run.
type FoundFile struct {
	// Path is the path of the file as reported by filepath.Walk.
	Path string
	// FileName is the base name of the file.
	FileName string
}
|
|
|
|
func RunGroupTask(args RunGroupTaskArgs) {
|
|
categoryGroup := GetCategoryGroupTaskByCategoryAndGroupId(args.Category, args.GroupId)
|
|
|
|
groupTaskStep := structs.GroupTaskSteps{
|
|
CreatorUserId: args.CreatorUserId,
|
|
GroupTasksId: args.GroupTaskId,
|
|
Step: args.Step,
|
|
Status: utils.GroupTasksStatusRunning,
|
|
Inputs: args.TaskInputs,
|
|
StartedAt: time.Now(),
|
|
}
|
|
|
|
// task type
|
|
if args.StartType == RunGroupTaskStartTypeNormal {
|
|
groupTaskStep.Id = uuid.New().String()
|
|
|
|
database.DB.Create(&groupTaskStep)
|
|
|
|
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdNewGroupTaskStep,
|
|
Body: groupTaskStep,
|
|
})
|
|
} else if args.StartType == RunGroupTaskStartTypeTryAgain {
|
|
groupTaskStep.Id = args.TaskStepId
|
|
|
|
updateGroupTaskSteps(groupTaskStep)
|
|
}
|
|
|
|
// set group task to running
|
|
dbGroupTask := updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
Status: utils.GroupTasksStatusRunning,
|
|
})
|
|
|
|
// global inputs
|
|
var globalInputParameters []InputParameters
|
|
|
|
if len(args.GlobalInputs) > 0 { // global inputs given in args because the group task was just created
|
|
if err := json.Unmarshal([]byte(args.GlobalInputs), &globalInputParameters); err != nil {
|
|
log.Error().Msgf("err unmarshalling global inputs %s", err.Error())
|
|
}
|
|
} else { // global inputs not given in args - fetch it from the database
|
|
if err := json.Unmarshal([]byte(dbGroupTask.GlobalInputs), &globalInputParameters); err != nil {
|
|
log.Error().Msgf("err unmarshalling global inputs %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// task parameters
|
|
log.Debug().Msgf("script path %s", categoryGroup.Tasks[args.Step-1].ScriptPath)
|
|
//commandArgs := []string{root + categoryGroup.Id + "/" + categoryGroup.Tasks[args.Step-1].ScriptPath}
|
|
commandArgs := []string{categoryGroup.Tasks[args.Step-1].ScriptPath}
|
|
|
|
if len(categoryGroup.Tasks[args.Step-1].Parameters) != 0 && len(args.TaskInputs) == 0 {
|
|
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
Status: utils.GroupTasksStatusInputRequired,
|
|
})
|
|
|
|
groupTaskStep.Status = utils.GroupTasksStatusInputRequired
|
|
|
|
updateGroupTaskSteps(groupTaskStep)
|
|
return
|
|
} else if len(args.TaskInputs) > 0 {
|
|
var taskParameterInputs []InputParameters
|
|
|
|
if err := json.Unmarshal([]byte(args.TaskInputs), &taskParameterInputs); err != nil {
|
|
log.Error().Msgf("err unmarshalling task parameter inputs %s", err.Error())
|
|
}
|
|
|
|
for _, taskParameterInput := range taskParameterInputs {
|
|
commandArgs = append(commandArgs, taskParameterInput.Value)
|
|
}
|
|
}
|
|
|
|
// create running task folder
|
|
if _, err := os.Stat(runningTasksPath + groupTaskStep.GroupTasksId); errors.Is(err, os.ErrNotExist) {
|
|
if err := os.Mkdir(runningTasksPath+groupTaskStep.GroupTasksId, os.ModePerm); err != nil {
|
|
log.Error().Msgf("Error creating running tasks folder %s", err.Error())
|
|
}
|
|
|
|
// copy scripts to group tasks folder
|
|
err := filepath.Walk(groupsPath+categoryGroup.Id, func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
log.Error().Msgf("Error walk %s", err.Error())
|
|
return err
|
|
}
|
|
|
|
if !info.IsDir() && filepath.Ext(path) == ".py" {
|
|
bytesRead, err := ioutil.ReadFile(path)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Error reading file %s", err.Error())
|
|
}
|
|
|
|
err = ioutil.WriteFile(runningTasksPath+groupTaskStep.GroupTasksId+"/"+info.Name(), bytesRead, 0644)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Error writing file %s", err.Error())
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Failed to walk through task folder %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// execute command
|
|
cmd := exec.Command("python3", commandArgs...)
|
|
//cmd.Dir = "/home/alex/Documents/coding/projects/janex/admin-dashboard/backend/groupTasks/groups/production1/"
|
|
// path needs to be set here as the python scripts will use the path as base path to create files for example
|
|
cmd.Dir = runningTasksPath + groupTaskStep.GroupTasksId + "/"
|
|
out, err := cmd.CombinedOutput()
|
|
|
|
// execute script
|
|
//cmd, err := exec.Command("python3", commandArgs...).Output()
|
|
|
|
//cmdLog := string(cmd)
|
|
cmdLog := string(out)
|
|
|
|
if err != nil {
|
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
exitCode := exitErr.ExitCode()
|
|
log.Error().Msgf("exit code %d", exitCode)
|
|
cmdLog += fmt.Sprintf("\nExit code: %v", exitCode)
|
|
}
|
|
|
|
log.Error().Msgf("error exec command %s", err.Error())
|
|
groupTaskStep.Status = utils.GroupTasksStatusFailed
|
|
} else {
|
|
groupTaskStep.Status = utils.GroupTasksStatusFinished
|
|
}
|
|
|
|
fmt.Println(cmdLog)
|
|
|
|
groupTaskStep.Log = cmdLog
|
|
groupTaskStep.EndedAt = time.Now()
|
|
|
|
// looking for files which are created by the scripts
|
|
var foundFiles []FoundFile
|
|
|
|
if err = filepath.Walk(runningTasksPath+groupTaskStep.GroupTasksId+"/", func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
log.Error().Msgf("Error walk %s", err.Error())
|
|
return err
|
|
}
|
|
|
|
if !info.IsDir() && filepath.Ext(path) != ".py" && !strings.Contains(path, "/oldFiles/") {
|
|
foundFiles = append(foundFiles, FoundFile{Path: path, FileName: info.Name()})
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
log.Error().Msgf("Failed look for created files %s", err.Error())
|
|
}
|
|
|
|
if len(foundFiles) > 0 {
|
|
// creating folder for the old files which can be used by the python scripts for the next steps
|
|
if _, err := os.Stat(runningTasksPath + groupTaskStep.GroupTasksId + "/oldFiles"); errors.Is(err, os.ErrNotExist) {
|
|
if err := os.Mkdir(runningTasksPath+groupTaskStep.GroupTasksId+"/oldFiles", os.ModePerm); err != nil {
|
|
log.Error().Msgf("Error creating old files folder %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// copy files to public directory to access it via web ui
|
|
if _, err := os.Stat(publicPath + groupTaskStep.GroupTasksId); errors.Is(err, os.ErrNotExist) {
|
|
if err := os.Mkdir(publicPath+groupTaskStep.GroupTasksId, os.ModePerm); err != nil {
|
|
log.Error().Msgf("Error creating task public folder %s", err.Error())
|
|
}
|
|
}
|
|
|
|
var taskFiles []structs.GroupTaskStepFile
|
|
|
|
for _, foundFile := range foundFiles {
|
|
bytesRead, err := ioutil.ReadFile(foundFile.Path)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Error reading file %s", err.Error())
|
|
}
|
|
|
|
err = ioutil.WriteFile(runningTasksPath+groupTaskStep.GroupTasksId+"/oldFiles/"+foundFile.FileName, bytesRead, 0644)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Error writing file %s", err.Error())
|
|
}
|
|
|
|
systemFileName := uuid.New().String() + filepath.Ext(foundFile.FileName)
|
|
|
|
err = ioutil.WriteFile(publicPath+groupTaskStep.GroupTasksId+"/"+systemFileName, bytesRead, 0644)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Error writing file %s", err.Error())
|
|
}
|
|
|
|
err = os.Remove(runningTasksPath + groupTaskStep.GroupTasksId + "/" + foundFile.FileName)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Failed to delete created file by task %s", err.Error())
|
|
}
|
|
|
|
taskFiles = append(taskFiles, structs.GroupTaskStepFile{
|
|
OriginalFileName: foundFile.FileName,
|
|
SystemFileName: systemFileName,
|
|
})
|
|
}
|
|
|
|
marshaledTaskFiles, err := json.Marshal(taskFiles)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Failed to marshal task files %s", err.Error())
|
|
} else {
|
|
groupTaskStep.Files = string(marshaledTaskFiles)
|
|
}
|
|
}
|
|
|
|
// update group task step
|
|
|
|
updateGroupTaskSteps(groupTaskStep)
|
|
|
|
if int(args.Step) < len(categoryGroup.Tasks) {
|
|
if groupTaskStep.Status == utils.GroupTasksStatusFailed {
|
|
// set group task to failed
|
|
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
Status: utils.GroupTasksStatusFailed,
|
|
})
|
|
} else {
|
|
args.StartType = RunGroupTaskStartTypeNormal
|
|
args.Step = args.Step + 1
|
|
|
|
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
CurrentTasksStep: args.Step,
|
|
})
|
|
|
|
// clear task parameters, because otherwise the next task would have the parameters from the previous task
|
|
args.TaskInputs = ""
|
|
|
|
RunGroupTask(args)
|
|
}
|
|
} else {
|
|
// set group task to finished
|
|
updateGroupTask(groupTaskStep.GroupTasksId, structs.GroupTasks{
|
|
Status: utils.GroupTasksStatusFinished,
|
|
EndedAt: time.Now(),
|
|
})
|
|
|
|
err = os.RemoveAll(runningTasksPath + groupTaskStep.GroupTasksId + "/")
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("Failed to delete running task folder %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Updates group task and send it to websocket users
|
|
func updateGroupTask(groupTasksId string, updates structs.GroupTasks) structs.GroupTasks {
|
|
database.DB.Model(&structs.GroupTasks{}).Where("id = ?", groupTasksId).Updates(updates)
|
|
|
|
var dbGroupTask structs.GroupTasks
|
|
|
|
database.DB.First(&dbGroupTask, "id = ?", groupTasksId)
|
|
|
|
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdUpdateGroupTask,
|
|
Body: dbGroupTask,
|
|
})
|
|
|
|
return dbGroupTask
|
|
}
|
|
|
|
// Updates group task steps and send it to websocket users
|
|
func updateGroupTaskSteps(groupTaskStep structs.GroupTaskSteps) {
|
|
database.DB.Model(&structs.GroupTaskSteps{}).Where("id = ?", groupTaskStep.Id).Updates(groupTaskStep)
|
|
|
|
socketclients.BroadcastMessage(structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdUpdateGroupTaskStep,
|
|
Body: groupTaskStep,
|
|
})
|
|
}
|
|
|
|
func GetAllGroupTasks() []structs.GroupTasks {
|
|
var groupTasks []structs.GroupTasks
|
|
|
|
database.DB.Find(&groupTasks)
|
|
|
|
return groupTasks
|
|
}
|
|
|
|
func GetAllGroupTasksSteps() []structs.GroupTaskSteps {
|
|
var groupTaskStepsLogs []structs.GroupTaskSteps
|
|
|
|
database.DB.Find(&groupTaskStepsLogs)
|
|
|
|
lockedGroupTaskSteps := cache.GetLockedGroupTaskSteps()
|
|
groupTaskStepsInputs := cache.GetGroupTaskStepsInputs()
|
|
|
|
for i, groupTaskStep := range groupTaskStepsLogs {
|
|
if groupTaskStep.Status == utils.GroupTasksStatusInputRequired {
|
|
groupTaskStepsLogs[i].Inputs = cache.GetGroupTaskStepsInputsValue(groupTaskStepsInputs, groupTaskStep.GroupTasksId, groupTaskStep.Step)
|
|
|
|
for _, lockedGroupTaskStep := range lockedGroupTaskSteps {
|
|
if groupTaskStep.GroupTasksId == lockedGroupTaskStep.GroupTaskId {
|
|
groupTaskStepsLogs[i].LockedByUserId = lockedGroupTaskStep.LockedByUserId
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return groupTaskStepsLogs
|
|
}
|
|
|
|
func GetCategoryGroupTaskByCategoryAndGroupId(category string, groupId string) structs.Group {
|
|
for _, categoryGroup := range cache.GetCategoryGroups() {
|
|
if categoryGroup.Category == category {
|
|
for _, group := range categoryGroup.Groups {
|
|
if group.Id == groupId {
|
|
return group
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return structs.Group{}
|
|
}
|
|
|
|
func StartUnlockLockedGroupTaskStepsTicker() {
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
|
|
for range ticker.C {
|
|
for index, taskStep := range cache.GetLockedGroupTaskSteps() {
|
|
if time.Since(taskStep.LockedAt).Seconds() > 3 {
|
|
cache.RemoveLockedGroupTaskStep(index)
|
|
|
|
socketclients.BroadcastMessage(
|
|
structs.SendSocketMessage{
|
|
Cmd: utils.SentCmdTaskUnlocked,
|
|
Body: struct {
|
|
GroupTaskId string
|
|
Step uint8
|
|
RememberId string
|
|
}{
|
|
GroupTaskId: taskStep.GroupTaskId,
|
|
Step: taskStep.Step,
|
|
RememberId: taskStep.RememberId,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|