diff --git a/.devcontainer/dev.Dockerfile b/.devcontainer/dev.Dockerfile index 0093bb3..ebfab54 100755 --- a/.devcontainer/dev.Dockerfile +++ b/.devcontainer/dev.Dockerfile @@ -1,3 +1,5 @@ +FROM docker:26-cli AS dockercli + FROM golang:1.23-bullseye WORKDIR /app @@ -19,13 +21,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ pkg-config \ && rm -rf /var/lib/apt/lists/* +COPY --from=dockercli /usr/local/bin/docker /usr/local/bin/docker + COPY go.mod go.sum ./ RUN go mod download RUN go install golang.org/x/tools/gopls@v0.16.2 && \ go install honnef.co/go/tools/cmd/staticcheck@v0.5.0 -RUN CGO_ENABLED=1 go install -ldflags "-s -w -extldflags '-static'" github.com/go-delve/delve/cmd/dlv@latest +RUN go install github.com/go-delve/delve/cmd/dlv@v1.23.1 RUN mkdir -p storage && chmod 777 storage diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 3d71a13..539ae1d 100755 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -6,7 +6,9 @@ }, "workspaceFolder": "/app", "workspaceMount": "source=${localWorkspaceFolder},target=/app,type=bind", - "appPort": ["8080:8080"], + "appPort": [ + "8080:8080" + ], "postCreateCommand": "go mod tidy", "customizations": { "vscode": { @@ -14,12 +16,16 @@ "terminal.integrated.defaultProfile.linux": "sh" }, "extensions": [ - "golang.go", + "golang.go", "ms-vscode-remote.remote-containers", "ms-azuretools.vscode-docker" ] } }, "remoteUser": "root", - "runArgs": ["--privileged"] + "runArgs": [ + "--privileged", + "-v", + "/var/run/docker.sock:/var/run/docker.sock" + ] } \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 542760e..462d60c 100755 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,7 +7,7 @@ "request": "launch", "mode": "debug", "program": "cmd/server/main.go", - "buildFlags": ["-gcflags=all=-N -l"] + "buildFlags": ["-gcflags=all=-N"] }, { "name": "Launch AkoFlow Client Run", diff --git a/pkg/server/config/app_container.go b/pkg/server/config/app_container.go index 54d5fab..3f0546f 100755 --- a/pkg/server/config/app_container.go +++ b/pkg/server/config/app_container.go @@ -8,8 +8,9 @@ import ( "github.com/ovvesley/akoflow/pkg/server/config/http_helper" "github.com/ovvesley/akoflow/pkg/server/config/http_render_view" "github.com/ovvesley/akoflow/pkg/server/config/logger" + "github.com/ovvesley/akoflow/pkg/server/connector/connector_hpc" "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s" - "github.com/ovvesley/akoflow/pkg/server/connector/connector_sdumont" + "github.com/ovvesley/akoflow/pkg/server/connector/connector_local" "github.com/ovvesley/akoflow/pkg/server/connector/connector_singularity" "github.com/ovvesley/akoflow/pkg/server/database/repository/activity_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/logs_repository" @@ -56,7 +57,8 @@ type AppContainerRepository struct { type AppContainerConnector struct { K8sConnector connector_k8s.IConnector SingularityConnector connector_singularity.IConnectorSingularity - SDumontConnector connector_sdumont.IConnectorSDumont + HPCRuntimeConnector connector_hpc.IConnectorHPCRuntime + LocalConnector connector_local.IConnectorLocal } type AppContainerTemplateRenderer struct { @@ -74,7 +76,7 @@ func GetEnvVars() (map[string]string, map[string]map[string]string) { envVars := make(map[string]string) envVarByRuntime := make(map[string]map[string]string) - runtimes_avaibles := []string{"k8s", "singularity", "sdumont"} + runtimes_avaibles := []string{"k8s", "singularity", "hpc"} for _, v := range os.Environ() { splitted := strings.Split(v, "=") @@ -120,7 +122,8 @@ func MakeAppContainer() AppContainer { // create the Connector instances k8sConnector := connector_k8s.New() singularityConnector := connector_singularity.New() - sdumontConnector := connector_sdumont.New() + hpcConnector := connector_hpc.New() + localConnector := connector_local.New() renderViewprovider := http_render_view.New() @@ -145,7 +148,8 @@ func MakeAppContainer() AppContainer { Connector: AppContainerConnector{ K8sConnector: k8sConnector, SingularityConnector: singularityConnector, - SDumontConnector: sdumontConnector, + HPCRuntimeConnector: hpcConnector, + LocalConnector: localConnector, }, TemplateRenderer: AppContainerTemplateRenderer{ RenderViewProvider: renderViewprovider, diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index 173e92c..2448e90 100755 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -61,8 +61,16 @@ func loadDotEnv() { if line != "" { env := strings.Split(line, "=") key := strings.TrimSpace(env[0]) - value := strings.Trim(strings.TrimSpace(env[1]), "'") + value := "" + + if len(env) == 2 { + value = strings.TrimSpace(env[1]) + } else if len(env) > 2 { + value = strings.TrimSpace(strings.Join(env[1:], "=")) + } + value = strings.TrimSpace(value) os.Setenv(key, value) + } } diff --git a/pkg/server/connector/connector_hpc/connector_hpc.go b/pkg/server/connector/connector_hpc/connector_hpc.go new file mode 100644 index 0000000..7f0108f --- /dev/null +++ b/pkg/server/connector/connector_hpc/connector_hpc.go @@ -0,0 +1,243 @@ +package connector_hpc + +import ( + "encoding/base64" + "fmt" + "os/exec" + "sync" + "syscall" + + "github.com/ovvesley/akoflow/pkg/server/entities/runtime_entity" +) + +type ConnectorHPCRuntime struct { + Runtime runtime_entity.Runtime +} + +func New() IConnectorHPCRuntime { + return &ConnectorHPCRuntime{} +} + +func (c *ConnectorHPCRuntime) SetRuntime(runtime runtime_entity.Runtime) *ConnectorHPCRuntime { + c.Runtime = runtime + return c +} + +type IConnectorHPCRuntime interface { + RunCommand(command string, args ...string) (string, error) + RunCommandWithOutput(command string, args ...string) (string, error) + RunCommandWithOutputRemote(command string, args ...string) (string, error) + IsVPNConnected() (bool, error) + ExecuteMultiplesCommand(commands []string) + SetRuntime(runtime runtime_entity.Runtime) *ConnectorHPCRuntime + BuildRemoteCommand(runtime runtime_entity.Runtime, command string) (string, error) +} + +func (c *ConnectorHPCRuntime) RunCommandWithOutputRemote(command string, args ...string) (string, error) { + fmt.Printf("Executing command: %s %v\n", command, args) + + shell := getAvailableShell() + + remoteCommand, err := c.BuildRemoteCommand(c.Runtime, command) + if err != nil { + return "", err + } + + fullCommand := append([]string{"-c", remoteCommand}, args...) + cmd := exec.Command(shell, fullCommand...) + output, err := cmd.CombinedOutput() + + println(string(output)) + if err != nil { + return "", fmt.Errorf("failed to execute command: %w", err) + } + + return string(output), nil +} + +func (c *ConnectorHPCRuntime) handleCreateSSHKey(privateKey string, publicKey string, sshConfig string) error { + if privateKey == "" || publicKey == "" || sshConfig == "" { + return nil + } + + privateKeyDecoded, err := decodeBase64(privateKey) + if err != nil { + return err + } + + publicKeyDecoded, err := decodeBase64(publicKey) + if err != nil { + return err + } + + sshConfigDecoded, err := decodeBase64(sshConfig) + if err != nil { + return err + } + + privateKeyFile, err := writeTempSSHKey(privateKeyDecoded, "id_rsa") + if err != nil { + return err + } + + publicKeyFile, err := writeTempSSHKey(publicKeyDecoded, "id_rsa.pub") + if err != nil { + removeTempSSHKey(privateKeyFile) + return err + } + + _, err = writeTempSSHKey(sshConfigDecoded, "config") + + if err != nil { + removeTempSSHKey(privateKeyFile) + removeTempSSHKey(publicKeyFile) + return err + } + + return nil +} + +func (c ConnectorHPCRuntime) BuildRemoteCommand(runtime runtime_entity.Runtime, command string) (string, error) { + username := runtime.GetCurrentRuntimeMetadata("USER") + hostname := runtime.GetCurrentRuntimeMetadata("HOST_CLUSTER") + sshKeyPrivateKey := runtime.GetCurrentRuntimeMetadata("SSHKEYPRIVK") + sshKeyPublicKey := runtime.GetCurrentRuntimeMetadata("SSHKEYPUBLK") + sshConfig := runtime.GetCurrentRuntimeMetadata("SSHCONFIG") + password := runtime.GetCurrentRuntimeMetadata("PASSWORD") + + // create .ssh/id_rsa file with the private key + c.handleCreateSSHKey(sshKeyPrivateKey, sshKeyPublicKey, sshConfig) + + if sshKeyPrivateKey != "" && sshKeyPublicKey != "" && sshConfig != "" { + return fmt.Sprintf("ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o ConnectTimeout=10 %s@%s '%s'", username, hostname, command), nil + } else if password != "" { + return fmt.Sprintf("sshpass -p '%s' ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o ConnectTimeout=10 %s@%s '%s'", password, username, hostname, command), nil + } + + return "", fmt.Errorf("no authentication method provided") +} + +func decodeBase64(encoded string) (string, error) { + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return "", fmt.Errorf("failed to decode base64: %w", err) + } + return string(decoded), nil +} + +func writeTempSSHKey(key string, filename string) (string, error) { + path := "/root/.ssh/" + + keyFile := path + filename + + _, err := exec.Command("bash", "-c", fmt.Sprintf("test -f %s", keyFile)).Output() + if err == nil { + return keyFile, nil + } + + err = exec.Command("bash", "-c", fmt.Sprintf("echo '%s' > %s && chmod 600 %s", key, keyFile, keyFile)).Run() + if err != nil { + return "", fmt.Errorf("failed to write SSH key to file: %w", err) + } + + return keyFile, nil +} + +func removeTempSSHKey(file string) { + exec.Command("bash", "-c", fmt.Sprintf("rm -f %s", file)).Run() +} + +func (s *ConnectorHPCRuntime) ExecuteMultiplesCommand(commands []string) { + var wg sync.WaitGroup + + responses := make(chan string, len(commands)) // Create a channel to receive the responses + + for _, command := range commands { + wg.Add(1) + go func(command string) { + defer wg.Done() + + shell := getAvailableShell() + + fullCommand := append([]string{"-c", command}) + cmd := exec.Command(shell, fullCommand...) + output, err := cmd.CombinedOutput() + + fmt.Printf("Executing command: %s %v\n", command, fullCommand) + + if err != nil { + fmt.Printf("failed to execute command: %s\n", err) + } + + fmt.Printf("Output: %s\n", output) + + responses <- string(output) + + }(command) + } + + wg.Wait() + + close(responses) + + for response := range responses { + fmt.Printf("Response: %s\n", response) + } +} + +func (c *ConnectorHPCRuntime) RunCommand(command string, args ...string) (string, error) { + return executeCommand(command, args...) +} + +func (c *ConnectorHPCRuntime) RunCommandWithOutput(command string, args ...string) (string, error) { + fmt.Printf("Executing command: %s %v\n", command, args) + + shell := getAvailableShell() + + fullCommand := append([]string{"-c", command}, args...) + cmd := exec.Command(shell, fullCommand...) + output, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("failed to execute command: %w", err) + } + + return string(output), nil + +} + +func (c *ConnectorHPCRuntime) IsVPNConnected() (bool, error) { + output, err := c.RunCommandWithOutput("ps aux | grep vpnc") + if err != nil { + return false, err + } + + if len(output) > 0 { + return true, nil + } + + return false, nil +} + +func executeCommand(command string, args ...string) (string, error) { + fmt.Printf("Executing command: %s %v\n", command, args) + + shell := getAvailableShell() + fullCommand := append([]string{"-c", command}, args...) + cmd := exec.Command(shell, fullCommand...) + cmd.SysProcAttr = &syscall.SysProcAttr{} + if err := cmd.Start(); err != nil { + return "", fmt.Errorf("failed to start command: %w", err) + } + + pid := cmd.Process.Pid + fmt.Printf("Started process with PID: %d\n", pid) + + return fmt.Sprintf("%d", pid), nil +} + +func getAvailableShell() string { + if _, err := exec.LookPath("bash"); err == nil { + return "bash" + } + return "sh" +} diff --git a/pkg/server/connector/connector_local/connector_local.go b/pkg/server/connector/connector_local/connector_local.go new file mode 100755 index 0000000..c688b61 --- /dev/null +++ b/pkg/server/connector/connector_local/connector_local.go @@ -0,0 +1,64 @@ +package connector_local + +import ( + "fmt" + "os/exec" + "syscall" +) + +type ConnectorLocal struct { +} + +func New() IConnectorLocal { + return &ConnectorLocal{} +} + +type IConnectorLocal interface { + RunCommand(command string, args ...string) (string, error) + RunCommandWithOutput(command string, args ...string) (string, error) +} + +func (c *ConnectorLocal) RunCommand(command string, args ...string) (string, error) { + return executeCommand(command, args...) +} + +func (c *ConnectorLocal) RunCommandWithOutput(command string, args ...string) (string, error) { + fmt.Printf("Executing command: %s %v\n", command, args) + + shell := getAvailableShell() + + fullCommand := append([]string{"-c", command}, args...) + cmd := exec.Command(shell, fullCommand...) + output, err := cmd.CombinedOutput() + println("Command output: ", string(output)) + if err != nil { + return "", fmt.Errorf("failed to execute command: %w", err) + } + + return string(output), nil + +} + +func executeCommand(command string, args ...string) (string, error) { + fmt.Printf("Executing command: %s %v\n", command, args) + + shell := getAvailableShell() + fullCommand := append([]string{"-c", command}, args...) + cmd := exec.Command(shell, fullCommand...) + cmd.SysProcAttr = &syscall.SysProcAttr{} + if err := cmd.Start(); err != nil { + return "", fmt.Errorf("failed to start command: %w", err) + } + + pid := cmd.Process.Pid + fmt.Printf("Started process with PID: %d\n", pid) + + return fmt.Sprintf("%d", pid), nil +} + +func getAvailableShell() string { + if _, err := exec.LookPath("bash"); err == nil { + return "bash" + } + return "sh" +} diff --git a/pkg/server/connector/connector_sdumont/connector_sdumont.go b/pkg/server/connector/connector_sdumont/connector_sdumont.go deleted file mode 100644 index 1768aaf..0000000 --- a/pkg/server/connector/connector_sdumont/connector_sdumont.go +++ /dev/null @@ -1,150 +0,0 @@ -package connector_sdumont - -import ( - "fmt" - "os/exec" - "sync" - "syscall" - - "github.com/ovvesley/akoflow/pkg/server/entities/runtime_entity" -) - -type ConnectorSDumont struct { - Runtime runtime_entity.Runtime -} - -func New() IConnectorSDumont { - return &ConnectorSDumont{} -} - -func (c *ConnectorSDumont) SetRuntime(runtime runtime_entity.Runtime) *ConnectorSDumont { - c.Runtime = runtime - return c -} - -type IConnectorSDumont interface { - RunCommand(command string, args ...string) (string, error) - RunCommandWithOutput(command string, args ...string) (string, error) - RunCommandWithOutputRemote(command string, args ...string) (string, error) - IsVPNConnected() (bool, error) - ExecuteMultiplesCommand(commands []string) - SetRuntime(runtime runtime_entity.Runtime) *ConnectorSDumont -} - -func (c *ConnectorSDumont) RunCommandWithOutputRemote(command string, args ...string) (string, error) { - fmt.Printf("Executing command: %s %v\n", command, args) - - shell := getAvailableShell() - - password := c.Runtime.GetCurrentRuntimeMetadata("PASSWORD") - username := c.Runtime.GetCurrentRuntimeMetadata("USER") - hostname := c.Runtime.GetCurrentRuntimeMetadata("HOST_CLUSTER") - - command = fmt.Sprintf("sshpass -p '%s' ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -o ConnectTimeout=10 %s@%s '%s'", password, username, hostname, command) - - fullCommand := append([]string{"-c", command}, args...) - cmd := exec.Command(shell, fullCommand...) - output, err := cmd.CombinedOutput() - - println(string(output)) - if err != nil { - return "", fmt.Errorf("failed to execute command: %w", err) - } - - return string(output), nil -} - -func (s *ConnectorSDumont) ExecuteMultiplesCommand(commands []string) { - var wg sync.WaitGroup - - responses := make(chan string, len(commands)) // Create a channel to receive the responses - - for _, command := range commands { - wg.Add(1) - go func(command string) { - defer wg.Done() - - shell := getAvailableShell() - - fullCommand := append([]string{"-c", command}) - cmd := exec.Command(shell, fullCommand...) - output, err := cmd.CombinedOutput() - - fmt.Printf("Executing command: %s %v\n", command, fullCommand) - - if err != nil { - fmt.Printf("failed to execute command: %s\n", err) - } - - fmt.Printf("Output: %s\n", output) - - responses <- string(output) - - }(command) - } - - wg.Wait() - - close(responses) - - for response := range responses { - fmt.Printf("Response: %s\n", response) - } -} - -func (c *ConnectorSDumont) RunCommand(command string, args ...string) (string, error) { - return executeCommand(command, args...) -} - -func (c *ConnectorSDumont) RunCommandWithOutput(command string, args ...string) (string, error) { - fmt.Printf("Executing command: %s %v\n", command, args) - - shell := getAvailableShell() - - fullCommand := append([]string{"-c", command}, args...) - cmd := exec.Command(shell, fullCommand...) - output, err := cmd.CombinedOutput() - if err != nil { - return "", fmt.Errorf("failed to execute command: %w", err) - } - - return string(output), nil - -} - -func (c *ConnectorSDumont) IsVPNConnected() (bool, error) { - output, err := c.RunCommandWithOutput("ps aux | grep vpnc") - if err != nil { - return false, err - } - - if len(output) > 0 { - return true, nil - } - - return false, nil -} - -func executeCommand(command string, args ...string) (string, error) { - fmt.Printf("Executing command: %s %v\n", command, args) - - shell := getAvailableShell() - fullCommand := append([]string{"-c", command}, args...) - cmd := exec.Command(shell, fullCommand...) - cmd.SysProcAttr = &syscall.SysProcAttr{} - if err := cmd.Start(); err != nil { - return "", fmt.Errorf("failed to start command: %w", err) - } - - pid := cmd.Process.Pid - fmt.Printf("Started process with PID: %d\n", pid) - - return fmt.Sprintf("%d", pid), nil -} - -func getAvailableShell() string { - if _, err := exec.LookPath("bash"); err == nil { - return "bash" - } - return "sh" -} diff --git a/pkg/server/database/model/activity_model.go b/pkg/server/database/model/activity_model.go index bfb8085..42a6833 100644 --- a/pkg/server/database/model/activity_model.go +++ b/pkg/server/database/model/activity_model.go @@ -13,6 +13,7 @@ type Activity struct { Status int `db:"status"` ProcID string `db:"proc_id"` NodeSelector string `db:"node_selector"` + MountPath string `db:"mount_path"` CreatedAt string `db:"created_at"` StartedAt string `db:"started_at"` FinishedAt string `db:"finished_at"` diff --git a/pkg/server/database/repository/activity_repository/activity_repository_create.go b/pkg/server/database/repository/activity_repository/activity_repository_create.go index df5e05f..050bf40 100755 --- a/pkg/server/database/repository/activity_repository/activity_repository_create.go +++ b/pkg/server/database/repository/activity_repository/activity_repository_create.go @@ -96,10 +96,13 @@ func (w *ActivityRepository) createActivity(namespace string, workflow workflow_ if activity.Runtime != "" { runtime = activity.Runtime } + if activity.MountPath == "" { + activity.MountPath = workflow.Spec.MountPath + } result, err := c.Exec( - "INSERT INTO "+w.tableNameActivity+" (workflow_id, namespace, name, image, runtime, resource_k8s_base64, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)", - workflowId, namespace, activity.Name, image, runtime, rawActivity, StatusCreated) + "INSERT INTO "+w.tableNameActivity+" (workflow_id, namespace, name, image, runtime, resource_k8s_base64, mount_path, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)", + workflowId, namespace, activity.Name, image, runtime, rawActivity, activity.MountPath, StatusCreated) if err != nil { println("Error creating activity" + err.Error()) diff --git a/pkg/server/engine/httpserver/cors.go b/pkg/server/engine/httpserver/cors.go new file mode 100644 index 0000000..2d538cc --- /dev/null +++ b/pkg/server/engine/httpserver/cors.go @@ -0,0 +1,17 @@ +package httpserver + +import "net/http" + +// AllowCORS is a middleware that enables CORS for all origins and methods. +func AllowCORS(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + h.ServeHTTP(w, r) + }) +} diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/runtimes.tmpl.html b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/runtimes.tmpl.html index 10aac5e..142e7e9 100644 --- a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/runtimes.tmpl.html +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/runtimes.tmpl.html @@ -20,7 +20,7 @@ } function isSensitiveKey(key) { - return /token|pass|secret/i.test(key); + return /token|key|pass|secret/i.test(key); } function renderMetadata(metadata, runtimeId) { diff --git a/pkg/server/engine/httpserver/httpserver.go b/pkg/server/engine/httpserver/httpserver.go index a413716..0f56e97 100755 --- a/pkg/server/engine/httpserver/httpserver.go +++ b/pkg/server/engine/httpserver/httpserver.go @@ -18,8 +18,8 @@ import ( func HealthCheck(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) - } + func StartServer() { http.HandleFunc("GET /", http_config.KernelHandler(public_static_handler.New().Static)) @@ -65,7 +65,8 @@ func StartServer() { //http.HandleFunc("GET /akoflow-api/workflows/{workflowId}/storages/{storageId}/", http_config.KernelHandler(workflow_api_handler.New().GetStorage)) //http.HandleFunc("GET /akoflow-api/workflows/{workflowId}/storages/{storageId}/download-file/", http_config.KernelHandler(workflow_api_handler.New().DownloadFile)) - err := http.ListenAndServe(config.PORT_SERVER, nil) + handler := AllowCORS(http.DefaultServeMux) + err := http.ListenAndServe(config.PORT_SERVER, handler) if err != nil { println("Error starting server", err) panic(err) diff --git a/pkg/server/engine/scripts/default-slurm.sbatch b/pkg/server/engine/scripts/default-slurm.sbatch new file mode 100644 index 0000000..c43f2d1 --- /dev/null +++ b/pkg/server/engine/scripts/default-slurm.sbatch @@ -0,0 +1,6 @@ +#!/bin/bash +#SBATCH --job-name=#JOB_NAME# +#SBATCH --output=#OUTPUT# +#SBATCH --error=#ERROR# + +#COMMAND# \ No newline at end of file diff --git a/pkg/server/entities/workflow_activity_entity/workflow_activity_entity.go b/pkg/server/entities/workflow_activity_entity/workflow_activity_entity.go index 7a50ba2..a956226 100755 --- a/pkg/server/entities/workflow_activity_entity/workflow_activity_entity.go +++ b/pkg/server/entities/workflow_activity_entity/workflow_activity_entity.go @@ -25,6 +25,7 @@ type WorkflowActivities struct { CreatedAt string `yaml:"createdAt"` StartedAt string `yaml:"startedAt"` FinishedAt string `yaml:"finishedAt"` + MountPath string `yaml:"mountPath"` } type WorkflowActivityDatabase struct { @@ -39,6 +40,7 @@ type WorkflowActivityDatabase struct { ProcId *string DependOnActivity *int NodeSelector *string + MountPath *string CreatedAt *string StartedAt *string FinishedAt *string @@ -98,6 +100,14 @@ func (wfa WorkflowActivities) GetProcId() string { return wfa.ProcId } +func (wfa WorkflowActivities) GetMountPath() string { + if wfa.MountPath != "" { + return wfa.MountPath + } + + return "" +} + func (wfa WorkflowActivities) GetNodeSelector() map[string]string { wfaNodeSelector := wfa.NodeSelector @@ -172,6 +182,7 @@ func DatabaseToWorkflowActivities(params ParamsDatabaseToWorkflowActivities) Wor CpuLimit: wfa.CpuLimit, DependsOn: wfa.DependsOn, NodeSelector: wfa.NodeSelector, + MountPath: wfa.MountPath, KeepDisk: wfa.KeepDisk, CreatedAt: createdAt, StartedAt: startedAt, diff --git a/pkg/server/runtimes/hpc_runtime/hpc_runtime.go b/pkg/server/runtimes/hpc_runtime/hpc_runtime.go new file mode 100644 index 0000000..89ca44b --- /dev/null +++ b/pkg/server/runtimes/hpc_runtime/hpc_runtime.go @@ -0,0 +1,71 @@ +package hpc_runtime + +import ( + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_activity_entity" + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_entity" + "github.com/ovvesley/akoflow/pkg/server/runtimes/hpc_runtime/hpc_runtime_service" +) + +func NewHpcRuntime() *HpcRuntime { + return &HpcRuntime{ + hpcRuntimeService: hpc_runtime_service.New(), + } +} + +type HpcRuntime struct { + hpcRuntimeService *hpc_runtime_service.HPCRuntimeService + runtimeType string + runtimeName string +} + +func (h *HpcRuntime) StartConnection() error { + return nil +} + +func (h *HpcRuntime) StopConnection() error { + return nil +} + +func (h *HpcRuntime) SetRuntimeType(runtimeType string) *HpcRuntime { + h.runtimeType = runtimeType + return h +} + +func (h *HpcRuntime) SetRuntimeName(name string) *HpcRuntime { + h.runtimeName = name + return h +} + +func (h *HpcRuntime) ApplyJob(workflowID int, activityID int) bool { + h.hpcRuntimeService.ApplyJob(workflowID, activityID) + return true +} + +func (h *HpcRuntime) DeleteJob(workflowID int, activityID int) bool { + return true +} + +func (h *HpcRuntime) GetMetrics(workflowID int, activityID int) string { + return "" +} + +func (h *HpcRuntime) GetLogs(workflow workflow_entity.Workflow, workflowActivity workflow_activity_entity.WorkflowActivities) string { + return "" +} + +func (h *HpcRuntime) GetStatus(workflowID int, activityID int) string { + return "" +} + +func (h *HpcRuntime) HealthCheck() bool { + h.hpcRuntimeService.HealthCheck(h.runtimeName) + return true +} + +func (h *HpcRuntime) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) bool { + h.hpcRuntimeService. + SetRuntimeName(h.runtimeName). + SetRuntimeType(h.runtimeType). + VerifyActivitiesWasFinished(workflow) + return true +} diff --git a/pkg/server/runtimes/sdumont_runtime/sdumont_runtime_service.go/sdumont_runtime_service.go b/pkg/server/runtimes/hpc_runtime/hpc_runtime_service/hpc_runtime_service.go similarity index 56% rename from pkg/server/runtimes/sdumont_runtime/sdumont_runtime_service.go/sdumont_runtime_service.go rename to pkg/server/runtimes/hpc_runtime/hpc_runtime_service/hpc_runtime_service.go index d238985..db55925 100644 --- a/pkg/server/runtimes/sdumont_runtime/sdumont_runtime_service.go/sdumont_runtime_service.go +++ b/pkg/server/runtimes/hpc_runtime/hpc_runtime_service/hpc_runtime_service.go @@ -1,4 +1,4 @@ -package sdumont_runtime_service +package hpc_runtime_service import ( "fmt" @@ -6,7 +6,7 @@ import ( "strings" "github.com/ovvesley/akoflow/pkg/server/config" - "github.com/ovvesley/akoflow/pkg/server/connector/connector_sdumont" + "github.com/ovvesley/akoflow/pkg/server/connector/connector_hpc" "github.com/ovvesley/akoflow/pkg/server/database/repository/activity_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/runtime_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/workflow_repository" @@ -15,30 +15,43 @@ import ( "github.com/ovvesley/akoflow/pkg/server/runtimes/singularity_runtime/singularity_runtime_service" ) -type SDumontRuntimeService struct { - makeSingularityActivity singularity_runtime_service.MakeSingularityActivityService - makeSBatchSDumontActivity MakeSBatchSDumontActivityService +type HPCRuntimeService struct { + makeSingularityActivity singularity_runtime_service.MakeSingularityActivityService + makeSBatchHPCRuntimeActivity MakeSBatchHPCRuntimeActivityService activityRepository activity_repository.IActivityRepository workflowRepository workflow_repository.IWorkflowRepository runtimeRepository runtime_repository.IRuntimeRepository - connectorSDumont connector_sdumont.IConnectorSDumont + connectorHPCRuntime connector_hpc.IConnectorHPCRuntime + + runtimeName string + runtimeType string +} + +func (s *HPCRuntimeService) SetRuntimeName(runtimeName string) *HPCRuntimeService { + s.runtimeName = runtimeName + return s +} + +func (s *HPCRuntimeService) SetRuntimeType(runtimeType string) *HPCRuntimeService { + s.runtimeType = runtimeType + return s } -func New() *SDumontRuntimeService { - return &SDumontRuntimeService{ +func New() *HPCRuntimeService { + return &HPCRuntimeService{ makeSingularityActivity: singularity_runtime_service.NewMakeSingularityActivityService(), activityRepository: config.App().Repository.ActivityRepository, workflowRepository: config.App().Repository.WorkflowRepository, runtimeRepository: config.App().Repository.RuntimeRepository, - connectorSDumont: config.App().Connector.SDumontConnector, + connectorHPCRuntime: config.App().Connector.HPCRuntimeConnector, } } -func (s *SDumontRuntimeService) ApplyJob(workflowID int, activityID int) string { +func (s *HPCRuntimeService) ApplyJob(workflowID int, activityID int) string { wfa, err := s.activityRepository.Find(activityID) wf, _ := s.workflowRepository.Find(wfa.WorkflowId) @@ -63,27 +76,27 @@ func (s *SDumontRuntimeService) ApplyJob(workflowID int, activityID int) string return "" } - singularitySystemCall := s.makeSingularityActivity.MakeContainerCommandActivityToSDumont(wf, wfa) - sBatchSDumontSystemCall := s.makeSBatchSDumontActivity. + singularitySystemCall := s.makeSingularityActivity.MakeContainerCommandActivityToHPC(wf, wfa) + sBatchHPCRuntimeSystemCall := s.makeSBatchHPCRuntimeActivity. SetRuntime(*runtime). SetSingularityCommand(singularitySystemCall). Handle(wf, wfa) - fmt.Println("PID: ", singularitySystemCall, sBatchSDumontSystemCall) + fmt.Println("PID: ", singularitySystemCall, sBatchHPCRuntimeSystemCall) - connected, err := s.connectorSDumont.IsVPNConnected() + connected, err := s.connectorHPCRuntime.IsVPNConnected() if err != nil { - config.App().Logger.Error("WORKER: Error checking VPN connection to SDumont Runtime.") + config.App().Logger.Error("WORKER: Error checking VPN connection to HPCRuntime.") return "" } if !connected { - config.App().Logger.Error("WORKER: VPN is not connected to SDumont Runtime. Continue to the next activity.") + config.App().Logger.Error("WORKER: VPN is not connected to HPCRuntime. Continue to the next activity.") return "" } - output, _ := s.connectorSDumont.RunCommandWithOutputRemote(sBatchSDumontSystemCall) + output, _ := s.connectorHPCRuntime.SetRuntime(*runtime).RunCommandWithOutputRemote(sBatchHPCRuntimeSystemCall) pid, err := s.extractJobID(output) @@ -115,20 +128,20 @@ func (s *SDumontRuntimeService) ApplyJob(workflowID int, activityID int) string } -func (s *SDumontRuntimeService) applyWorkflowInRuntime(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) { - config.App().Logger.Infof("WORKER: Apply workflow in SDumont Runtime") +func (s *HPCRuntimeService) applyWorkflowInRuntime(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) { + config.App().Logger.Infof("WORKER: Apply workflow in HPCRuntime") s.updateWorkflowAndActivityStatus(wfa) s.syncWorkflowVolumes(wf) } -func (s *SDumontRuntimeService) updateWorkflowAndActivityStatus(wfa workflow_activity_entity.WorkflowActivities) { +func (s *HPCRuntimeService) updateWorkflowAndActivityStatus(wfa workflow_activity_entity.WorkflowActivities) { _ = s.workflowRepository.UpdateStatus(wfa.WorkflowId, workflow_repository.StatusRunning) _ = s.activityRepository.UpdateStatus(wfa.Id, activity_repository.StatusRunning) } -func (s *SDumontRuntimeService) syncWorkflowVolumes(wf workflow_entity.Workflow) { +func (s *HPCRuntimeService) syncWorkflowVolumes(wf workflow_entity.Workflow) { volumes := wf.GetVolumes() commands := []string{} @@ -139,25 +152,22 @@ func (s *SDumontRuntimeService) syncWorkflowVolumes(wf workflow_entity.Workflow) } for _, volume := range volumes { - // Sync local to remote - command1 := fmt.Sprintf("sshpass -p '%s' ssh -o StrictHostKeyChecking=no %s@%s 'mkdir -p %s'", - runtime.GetCurrentRuntimeMetadata("PASSWORD"), - runtime.GetCurrentRuntimeMetadata("USER"), - runtime.GetCurrentRuntimeMetadata("HOST_CLUSTER"), - volume.GetRemotePath(), - ) + command1, err := s.connectorHPCRuntime.BuildRemoteCommand(*runtime, fmt.Sprintf("mkdir -p %s", volume.GetRemotePath())) + if err != nil { + config.App().Logger.Infof("WORKER: Error building remote command.") + return + } + + var command2, command3 string - command2 := fmt.Sprintf("sshpass -p '%s' rsync -ah --progress %s %s@%s:%s", - runtime.GetCurrentRuntimeMetadata("PASSWORD"), + command2 = fmt.Sprintf("rsync -ah --progress %s %s@%s:%s", volume.GetLocalPath(), runtime.GetCurrentRuntimeMetadata("USER"), runtime.GetCurrentRuntimeMetadata("HOST_CLUSTER"), volume.GetRemotePath(), ) - // Sync remote to local - command3 := fmt.Sprintf("sshpass -p '%s' rsync -ah --progress %s@%s:%s %s", - runtime.GetCurrentRuntimeMetadata("PASSWORD"), + command3 = fmt.Sprintf("rsync -ah --progress %s@%s:%s %s", runtime.GetCurrentRuntimeMetadata("USER"), runtime.GetCurrentRuntimeMetadata("HOST_CLUSTER"), volume.GetRemotePath(), @@ -169,10 +179,11 @@ func (s *SDumontRuntimeService) syncWorkflowVolumes(wf workflow_entity.Workflow) commands = append(commands, fullCommands) } - s.connectorSDumont.ExecuteMultiplesCommand(commands) + s.connectorHPCRuntime.ExecuteMultiplesCommand(commands) + } -func (s *SDumontRuntimeService) extractJobID(outputCommand string) (string, error) { +func (s *HPCRuntimeService) extractJobID(outputCommand string) (string, error) { reOutput := regexp.MustCompile(`(?m)(\d+)`) matchOutput := reOutput.FindStringSubmatch(outputCommand) @@ -186,16 +197,18 @@ func (s *SDumontRuntimeService) extractJobID(outputCommand string) (string, erro return logsOutput, nil } -func (s *SDumontRuntimeService) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) bool { - config.App().Logger.Infof("WORKER: Verify activities was finished in SDumont Runtime") +func (s *HPCRuntimeService) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) bool { + config.App().Logger.Infof("WORKER: Verify activities was finished in HPCRuntime") for _, activity := range workflow.Spec.Activities { - s.handleVerifyActivityWasFinished(activity, workflow) + if activity.GetRuntimeId() == s.runtimeName { + s.handleVerifyActivityWasFinished(activity, workflow) + } } return true } -func (s *SDumontRuntimeService) handleVerifyActivityWasFinished(activity workflow_activity_entity.WorkflowActivities, wf workflow_entity.Workflow) int { +func (s *HPCRuntimeService) handleVerifyActivityWasFinished(activity workflow_activity_entity.WorkflowActivities, wf workflow_entity.Workflow) int { println("Verifying activity: ", activity.Name, " with id: ", activity.Id) wfaDatabase, _ := s.activityRepository.Find(activity.Id) @@ -208,39 +221,50 @@ func (s *SDumontRuntimeService) handleVerifyActivityWasFinished(activity workflo return activity_repository.StatusCreated } - command := fmt.Sprintf(" sacct -j %s --format=JobID,JobName,Partition,Account,AllocCPUs,State,ExitCode --noheader | grep akoflow", wfaDatabase.GetProcId()) + runtime, err := s.runtimeRepository.GetByName(activity.GetRuntimeId()) + + if err != nil { + config.App().Logger.Infof("WORKER: Error getting runtime from database.") + return activity_repository.StatusRunning + } + + if wfaDatabase.GetProcId() == "" { + config.App().Logger.Infof("WORKER: Activity %d has no process ID", activity.Id) + return activity_repository.StatusRunning + } + + command := fmt.Sprintf("scontrol show job %s", wfaDatabase.GetProcId()) - output, _ := s.connectorSDumont.RunCommandWithOutputRemote(command) + output, _ := s.connectorHPCRuntime.SetRuntime(*runtime).RunCommandWithOutputRemote(command) - saactResponse, err := s.extractSacctJobID(output) + scontrolResponse, err := s.extractScontrolJob(output) if err != nil { config.App().Logger.Infof("WORKER: Error extracting job ID %s", strings.TrimSpace(wfaDatabase.GetProcId())) return activity_repository.StatusRunning } - - if saactResponse.State == "COMPLETED" { + if scontrolResponse.State == "COMPLETED" { config.App().Logger.Infof("WORKER: Activity %d finished", activity.Id) s.syncWorkflowVolumes(wf) _ = s.activityRepository.UpdateStatus(activity.Id, activity_repository.StatusFinished) return activity_repository.StatusFinished } - if saactResponse.State == "FAILED" || saactResponse.State == "CANCELLED+" || saactResponse.State == "CANCELLED" || saactResponse.State == "DEADLINE" || saactResponse.State == "TIMEOUT" || saactResponse.State == "OUT_OF_MEM+" { + if scontrolResponse.State == "FAILED" || scontrolResponse.State == "CANCELLED+" || scontrolResponse.State == "CANCELLED" || scontrolResponse.State == "DEADLINE" || scontrolResponse.State == "TIMEOUT" || scontrolResponse.State == "OUT_OF_MEM+" { config.App().Logger.Infof("WORKER: Activity %d failed", activity.Id) s.syncWorkflowVolumes(wf) _ = s.activityRepository.UpdateStatus(activity.Id, activity_repository.StatusFinished) return activity_repository.StatusFinished } - if saactResponse.State == "RUNNING" { + if scontrolResponse.State == "RUNNING" { config.App().Logger.Infof("WORKER: Activity %d running", activity.Id) _ = s.activityRepository.UpdateStatus(activity.Id, activity_repository.StatusRunning) return activity_repository.StatusRunning } - if saactResponse.State == "PENDING" { + if scontrolResponse.State == "PENDING" { config.App().Logger.Infof("WORKER: Activity %d pending", activity.Id) _ = s.activityRepository.UpdateStatus(activity.Id, activity_repository.StatusRunning) return activity_repository.StatusRunning @@ -260,39 +284,54 @@ type SaactResponse struct { ExitCode string `json:"ExitCode"` } -func (s *SDumontRuntimeService) extractSacctJobID(outputCommand string) (SaactResponse, error) { - reOutput := regexp.MustCompile(`(?m)(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\d+)\s+(\S+)\s+(\d+:\d+)`) - match := reOutput.FindStringSubmatch(outputCommand) - - if len(match) == 0 { - return SaactResponse{}, fmt.Errorf("no match found") +func extractField(pattern, text string) (string, error) { + re := regexp.MustCompile(pattern) + match := re.FindStringSubmatch(text) + if len(match) < 2 { + return "", fmt.Errorf("field not found: %s", pattern) } + return match[1], nil +} - if len(match) < 8 { - return SaactResponse{}, fmt.Errorf("invalid output format") +func (s *HPCRuntimeService) extractScontrolJob(output string) (SaactResponse, error) { + var err error + resp := SaactResponse{} + + if resp.JobID, err = extractField(`JobId=(\d+)`, output); err != nil { + return resp, err + } + if resp.JobName, err = extractField(`JobName=([^\s]+)`, output); err != nil { + return resp, err + } + if resp.Partition, err = extractField(`Partition=([^\s]+)`, output); err != nil { + return resp, err + } + if resp.Account, err = extractField(`Account=([^\s]+|$begin:math:text$null$end:math:text$)`, output); err != nil { + return resp, err + } + if resp.AllocCPUs, err = extractField(`NumCPUs=(\d+)`, output); err != nil { + return resp, err + } + if resp.State, err = extractField(`JobState=([A-Z_]+)`, output); err != nil { + return resp, err + } + if resp.ExitCode, err = extractField(`ExitCode=(\d+:\d+)`, output); err != nil { + return resp, err } - return SaactResponse{ - JobID: match[1], - JobName: match[2], - Partition: match[3], - Account: match[4], - AllocCPUs: match[5], - State: match[6], - ExitCode: match[7], - }, nil + return resp, nil } -func (s *SDumontRuntimeService) HealthCheck(runtimeName string) bool { - config.App().Logger.Infof("WORKER: Health check SDumont Runtime") +func (s *HPCRuntimeService) HealthCheck(runtimeName string) bool { + config.App().Logger.Infof("WORKER: Health check HPCRuntime") - connected, err := s.connectorSDumont.IsVPNConnected() + connected, err := s.connectorHPCRuntime.IsVPNConnected() if err != nil { - config.App().Logger.Error("WORKER: Error checking VPN connection to SDumont Runtime.") + config.App().Logger.Error("WORKER: Error checking VPN connection to HPCRuntime.") return false } if !connected { - config.App().Logger.Error("WORKER: VPN is not connected to SDumont Runtime.") + config.App().Logger.Error("WORKER: VPN is not connected to HPCRuntime.") return false } @@ -309,17 +348,17 @@ func (s *SDumontRuntimeService) HealthCheck(runtimeName string) bool { } command := fmt.Sprintf("sinfo -p %s", runtime.GetCurrentRuntimeMetadata("QUEUE")) - output, err := s.connectorSDumont.SetRuntime(*runtime).RunCommandWithOutputRemote(command) + output, err := s.connectorHPCRuntime.SetRuntime(*runtime).RunCommandWithOutputRemote(command) if err != nil { runtime.Status = runtime_repository.STATUS_NOT_READY - config.App().Logger.Error("WORKER: Error running command in SDumont Runtime.") + config.App().Logger.Error("WORKER: Error running command in HPCRuntime.") return false } if strings.Contains(output, "No nodes available") { s.runtimeRepository.UpdateStatus(runtime, runtime_repository.STATUS_NOT_READY) - config.App().Logger.Error("WORKER: No nodes available in SDumont Runtime.") + config.App().Logger.Error("WORKER: No nodes available in HPCRuntime.") return false } diff --git a/pkg/server/runtimes/hpc_runtime/hpc_runtime_service/make_sbatch_hpc_activity_service.go b/pkg/server/runtimes/hpc_runtime/hpc_runtime_service/make_sbatch_hpc_activity_service.go new file mode 100644 index 0000000..e4e68ae --- /dev/null +++ b/pkg/server/runtimes/hpc_runtime/hpc_runtime_service/make_sbatch_hpc_activity_service.go @@ -0,0 +1,100 @@ +package hpc_runtime_service + +import ( + "encoding/base64" + "fmt" + "strings" + + "github.com/ovvesley/akoflow/pkg/server/entities/runtime_entity" + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_activity_entity" + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_entity" + "github.com/ovvesley/akoflow/pkg/shared/utils/utils_read_file" +) + +func NewMakeSBatchHPCRuntimeActivityService() MakeSBatchHPCRuntimeActivityService { + return MakeSBatchHPCRuntimeActivityService{ + singularityCommand: "", + } +} + +type MakeSBatchHPCRuntimeActivityService struct { + singularityCommand string + runtime runtime_entity.Runtime +} + +func (m MakeSBatchHPCRuntimeActivityService) SetSingularityCommand(singularityCommand string) MakeSBatchHPCRuntimeActivityService { + m.singularityCommand = singularityCommand + return m +} + +func (m MakeSBatchHPCRuntimeActivityService) SetRuntime(runtime runtime_entity.Runtime) MakeSBatchHPCRuntimeActivityService { + m.runtime = runtime + return m +} + +func (m MakeSBatchHPCRuntimeActivityService) GetSingularityCommand() string { + return m.singularityCommand +} +func (m MakeSBatchHPCRuntimeActivityService) GetTemplateSbatch() string { + + templateSbatchb64 := m.runtime.GetCurrentRuntimeMetadata("SBATCHTEMPLATE") + + if templateSbatchb64 == "" { + defaultTemplate := utils_read_file.New().ReadFile(fmt.Sprintf("%s/pkg/server/engine/scripts/default-slurm.sbatch", utils_read_file.New().GetRootProjectPath())) + if defaultTemplate == "" { + fmt.Println("Error reading default sbatch template") + return "" + } + return string(defaultTemplate) + } + + templateSbatchBytes, err := base64.StdEncoding.DecodeString(templateSbatchb64) + if err != nil { + fmt.Println("Error decoding sbatch template:", err) + return "" + } + templateSbatch := string(templateSbatchBytes) + return templateSbatch +} + +func (m MakeSBatchHPCRuntimeActivityService) Handle(workflow workflow_entity.Workflow, activity workflow_activity_entity.WorkflowActivities) string { + + templateSbatch := m.GetTemplateSbatch() + + jobName := fmt.Sprintf("akoflow_%d_%d", workflow.GetId(), activity.GetId()) + + output := fmt.Sprintf("%s/akoflow_out_%d_%d.out", m.runtime.GetCurrentRuntimeMetadata("MOUNT_PATH"), workflow.GetId(), activity.GetId()) + error := fmt.Sprintf("%s/akoflow_err_%d_%d.err", m.runtime.GetCurrentRuntimeMetadata("MOUNT_PATH"), workflow.GetId(), activity.GetId()) + time := m.runtime.GetCurrentRuntimeMetadata("TIME") + partition := m.runtime.GetCurrentRuntimeMetadata("QUEUE") + ntasks := m.runtime.GetCurrentRuntimeMetadata("NTASKS") + nodes := m.runtime.GetCurrentRuntimeMetadata("NODES") + gpus := m.runtime.GetCurrentRuntimeMetadata("GPUS") + cpusPerGpu := m.runtime.GetCurrentRuntimeMetadata("CPUS_PER_GPU") + mem := m.runtime.GetCurrentRuntimeMetadata("MEM") + wrap := m.GetSingularityCommand() + + templateSbatch = strings.ReplaceAll(templateSbatch, "#JOB_NAME#", jobName) + templateSbatch = strings.ReplaceAll(templateSbatch, "#OUTPUT#", output) + templateSbatch = strings.ReplaceAll(templateSbatch, "#ERROR#", error) + templateSbatch = strings.ReplaceAll(templateSbatch, "#TIME#", time) + templateSbatch = strings.ReplaceAll(templateSbatch, "#PARTITION#", partition) + templateSbatch = strings.ReplaceAll(templateSbatch, "#NTASKS#", ntasks) + templateSbatch = strings.ReplaceAll(templateSbatch, "#NODES#", nodes) + templateSbatch = strings.ReplaceAll(templateSbatch, "#GPUS#", gpus) + templateSbatch = strings.ReplaceAll(templateSbatch, "#CPUS_PER_GPU#", cpusPerGpu) + templateSbatch = strings.ReplaceAll(templateSbatch, "#MEM#", mem) + templateSbatch = strings.ReplaceAll(templateSbatch, "#COMMAND#", wrap) + + templateSbatchBase64 := base64.StdEncoding.EncodeToString([]byte(templateSbatch)) + + templateSbatch = fmt.Sprintf("echo %s | base64 -d", templateSbatchBase64) + + command := templateSbatch + "| sbatch" + + commandBase64 := base64.StdEncoding.EncodeToString([]byte(command)) + + command = fmt.Sprintf("echo %s | base64 -d | bash", commandBase64) + + return command +} diff --git a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime.go b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime.go index bde0044..3502c95 100755 --- a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime.go +++ b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime.go @@ -13,6 +13,7 @@ type KubernetesRuntime struct { kubernetesRuntimeService *kubernetes_runtime_service.KubernetesRuntimeService runtimeName string + runtimeType string } func New() *KubernetesRuntime { @@ -27,6 +28,11 @@ func (k *KubernetesRuntime) SetRuntimeName(name string) *KubernetesRuntime { return k } +func (k *KubernetesRuntime) SetRuntimeType(runtimeType string) *KubernetesRuntime { + k.runtimeType = runtimeType + return k +} + func (k *KubernetesRuntime) GetRuntimeName() string { return k.runtimeName } @@ -63,7 +69,10 @@ func (k *KubernetesRuntime) GetStatus(workflowID int, activityID int) string { } func (k *KubernetesRuntime) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) bool { - k.kubernetesRuntimeService.VerifyActivitiesWasFinished(workflow) + k.kubernetesRuntimeService. + SetRuntimeName(k.runtimeName). + SetRuntimeType(k.runtimeType). + VerifyActivitiesWasFinished(workflow) return true } diff --git a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go index c003202..f0e5179 100755 --- a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go +++ b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go @@ -12,6 +12,9 @@ type KubernetesRuntimeService struct { namespace string workflowRepository workflow_repository.IWorkflowRepository activityRepository activity_repository.IActivityRepository + + runtimeName string + runtimeType string } func New() *KubernetesRuntimeService { @@ -22,6 +25,16 @@ func New() *KubernetesRuntimeService { } } +func (k *KubernetesRuntimeService) SetRuntimeName(name string) *KubernetesRuntimeService { + k.runtimeName = name + return k +} + +func (k *KubernetesRuntimeService) SetRuntimeType(runtimeType string) *KubernetesRuntimeService { + k.runtimeType = runtimeType + return k +} + func (k *KubernetesRuntimeService) ApplyJob(activityID int) { wfa, err := k.activityRepository.Find(activityID) @@ -45,7 +58,10 @@ func (k *KubernetesRuntimeService) ApplyJob(activityID int) { } func (k *KubernetesRuntimeService) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) { - NewMonitorVerifyActivityWasFinishedService().VerifyActivities(workflow) + NewMonitorVerifyActivityWasFinishedService(). + SetRuntimeName(k.runtimeName). + SetRuntimeType(k.runtimeType). + VerifyActivities(workflow) } func (k *KubernetesRuntimeService) GetLogs(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) { diff --git a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go index 566d129..7468641 100755 --- a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go +++ b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go @@ -70,10 +70,12 @@ func (m *MakeK8sActivityService) makeContainerCommandActivity(wf workflow_entity func (m *MakeK8sActivityService) setupCommandWorkdir(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) string { - workdir := wf.Spec.MountPath + "/" + wfa.GetName() + mountPath := m.makeJobVolumeMountPath(wf, wfa) + + workdir := mountPath + "/" + wfa.GetName() if wf.IsStoragePolicyDistributed() { - workdir = wf.Spec.MountPath + workdir = mountPath } command := "mkdir -p " + workdir + "; \n" @@ -128,6 +130,11 @@ func (m *MakeK8sActivityService) addCommandToMonitorDiskSpecStorage(command stri // the name of the activity should be lower case and without spaces, because it will be used as a directory name. func (m *MakeK8sActivityService) makeJobVolumeMountPath(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) string { + mountPath := wfa.GetMountPath() + if mountPath != "" { + return mountPath + } + if wf.IsStoragePolicyDistributed() { return wf.Spec.MountPath } diff --git a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/monitor_verify_activity_was_finished_service.go b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/monitor_verify_activity_was_finished_service.go index e9a77b7..80b7dad 100755 --- a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/monitor_verify_activity_was_finished_service.go +++ b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/monitor_verify_activity_was_finished_service.go @@ -16,6 +16,9 @@ import ( type MonitorVerifyActivityWasFinishedService struct { namespace string + runtimeName string + runtimeType string + activityRepository activity_repository.IActivityRepository runtimeRepository runtime_repository.IRuntimeRepository logsRepository logs_repository.ILogsRepository @@ -23,6 +26,16 @@ type MonitorVerifyActivityWasFinishedService struct { connector connector_k8s.IConnector } +func (m *MonitorVerifyActivityWasFinishedService) SetRuntimeName(name string) *MonitorVerifyActivityWasFinishedService { + m.runtimeName = name + return m +} + +func (m *MonitorVerifyActivityWasFinishedService) SetRuntimeType(runtimeType string) *MonitorVerifyActivityWasFinishedService { + m.runtimeType = runtimeType + return m +} + func NewMonitorVerifyActivityWasFinishedService() *MonitorVerifyActivityWasFinishedService { return &MonitorVerifyActivityWasFinishedService{ namespace: "akoflow", @@ -51,6 +64,7 @@ func (m *MonitorVerifyActivityWasFinishedService) handleVerifyPreActivityWasFini preactivity, _ := m.activityRepository.FindPreActivity(activity.Id) runtime, err := m.runtimeRepository.GetByName(activity.GetRuntimeId()) + if err != nil { return activity_repository.StatusCreated } @@ -119,6 +133,10 @@ func (m *MonitorVerifyActivityWasFinishedService) handleVerifyActivityWasFinishe } runtime, err := m.runtimeRepository.GetByName(activity.GetRuntimeId()) + + if runtime.GetName() != m.runtimeName { + return activity_repository.StatusRunning + } if err != nil { return activity_repository.StatusCreated } diff --git a/pkg/server/runtimes/local_runtime/local_runtime.go b/pkg/server/runtimes/local_runtime/local_runtime.go new file mode 100644 index 0000000..218c06e --- /dev/null +++ b/pkg/server/runtimes/local_runtime/local_runtime.go @@ -0,0 +1,72 @@ +package local_runtime + +import ( + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_activity_entity" + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_entity" + "github.com/ovvesley/akoflow/pkg/server/runtimes/local_runtime/local_runtime_service" +) + +type LocalRuntime struct { + LocalRuntimeService local_runtime_service.LocalRuntimeService + runtimeType string + runtimeName string +} + +func (d *LocalRuntime) SetRuntimeType(runtimeType string) *LocalRuntime { + d.runtimeType = runtimeType + return d +} + +func (d *LocalRuntime) SetRuntimeName(name string) *LocalRuntime { + d.runtimeName = name + return d +} + +func (d *LocalRuntime) StartConnection() error { + return nil +} + +func (d *LocalRuntime) StopConnection() error { + return nil +} + +func (d *LocalRuntime) ApplyJob(workflowID int, activityID int) bool { + // apply job in local runtime + d.LocalRuntimeService.ApplyJob(workflowID, activityID) + + return true +} + +func (d *LocalRuntime) DeleteJob(workflowID int, activityID int) bool { + return true +} + +func (d *LocalRuntime) GetMetrics(workflowID int, activityID int) string { + return "" +} + +func (d *LocalRuntime) GetLogs(workflow workflow_entity.Workflow, workflowActivity workflow_activity_entity.WorkflowActivities) string { + return "" +} + +func (d *LocalRuntime) GetStatus(workflowID int, activityID int) string { + return "" +} + +func (k *LocalRuntime) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) bool { + k.LocalRuntimeService. + SetRuntimeName(k.runtimeName). + SetRuntimeType(k.runtimeType). + VerifyActivitiesWasFinished(workflow) + return true +} + +func (d *LocalRuntime) HealthCheck() bool { + return true +} + +func NewLocalRuntime() *LocalRuntime { + return &LocalRuntime{ + LocalRuntimeService: local_runtime_service.NewLocalRuntimeService(), + } +} diff --git a/pkg/server/runtimes/local_runtime/local_runtime_service/local_runtime_service.go b/pkg/server/runtimes/local_runtime/local_runtime_service/local_runtime_service.go new file mode 100644 index 0000000..7728c18 --- /dev/null +++ b/pkg/server/runtimes/local_runtime/local_runtime_service/local_runtime_service.go @@ -0,0 +1,252 @@ +package local_runtime_service + +import ( + "encoding/base64" + "fmt" + "regexp" + "strings" + "time" + + "github.com/ovvesley/akoflow/pkg/server/config" + "github.com/ovvesley/akoflow/pkg/server/connector/connector_local" + "github.com/ovvesley/akoflow/pkg/server/database/repository/activity_repository" + "github.com/ovvesley/akoflow/pkg/server/database/repository/logs_repository" + "github.com/ovvesley/akoflow/pkg/server/database/repository/metrics_repository" + "github.com/ovvesley/akoflow/pkg/server/database/repository/workflow_repository" + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_activity_entity" + "github.com/ovvesley/akoflow/pkg/server/entities/workflow_entity" + "github.com/ovvesley/akoflow/pkg/server/runtimes/singularity_runtime/singularity_runtime_service" +) + +type LocalRuntimeService struct { + activityRepository activity_repository.IActivityRepository + workflowRepository workflow_repository.IWorkflowRepository + metricsRepository metrics_repository.IMetricsRepository + logsRepository logs_repository.ILogsRepository + + localConnector connector_local.IConnectorLocal + + runtimeName string + runtimeType string +} + +func (s *LocalRuntimeService) SetRuntimeName(name string) *LocalRuntimeService { + s.runtimeName = name + return s +} + +func (s *LocalRuntimeService) SetRuntimeType(runtimeType string) *LocalRuntimeService { + s.runtimeType = runtimeType + return s +} + +func NewLocalRuntimeService() LocalRuntimeService { + return LocalRuntimeService{ + activityRepository: config.App().Repository.ActivityRepository, + workflowRepository: config.App().Repository.WorkflowRepository, + metricsRepository: config.App().Repository.MetricsRepository, + logsRepository: config.App().Repository.LogsRepository, + + localConnector: config.App().Connector.LocalConnector, + } +} + +func (s *LocalRuntimeService) ApplyJob(workflowID int, activityID int) { + wfa, err := s.activityRepository.Find(activityID) + wf, _ := s.workflowRepository.Find(wfa.WorkflowId) + + if err != nil { + config.App().Logger.Infof("WORKER: Activity not found %d", activityID) + return + } + + localSystemCall := s.makeLocalActivity(wf, wfa) + + pid, _ := s.localConnector.RunCommand(localSystemCall) + + fmt.Println("PID: ", pid) + + err = s.workflowRepository.UpdateStatus(wfa.WorkflowId, workflow_repository.StatusRunning) + + if err != nil { + config.App().Logger.Infof("WORKER: Error updating workflow status %d", wfa.WorkflowId) + return + } + + _ = s.activityRepository.UpdateStatus(wfa.Id, activity_repository.StatusRunning) + err = s.activityRepository.UpdateProcID(wfa.Id, pid) + + if err != nil { + config.App().Logger.Infof("WORKER: Error updating activity status %d", activityID) + return + } + + config.App().Logger.Infof("WORKER: Running local command %s", localSystemCall) + +} + +func (s *LocalRuntimeService) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) { + for _, activity := range workflow.Spec.Activities { + if activity.Status != activity_repository.StatusRunning { + continue + } + if activity.GetRuntimeId() != s.runtimeName { + continue + } + + pid := activity.ProcId + + akfMonitorBashScript, err := singularity_runtime_service.NewAkfMonitorSingularity(). + SetWorkflow(workflow). + SetWorkflowActivity(activity). + GetScript() + + if err != nil { + config.App().Logger.Infof("WORKER: Error creating akf monitor script %s", pid) + return + } + + commandBase64 := base64.StdEncoding.EncodeToString([]byte(akfMonitorBashScript)) + commandFinal := "echo " + commandBase64 + " | base64 -d | bash" + + outputCommand, _ := s.localConnector.RunCommandWithOutput(commandFinal) + + if s.ProcessCompleted(outputCommand) { + s.handleProcessCompleted(workflow, activity, pid, outputCommand) + return + } + + s.handleProcessRunning(workflow, activity, pid, outputCommand) + + } +} + +func (s *LocalRuntimeService) handleProcessCompleted(_ workflow_entity.Workflow, workflowActivity workflow_activity_entity.WorkflowActivities, __ string, ___ string) { + s.activityRepository.UpdateStatus(workflowActivity.GetId(), activity_repository.StatusFinished) +} + +func (s *LocalRuntimeService) handleProcessRunning(_ workflow_entity.Workflow, workflowActivity workflow_activity_entity.WorkflowActivities, __ string, outputCommand string) { + totalCPU, totalMEM, err := s.ExtractMetrics(outputCommand) + + activityID := workflowActivity.GetId() + + if err != nil { + config.App().Logger.Infof("WORKER: Error extracting metrics %d", activityID) + return + } + + timestamp := time.Now().Format("2006-01-02 15:04:05") + + err = s.metricsRepository.Create(metrics_repository.ParamsMetricsCreate{ + MetricsDatabase: metrics_repository.MetricsDatabase{ + ActivityId: activityID, + Cpu: totalCPU, + Memory: totalMEM, + Window: "1s", + Timestamp: timestamp, + }, + }) + + if err != nil { + config.App().Logger.Infof("WORKER: Error updating metrics %d", activityID) + return + } + + logsOutput, logsErr, err := s.ExtractLogs(outputCommand) + + if err != nil { + config.App().Logger.Infof("WORKER: Error extracting logs %d", activityID) + return + } + + if logsOutput != "" { + s.logsRepository.Create(logs_repository.ParamsLogsCreate{ + LogsDatabase: logs_repository.LogsDatabase{ + ActivityId: activityID, + Logs: logsOutput, + }, + }) + + } + + if logsErr != "" { + s.logsRepository.Create(logs_repository.ParamsLogsCreate{ + LogsDatabase: logs_repository.LogsDatabase{ + ActivityId: activityID, + Logs: logsErr, + }, + }) + } + +} + +func (s *LocalRuntimeService) ExtractLogs(outputCommand string) (string, string, error) { + reOutput := regexp.MustCompile(`(?s)##START_LOG_OUTPUT##(.*)##END_LOG_OUTPUT##`) + reError := regexp.MustCompile(`(?s)##START_LOG_ERROR##(.*)##END_LOG_ERROR##`) + + matchOutput := reOutput.FindStringSubmatch(outputCommand) + matchError := reError.FindStringSubmatch(outputCommand) + + var logsOutput string + var logsErr string + + if len(matchOutput) > 1 { + logsOutput = strings.TrimSpace(matchOutput[1]) + } + + if len(matchError) > 1 { + logsErr = strings.TrimSpace(matchError[1]) + } + + return logsOutput, logsErr, nil +} + +func (s *LocalRuntimeService) ProcessCompleted(outputCommand string) bool { + if strings.Contains(outputCommand, "#NO_PROCESS_FOUND") { + config.App().Logger.Infof("WORKER: No process found in the output") + return true + } + return false +} + +func (s *LocalRuntimeService) ExtractMetrics(metrics string) (string, string, error) { + var re = regexp.MustCompile(`(?s)TOTAL_CPU=\((.*?)%\).*?TOTAL_MEM=\((.*?)%`) + var str = metrics + matches := re.FindStringSubmatch(str) + + if len(matches) == 0 { + return "", "", fmt.Errorf("no metrics found") + } + + totalCpu := matches[1] + totalMem := matches[2] + + return totalCpu, totalMem, nil +} + +func (s *LocalRuntimeService) makeLocalActivity(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) string { + command := wfa.Run + commandBase64 := base64.StdEncoding.EncodeToString([]byte(command)) + commandFinal := "echo " + commandBase64 + " | base64 -d | bash" + + mountPath := wfa.GetMountPath() + if mountPath == "" { + mountPath = wf.Spec.MountPath + } + + strOutFile := fmt.Sprintf("%s/akoflow_out%s_%s.out", + mountPath, + fmt.Sprintf("%d", wfa.WorkflowId), + fmt.Sprintf("%d", wfa.Id), + ) + + strErrFile := fmt.Sprintf("%s/akoflow_err%s_%s.err", + mountPath, + fmt.Sprintf("%d", wfa.WorkflowId), + fmt.Sprintf("%d", wfa.Id), + ) + + commandFinal = fmt.Sprintf("%s > %s 2> %s", commandFinal, strOutFile, strErrFile) + + return commandFinal +} diff --git a/pkg/server/runtimes/runtime.go b/pkg/server/runtimes/runtime.go index 210ccf5..bfb8d13 100755 --- a/pkg/server/runtimes/runtime.go +++ b/pkg/server/runtimes/runtime.go @@ -8,15 +8,17 @@ import ( "github.com/ovvesley/akoflow/pkg/server/entities/workflow_activity_entity" "github.com/ovvesley/akoflow/pkg/server/entities/workflow_entity" "github.com/ovvesley/akoflow/pkg/server/runtimes/docker_runtime" + "github.com/ovvesley/akoflow/pkg/server/runtimes/hpc_runtime" "github.com/ovvesley/akoflow/pkg/server/runtimes/kubernetes_runtime" - "github.com/ovvesley/akoflow/pkg/server/runtimes/sdumont_runtime" + "github.com/ovvesley/akoflow/pkg/server/runtimes/local_runtime" "github.com/ovvesley/akoflow/pkg/server/runtimes/singularity_runtime" ) const RUNTIME_K8S = "k8s" const RUNTIME_DOCKER = "docker" const RUNTIME_SINGULARITY = "singularity" -const RUNTIME_SINGULARITY_SDUMONT = "sdumont" +const RUNTIME_HPC = "hpc" +const RUNTIME_LOCAL = "local" type IRuntime interface { StartConnection() error @@ -42,8 +44,8 @@ func normalizeRuntime(runtime string) string { return RUNTIME_K8S } - if strings.HasPrefix(runtime, "sdumont") { - return RUNTIME_SINGULARITY_SDUMONT + if strings.HasPrefix(runtime, "hpc") { + return RUNTIME_HPC } return runtime @@ -55,10 +57,21 @@ func GetRuntimeInstance(runtimeName string) IRuntime { runtime := normalizeRuntime(runtimeName) modeMap := map[string]IRuntime{ - RUNTIME_DOCKER: docker_runtime.NewDockerRuntime(), - RUNTIME_K8S: kubernetes_runtime.NewKubernetesRuntime().SetRuntimeName(runtimeName), - RUNTIME_SINGULARITY: singularity_runtime.NewSingularityRuntime(), - RUNTIME_SINGULARITY_SDUMONT: sdumont_runtime.NewSdumontRuntime().SetRuntimeName(runtimeName), + RUNTIME_DOCKER: docker_runtime.NewDockerRuntime(), + + RUNTIME_K8S: kubernetes_runtime.NewKubernetesRuntime(). + SetRuntimeType(RUNTIME_K8S). + SetRuntimeName(runtimeName), + + RUNTIME_SINGULARITY: singularity_runtime.NewSingularityRuntime(), + + RUNTIME_HPC: hpc_runtime.NewHpcRuntime(). + SetRuntimeType(RUNTIME_HPC). + SetRuntimeName(runtimeName), + + RUNTIME_LOCAL: local_runtime.NewLocalRuntime(). + SetRuntimeType(RUNTIME_LOCAL). + SetRuntimeName(runtimeName), } if modeMap[runtime] == nil { config.App().Logger.Error(fmt.Sprintf("Runtime not found: %s", runtimeName)) diff --git a/pkg/server/runtimes/sdumont_runtime/sdumont_runtime.go b/pkg/server/runtimes/sdumont_runtime/sdumont_runtime.go deleted file mode 100755 index 5e57c1e..0000000 --- a/pkg/server/runtimes/sdumont_runtime/sdumont_runtime.go +++ /dev/null @@ -1,62 +0,0 @@ -package sdumont_runtime - -import ( - "github.com/ovvesley/akoflow/pkg/server/entities/workflow_activity_entity" - "github.com/ovvesley/akoflow/pkg/server/entities/workflow_entity" - "github.com/ovvesley/akoflow/pkg/server/runtimes/sdumont_runtime/sdumont_runtime_service.go" -) - -func NewSdumontRuntime() *SdumontRuntime { - return &SdumontRuntime{ - sDumontRuntimeService: sdumont_runtime_service.New(), - } -} - -type SdumontRuntime struct { - sDumontRuntimeService *sdumont_runtime_service.SDumontRuntimeService - runtimeName string -} - -func (s *SdumontRuntime) StartConnection() error { - return nil -} - -func (s *SdumontRuntime) StopConnection() error { - return nil -} - -func (s *SdumontRuntime) SetRuntimeName(runtimeName string) *SdumontRuntime { - s.runtimeName = runtimeName - return s -} - -func (s *SdumontRuntime) ApplyJob(workflowID int, activityID int) bool { - s.sDumontRuntimeService.ApplyJob(workflowID, activityID) - return true -} - -func (s *SdumontRuntime) DeleteJob(workflowID int, activityID int) bool { - return true -} - -func (s *SdumontRuntime) GetMetrics(workflowID int, activityID int) string { - return "" -} - -func (s *SdumontRuntime) GetLogs(workflow workflow_entity.Workflow, workflowActivity workflow_activity_entity.WorkflowActivities) string { - return "" -} - -func (s *SdumontRuntime) GetStatus(workflowID int, activityID int) string { - return "" -} - -func (s *SdumontRuntime) HealthCheck() bool { - s.sDumontRuntimeService.HealthCheck(s.runtimeName) - return true -} - -func (s *SdumontRuntime) VerifyActivitiesWasFinished(workflow workflow_entity.Workflow) bool { - s.sDumontRuntimeService.VerifyActivitiesWasFinished(workflow) - return true -} diff --git a/pkg/server/runtimes/sdumont_runtime/sdumont_runtime_service.go/make_sbatch_sdumont_activity_service.go b/pkg/server/runtimes/sdumont_runtime/sdumont_runtime_service.go/make_sbatch_sdumont_activity_service.go deleted file mode 100644 index 1ba0085..0000000 --- a/pkg/server/runtimes/sdumont_runtime/sdumont_runtime_service.go/make_sbatch_sdumont_activity_service.go +++ /dev/null @@ -1,118 +0,0 @@ -package sdumont_runtime_service - -import ( - "encoding/base64" - "fmt" - - "github.com/ovvesley/akoflow/pkg/server/entities/runtime_entity" - "github.com/ovvesley/akoflow/pkg/server/entities/workflow_activity_entity" - "github.com/ovvesley/akoflow/pkg/server/entities/workflow_entity" -) - -func NewMakeSBatchSDumontActivityService() MakeSBatchSDumontActivityService { - return MakeSBatchSDumontActivityService{ - singularityCommand: "", - } -} - -type MakeSBatchSDumontActivityService struct { - singularityCommand string - runtime runtime_entity.Runtime -} - -func (m MakeSBatchSDumontActivityService) SetSingularityCommand(singularityCommand string) MakeSBatchSDumontActivityService { - m.singularityCommand = singularityCommand - return m -} - -func (m MakeSBatchSDumontActivityService) SetRuntime(runtime runtime_entity.Runtime) MakeSBatchSDumontActivityService { - m.runtime = runtime - return m -} - -func (m MakeSBatchSDumontActivityService) GetSingularityCommand() string { - return m.singularityCommand -} - -func (m MakeSBatchSDumontActivityService) Handle(workflow workflow_entity.Workflow, activity workflow_activity_entity.WorkflowActivities) string { - - if m.GetSingularityCommand() == "" { - fmt.Println("Singularity command is empty") - return "" - } - - jobName := fmt.Sprintf("akoflow_%d_%d", workflow.GetId(), activity.GetId()) - - output := fmt.Sprintf("%s/akoflow_out_%d_%d.out", - workflow.GetMountPath(), - workflow.GetId(), - activity.GetId(), - ) - - error := fmt.Sprintf("%s/akoflow_err_%d_%d.err", - workflow.GetMountPath(), - workflow.GetId(), - activity.GetId(), - ) - - time := m.runtime.GetCurrentRuntimeMetadata("TIME") - if time == "" { - time = "48:00:00" // 48 hours - } - - partition := m.runtime.GetCurrentRuntimeMetadata("QUEUE") - if partition == "" { - partition = "gdl" - } - - ntasksStr := m.runtime.GetCurrentRuntimeMetadata("NTASKS") - ntasks := 1 - if ntasksStr != "" { - fmt.Sscanf(ntasksStr, "%d", &ntasks) - } - - nodesStr := m.runtime.GetCurrentRuntimeMetadata("NODES") - nodes := 1 - if nodesStr != "" { - fmt.Sscanf(nodesStr, "%d", &nodes) - } - - gpusStr := m.runtime.GetCurrentRuntimeMetadata("GPUS") - gpus := 1 - if gpusStr != "" { - fmt.Sscanf(gpusStr, "%d", &gpus) - } - - cpusPerGpuStr := m.runtime.GetCurrentRuntimeMetadata("CPUS_PER_GPU") - cpusPerGpu := 1 - if cpusPerGpuStr != "" { - fmt.Sscanf(cpusPerGpuStr, "%d", &cpusPerGpu) - } - - mem := m.runtime.GetCurrentRuntimeMetadata("MEM") - if mem == "" { - mem = "8G" - } - - wrap := fmt.Sprintf("%s", m.GetSingularityCommand()) - - command := fmt.Sprintf("sbatch --job-name=%s --output=%s --error=%s --time=%s --partition=%s --ntasks=%d --nodes=%d --gpus=%d --cpus-per-gpu=%d --mem=%s --wrap=\"%s\"", - jobName, - output, - error, - time, - partition, - ntasks, - nodes, - gpus, - cpusPerGpu, - mem, - wrap, - ) - - base64ParcialCommand := base64.StdEncoding.EncodeToString([]byte(command)) - - commandBase64 := fmt.Sprintf("echo %s | base64 -d | bash", base64ParcialCommand) - - return commandBase64 -} diff --git a/pkg/server/runtimes/singularity_runtime/singularity_runtime_service/make_singularity_activity_service.go b/pkg/server/runtimes/singularity_runtime/singularity_runtime_service/make_singularity_activity_service.go index e88acd3..55e03b9 100755 --- a/pkg/server/runtimes/singularity_runtime/singularity_runtime_service/make_singularity_activity_service.go +++ b/pkg/server/runtimes/singularity_runtime/singularity_runtime_service/make_singularity_activity_service.go @@ -57,7 +57,7 @@ func (s *MakeSingularityActivityService) makeContainerCommandActivity(wf workflo return entryPoint } -func (s *MakeSingularityActivityService) MakeContainerCommandActivityToSDumont(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) string { +func (s *MakeSingularityActivityService) MakeContainerCommandActivityToHPC(wf workflow_entity.Workflow, wfa workflow_activity_entity.WorkflowActivities) string { mountPath := wf.GetMountPath() imageSifPath := wf.Spec.Image