Merge branch '9766-register-workflow' closes #9766
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index af2c42e9b2faf87d42c47e3e306d58598226640e..b33dc64e7bd7bf7b854fde1a55d0a388b32407db 100644 (file)
@@ -9,6 +9,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       "github.com/coreos/go-systemd/daemon"
        "io"
        "io/ioutil"
        "log"
@@ -21,8 +22,10 @@ import (
 
 // Config used by crunch-dispatch-slurm
 type Config struct {
+       Client arvados.Client
+
        SbatchArguments []string
-       PollPeriod      *time.Duration
+       PollPeriod      arvados.Duration
 
        // crunch-run command to invoke. The container UUID will be
        // appended. If nil, []string{"crunch-run"} will be used.
@@ -47,17 +50,13 @@ const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/config.json"
 
 func doMain() error {
        flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+       flags.Usage = func() { usage(flags) }
 
        configPath := flags.String(
                "config",
                defaultConfigPath,
                "`path` to json configuration file")
 
-       config.PollPeriod = flags.Duration(
-               "poll-interval",
-               10*time.Second,
-               "Time duration to poll for queued containers")
-
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
@@ -71,6 +70,26 @@ func doMain() error {
                config.CrunchRunCommand = []string{"crunch-run"}
        }
 
+       if config.PollPeriod == 0 {
+               config.PollPeriod = arvados.Duration(10 * time.Second)
+       }
+
+       if config.Client.APIHost != "" || config.Client.AuthToken != "" {
+               // Copy real configs into env vars so [a]
+               // MakeArvadosClient() uses them, and [b] they get
+               // propagated to crunch-run via SLURM.
+               os.Setenv("ARVADOS_API_HOST", config.Client.APIHost)
+               os.Setenv("ARVADOS_API_TOKEN", config.Client.AuthToken)
+               os.Setenv("ARVADOS_API_INSECURE", "")
+               if config.Client.Insecure {
+                       os.Setenv("ARVADOS_API_INSECURE", "1")
+               }
+               os.Setenv("ARVADOS_KEEP_SERVICES", "")
+               os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+       } else {
+               log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
+       }
+
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("Error making Arvados client: %v", err)
@@ -78,15 +97,19 @@ func doMain() error {
        }
        arv.Retries = 25
 
-       squeueUpdater.StartMonitor(*config.PollPeriod)
+       squeueUpdater.StartMonitor(time.Duration(config.PollPeriod))
        defer squeueUpdater.Done()
 
        dispatcher := dispatch.Dispatcher{
                Arv:            arv,
                RunContainer:   run,
-               PollInterval:   *config.PollPeriod,
+               PollInterval:   time.Duration(config.PollPeriod),
                DoneProcessing: make(chan struct{})}
 
+       if _, err := daemon.SdNotify("READY=1"); err != nil {
+               log.Printf("Error notifying init daemon: %v", err)
+       }
+
        err = dispatcher.RunDispatcher()
        if err != nil {
                return err
@@ -128,10 +151,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                        // OK, no cleanup needed
                        return
                }
-               err := dispatcher.Arv.Update("containers", container.UUID,
-                       arvadosclient.Dict{
-                               "container": arvadosclient.Dict{"state": "Queued"}},
-                       nil)
+               err := dispatcher.Unlock(container.UUID)
                if err != nil {
                        log.Printf("Error unlocking container %s: %v", container.UUID, err)
                }
@@ -224,7 +244,7 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co
                                log.Printf("Error submitting container %s to slurm: %v",
                                        container.UUID, err)
                                // maybe sbatch is broken, put it back to queued
-                               dispatcher.UpdateState(container.UUID, dispatch.Queued)
+                               dispatcher.Unlock(container.UUID)
                        }
                        submitted = true
                } else {
@@ -240,20 +260,20 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co
                                log.Printf("Error getting final container state: %v", err)
                        }
 
-                       var st arvados.ContainerState
                        switch con.State {
                        case dispatch.Locked:
-                               st = dispatch.Queued
+                               log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+                                       container.UUID, con.State, dispatch.Queued)
+                               dispatcher.Unlock(container.UUID)
                        case dispatch.Running:
-                               st = dispatch.Cancelled
+                               st := dispatch.Cancelled
+                               log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+                                       container.UUID, con.State, st)
+                               dispatcher.UpdateState(container.UUID, st)
                        default:
                                // Container state is Queued, Complete or Cancelled so stop monitoring it.
                                return
                        }
-
-                       log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-                               container.UUID, con.State, st)
-                       dispatcher.UpdateState(container.UUID, st)
                }
        }
 }