Merge branch 'master' into 11850-singlecontainer-max-requirements
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index ff6e871af68343d7883e680725472a037e5768a0..ae2ca58421d3f3a58f0a4a0f28cbdc2beb56e6af 100644 (file)
@@ -1,16 +1,18 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-       "bytes"
        "context"
        "flag"
        "fmt"
        "log"
        "math"
        "os"
-       "os/exec"
        "regexp"
        "strings"
        "time"
@@ -22,6 +24,8 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
+var version = "dev"
+
 // Config used by crunch-dispatch-slurm
 type Config struct {
        Client arvados.Client
@@ -37,9 +41,12 @@ type Config struct {
 
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+
+       slurm Slurm
 }
 
 func main() {
+       theConfig.slurm = &slurmCLI{}
        err := doMain()
        if err != nil {
                log.Fatal(err)
@@ -65,10 +72,21 @@ func doMain() error {
                "dump-config",
                false,
                "write current configuration to stdout and exit")
-
+       getVersion := flags.Bool(
+               "version",
+               false,
+               "Print version information and exit.")
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunch-dispatch-slurm %s\n", version)
+               return nil
+       }
+
+       log.Printf("crunch-dispatch-slurm %s started", version)
+
        err := readConfig(&theConfig, *configPath)
        if err != nil {
                return err
@@ -147,8 +165,18 @@ func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueCheck
        }
 }
 
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func niceness(priority int) int {
+       if priority > 1000 {
+               priority = 1000
+       }
+       if priority < 0 {
+               priority = 0
+       }
+       // Niceness range 1-10000
+       return (1000 - priority) * 10
+}
+
+func sbatchArgs(container arvados.Container) []string {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
        var disk int64
@@ -165,54 +193,27 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
 
-       return exec.Command("sbatch", sbatchArgs...)
+       return sbatchArgs
 }
 
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
-       return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-
-// Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
-       cmd := sbatchCmd(container)
-
-       // Send a tiny script on stdin to execute the crunch-run
-       // command (slurm requires this to be a #! script)
-       cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
+       // append() here avoids modifying crunchRunCommand's
+       // underlying array, which is shared with other goroutines.
+       crArgs := append([]string(nil), crunchRunCommand...)
+       crArgs = append(crArgs, container.UUID)
+       crScript := strings.NewReader(execScript(crArgs))
 
-       var stdout, stderr bytes.Buffer
-       cmd.Stdout = &stdout
-       cmd.Stderr = &stderr
-
-       // Mutex between squeue sync and running sbatch or scancel.
        sqCheck.L.Lock()
        defer sqCheck.L.Unlock()
 
-       log.Printf("exec sbatch %+q", cmd.Args)
-       err := cmd.Run()
-
-       switch err.(type) {
-       case nil:
-               log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
-               return nil
-
-       case *exec.ExitError:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
-       default:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("exec failed: %v", err)
-       }
+       sbArgs := sbatchArgs(container)
+       log.Printf("running sbatch %+q", sbArgs)
+       return theConfig.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
@@ -273,6 +274,8 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
+                       } else {
+                               renice(updated)
                        }
                }
        }
@@ -280,12 +283,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 
 func scancel(ctr arvados.Container) {
        sqCheck.L.Lock()
-       cmd := scancelCmd(ctr)
-       msg, err := cmd.CombinedOutput()
+       err := theConfig.slurm.Cancel(ctr.UUID)
        sqCheck.L.Unlock()
 
        if err != nil {
-               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               log.Printf("scancel: %s", err)
                time.Sleep(time.Second)
        } else if sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s is still in squeue after scancel", ctr.UUID)
@@ -293,6 +295,28 @@ func scancel(ctr arvados.Container) {
        }
 }
 
+func renice(ctr arvados.Container) {
+       nice := niceness(ctr.Priority)
+       oldnice := sqCheck.GetNiceness(ctr.UUID)
+       if nice == oldnice || oldnice == -1 {
+               return
+       }
+       log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
+       sqCheck.L.Lock()
+       err := theConfig.slurm.Renice(ctr.UUID, nice)
+       sqCheck.L.Unlock()
+
+       if err != nil {
+               log.Printf("renice: %s", err)
+               time.Sleep(time.Second)
+               return
+       }
+       if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("container %s has arvados priority %d, slurm nice %d",
+                       ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+       }
+}
+
 func readConfig(dst interface{}, path string) error {
        err := config.LoadFile(dst, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {