Merge branch '9417-asserttrue-for-value-comparison'
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 1dada2f104424cb17b085d0e97a95057754407a5..4bfff6a5f0ccfe15a5a5e452f4536c01693df976 100644
@@ -5,6 +5,7 @@ package main
 import (
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
        "io/ioutil"
@@ -69,21 +70,27 @@ func doMain() error {
 }
 
 // sbatchCmd
-func sbatchFunc(container dispatch.Container) *exec.Cmd {
-       memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+func sbatchFunc(container arvados.Container) *exec.Cmd {
+       memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
        return exec.Command("sbatch", "--share", "--parsable",
                fmt.Sprintf("--job-name=%s", container.UUID),
                fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
-               fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+               fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
                fmt.Sprintf("--priority=%d", container.Priority))
 }
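
A quick illustration of the --mem-per-cpu arithmetic above (hypothetical values, not part of this change): RuntimeConstraints.RAM is expressed in bytes, so dividing by VCPUs * 1048576 yields MiB per CPU, which is the unit sbatch expects.

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        // Hypothetical constraints: 8 GiB of RAM across 4 VCPUs.
        ram := int64(8 << 30) // bytes, as in RuntimeConstraints.RAM
        vcpus := 4            // as in RuntimeConstraints.VCPUs
        memPerCPU := math.Ceil(float64(ram) / (float64(vcpus) * 1048576))
        fmt.Printf("--mem-per-cpu=%d\n", int(memPerCPU)) // prints --mem-per-cpu=2048
    }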
 
+// scancelCmd generates the scancel command used to cancel a container's slurm job
+func scancelFunc(container arvados.Container) *exec.Cmd {
+       return exec.Command("scancel", "--name="+container.UUID)
+}
+
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
+var scancelCmd = scancelFunc
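
Because sbatchCmd and scancelCmd are package-level variables, a test can swap in a fake and restore the original afterwards. A minimal sketch of that pattern (the test name and the echo stand-in are assumptions, not part of this change; it needs the "os/exec", "testing", and arvados SDK imports):

    func TestScancelIsOverridable(t *testing.T) {
        // Restore the real command generator when the test finishes.
        defer func(orig func(arvados.Container) *exec.Cmd) { scancelCmd = orig }(scancelCmd)
        // "echo" stands in for scancel so nothing is actually cancelled.
        scancelCmd = func(container arvados.Container) *exec.Cmd {
            return exec.Command("echo", "scancel", "--name="+container.UUID)
        }
        // ... exercise the dispatcher paths that call scancelCmd ...
    }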
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
-       container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
+       container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
        submitErr = nil
 
        defer func() {
@@ -175,13 +182,10 @@ func submit(dispatcher *dispatch.Dispatcher,
 //
 // If the container is marked as Running, check if it is in the slurm queue.
 // If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
        submitted := false
        for !*monitorDone {
-               if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
-                       // Most recent run of squeue had an error, so do nothing.
-                       continue
-               } else if inQ {
+               if squeueUpdater.CheckSqueue(container.UUID) {
                        // Found in the queue, so continue monitoring
                        submitted = true
                } else if container.State == dispatch.Locked && !submitted {
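
The call sites above assume CheckSqueue now returns a plain bool instead of (bool, error), so a transient squeue failure no longer spins this loop with `continue`. A minimal sketch of the implied shape (type and field names are assumptions; the real implementation lives in the squeue updater, which is not part of this diff; it needs the "sync" import):

    // Sketch only: assumed shape of the squeue updater.
    type Squeue struct {
        sync.Mutex
        inQueue map[string]bool // jobs seen in the latest successful squeue poll
    }

    // CheckSqueue reports whether uuid appeared in the most recent poll,
    // matching the bool-only signature used by the call sites above.
    func (sq *Squeue) CheckSqueue(uuid string) bool {
        sq.Lock()
        defer sq.Unlock()
        return sq.inQueue[uuid]
    }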
@@ -204,13 +208,13 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
                        // release it back to the Queue, if it is Running then
                        // clean up the record.
 
-                       var con dispatch.Container
+                       var con arvados.Container
                        err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
                        if err != nil {
                                log.Printf("Error getting final container state: %v", err)
                        }
 
-                       var st string
+                       var st arvados.ContainerState
                        switch con.State {
                        case dispatch.Locked:
                                st = dispatch.Queued
@@ -233,8 +237,8 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
 // Monitor status updates.  If the priority changes to zero, cancel the
 // container using scancel.
 func run(dispatcher *dispatch.Dispatcher,
-       container dispatch.Container,
-       status chan dispatch.Container) {
+       container arvados.Container,
+       status chan arvados.Container) {
 
        log.Printf("Monitoring container %v started", container.UUID)
        defer log.Printf("Monitoring container %v finished", container.UUID)
@@ -249,15 +253,13 @@ func run(dispatcher *dispatch.Dispatcher,
 
                                // Mutex between squeue sync and running sbatch or scancel.
                                squeueUpdater.SlurmLock.Lock()
-                               err := exec.Command("scancel", "--name="+container.UUID).Run()
+                               err := scancelCmd(container).Run()
                                squeueUpdater.SlurmLock.Unlock()
 
                                if err != nil {
                                        log.Printf("Error stopping container %s with scancel: %v",
                                                container.UUID, err)
-                                       if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
-                                               continue
-                                       } else if inQ {
+                                       if squeueUpdater.CheckSqueue(container.UUID) {
                                                log.Printf("Container %s is still in squeue after scancel.",
                                                        container.UUID)
                                                continue
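
Taken together, this hunk routes cancellation through the scancelCmd wrapper while holding SlurmLock, and retries as long as the job is still visible in squeue. A condensed, illustrative restatement of that flow (priorityIsZero is a hypothetical placeholder, not a function in this file):

    // Illustrative only: the cancellation retry flow from the hunk above.
    for priorityIsZero(container) {
        squeueUpdater.SlurmLock.Lock()
        err := scancelCmd(container).Run() // fake-able in tests via the wrapper var
        squeueUpdater.SlurmLock.Unlock()
        if err == nil || !squeueUpdater.CheckSqueue(container.UUID) {
            break // scancel succeeded, or the job has left the queue
        }
        log.Printf("Container %s is still in squeue after scancel.", container.UUID)
    }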