Merge branch '9613-user-profile-string-keys'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index f718fbcdcea3fd5c00ab8763240ee3056f098a53..0bf30dad9ff3789465e8413a56414eb83fb7bd96 100644 (file)
@@ -5,6 +5,7 @@ package main
 import (
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
        "io/ioutil"
@@ -69,17 +70,16 @@ func doMain() error {
 }
 
 // sbatchCmd
-func sbatchFunc(container dispatch.Container) *exec.Cmd {
-       memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
-       return exec.Command("sbatch", "--share", "--parsable",
+func sbatchFunc(container arvados.Container) *exec.Cmd {
+       memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
+       return exec.Command("sbatch", "--share",
                fmt.Sprintf("--job-name=%s", container.UUID),
                fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
-               fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
-               fmt.Sprintf("--priority=%d", container.Priority))
+               fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
 }
 
 // scancelCmd
-func scancelFunc(container dispatch.Container) *exec.Cmd {
+func scancelFunc(container arvados.Container) *exec.Cmd {
        return exec.Command("scancel", "--name="+container.UUID)
 }
 
@@ -89,9 +89,7 @@ var scancelCmd = scancelFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
-       container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
-       submitErr = nil
-
+       container arvados.Container, crunchRunCommand string) (submitErr error) {
        defer func() {
                // If we didn't get as far as submitting a slurm job,
                // unlock the container and return it to the queue.
@@ -170,9 +168,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                return
        }
 
-       // If everything worked out, got the jobid on stdout
-       jobid = strings.TrimSpace(string(stdoutMsg))
-
+       log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
        return
 }
 
@@ -181,7 +177,7 @@ func submit(dispatcher *dispatch.Dispatcher,
 //
 // If the container is marked as Running, check if it is in the slurm queue.
 // If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
        submitted := false
        for !*monitorDone {
                if squeueUpdater.CheckSqueue(container.UUID) {
@@ -193,7 +189,7 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
 
                        log.Printf("About to submit queued container %v", container.UUID)
 
-                       if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+                       if err := submit(dispatcher, container, *crunchRunCommand); err != nil {
                                log.Printf("Error submitting container %s to slurm: %v",
                                        container.UUID, err)
                                // maybe sbatch is broken, put it back to queued
@@ -207,13 +203,13 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
                        // release it back to the Queue, if it is Running then
                        // clean up the record.
 
-                       var con dispatch.Container
+                       var con arvados.Container
                        err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
                        if err != nil {
                                log.Printf("Error getting final container state: %v", err)
                        }
 
-                       var st string
+                       var st arvados.ContainerState
                        switch con.State {
                        case dispatch.Locked:
                                st = dispatch.Queued
@@ -236,8 +232,8 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
 // Monitor status updates.  If the priority changes to zero, cancel the
 // container using scancel.
 func run(dispatcher *dispatch.Dispatcher,
-       container dispatch.Container,
-       status chan dispatch.Container) {
+       container arvados.Container,
+       status chan arvados.Container) {
 
        log.Printf("Monitoring container %v started", container.UUID)
        defer log.Printf("Monitoring container %v finished", container.UUID)