Merge branch '10700-dispatch'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index 19ab5aa2f7598efca21293afb3a3194320ea6f4d..1c080f36ac13133b12ad4308fb62d6f53549ded3 100644 (file)
@@ -6,17 +6,18 @@ import (
        "bytes"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/dispatch"
-       "github.com/coreos/go-systemd/daemon"
        "log"
        "math"
        "os"
        "os/exec"
        "strings"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       "github.com/coreos/go-systemd/daemon"
 )
 
 // Config used by crunch-dispatch-slurm
@@ -31,6 +32,9 @@ type Config struct {
        //
        // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
        CrunchRunCommand []string
+
+       // Minimum time between two attempts to run the same container
+       MinRetryPeriod arvados.Duration
 }
 
 func main() {
@@ -41,8 +45,8 @@ func main() {
 }
 
 var (
-       theConfig     Config
-       squeueUpdater Squeue
+       theConfig Config
+       sqCheck   SqueueChecker
 )
 
 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -55,6 +59,10 @@ func doMain() error {
                "config",
                defaultConfigPath,
                "`path` to JSON or YAML configuration file")
+       dumpConfig := flag.Bool(
+               "dump-config",
+               false,
+               "write current configuration to stdout and exit")
 
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
@@ -88,6 +96,10 @@ func doMain() error {
                log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
        }
 
+       if *dumpConfig {
+               log.Fatal(config.DumpAndExit(theConfig))
+       }
+
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("Error making Arvados client: %v", err)
@@ -95,13 +107,14 @@ func doMain() error {
        }
        arv.Retries = 25
 
-       squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
-       defer squeueUpdater.Done()
+       sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+       defer sqCheck.Stop()
 
        dispatcher := dispatch.Dispatcher{
-               Arv:          arv,
-               RunContainer: run,
-               PollInterval: time.Duration(theConfig.PollPeriod),
+               Arv:            arv,
+               RunContainer:   run,
+               PollPeriod:     time.Duration(theConfig.PollPeriod),
+               MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
        }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
@@ -164,8 +177,8 @@ func submit(dispatcher *dispatch.Dispatcher,
        cmd.Stderr = &stderr
 
        // Mutex between squeue sync and running sbatch or scancel.
-       squeueUpdater.SlurmLock.Lock()
-       defer squeueUpdater.SlurmLock.Unlock()
+       sqCheck.L.Lock()
+       defer sqCheck.L.Unlock()
 
        log.Printf("exec sbatch %+q", cmd.Args)
        err := cmd.Run()
@@ -188,7 +201,7 @@ func submit(dispatcher *dispatch.Dispatcher,
 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
        submitted := false
        for !*monitorDone {
-               if squeueUpdater.CheckSqueue(container.UUID) {
+               if sqCheck.HasUUID(container.UUID) {
                        // Found in the queue, so continue monitoring
                        submitted = true
                } else if container.State == dispatch.Locked && !submitted {
@@ -250,28 +263,24 @@ func run(dispatcher *dispatch.Dispatcher,
        go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
 
        for container = range status {
-               if container.State == dispatch.Locked || container.State == dispatch.Running {
-                       if container.Priority == 0 {
-                               log.Printf("Canceling container %s", container.UUID)
-
-                               // Mutex between squeue sync and running sbatch or scancel.
-                               squeueUpdater.SlurmLock.Lock()
-                               cmd := scancelCmd(container)
-                               msg, err := cmd.CombinedOutput()
-                               squeueUpdater.SlurmLock.Unlock()
-
-                               if err != nil {
-                                       log.Printf("Error stopping container %s with %v %v: %v %v",
-                                               container.UUID, cmd.Path, cmd.Args, err, string(msg))
-                                       if squeueUpdater.CheckSqueue(container.UUID) {
-                                               log.Printf("Container %s is still in squeue after scancel.",
-                                                       container.UUID)
-                                               continue
-                                       }
-                               }
+               if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
+                       log.Printf("Canceling container %s", container.UUID)
+                       // Mutex between squeue sync and running sbatch or scancel.
+                       sqCheck.L.Lock()
+                       cmd := scancelCmd(container)
+                       msg, err := cmd.CombinedOutput()
+                       sqCheck.L.Unlock()
 
-                               err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+                       if err != nil {
+                               log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+                               if sqCheck.HasUUID(container.UUID) {
+                                       log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+                                       continue
+                               }
                        }
+
+                       // Ignore errors; if necessary, we'll try again next time
+                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
                }
        }
        monitorDone = true