Merge branch '9766-register-workflow' closes #9766
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index 1d9015d5051f0a678a834ba56781377d39a4b96c..b33dc64e7bd7bf7b854fde1a55d0a388b32407db 100644 (file)
@@ -9,6 +9,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       "github.com/coreos/go-systemd/daemon"
        "io"
        "io/ioutil"
        "log"
@@ -21,7 +22,7 @@ import (
 
 // Config used by crunch-dispatch-slurm
 type Config struct {
-       arvados.Client
+       Client arvados.Client
 
        SbatchArguments []string
        PollPeriod      arvados.Duration
@@ -105,6 +106,10 @@ func doMain() error {
                PollInterval:   time.Duration(config.PollPeriod),
                DoneProcessing: make(chan struct{})}
 
+       if _, err := daemon.SdNotify("READY=1"); err != nil {
+               log.Printf("Error notifying init daemon: %v", err)
+       }
+
        err = dispatcher.RunDispatcher()
        if err != nil {
                return err
@@ -146,10 +151,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                        // OK, no cleanup needed
                        return
                }
-               err := dispatcher.Arv.Update("containers", container.UUID,
-                       arvadosclient.Dict{
-                               "container": arvadosclient.Dict{"state": "Queued"}},
-                       nil)
+               err := dispatcher.Unlock(container.UUID)
                if err != nil {
                        log.Printf("Error unlocking container %s: %v", container.UUID, err)
                }
@@ -242,7 +244,7 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co
                                log.Printf("Error submitting container %s to slurm: %v",
                                        container.UUID, err)
                                // maybe sbatch is broken, put it back to queued
-                               dispatcher.UpdateState(container.UUID, dispatch.Queued)
+                               dispatcher.Unlock(container.UUID)
                        }
                        submitted = true
                } else {
@@ -258,20 +260,20 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co
                                log.Printf("Error getting final container state: %v", err)
                        }
 
-                       var st arvados.ContainerState
                        switch con.State {
                        case dispatch.Locked:
-                               st = dispatch.Queued
+                               log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+                                       container.UUID, con.State, dispatch.Queued)
+                               dispatcher.Unlock(container.UUID)
                        case dispatch.Running:
-                               st = dispatch.Cancelled
+                               st := dispatch.Cancelled
+                               log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+                                       container.UUID, con.State, st)
+                               dispatcher.UpdateState(container.UUID, st)
                        default:
                                // Container state is Queued, Complete or Cancelled so stop monitoring it.
                                return
                        }
-
-                       log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-                               container.UUID, con.State, st)
-                       dispatcher.UpdateState(container.UUID, st)
                }
        }
 }