Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index 296c0a3f40a0af1919f1951975f2c1a1ced81c12..3c89103f38dbc1cd094047d173c10bfe0b28f08e 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 // Dispatcher service for Crunch that submits containers to the slurm queue.
@@ -22,6 +26,8 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
+var version = "dev"
+
 // Config used by crunch-dispatch-slurm
 type Config struct {
        Client arvados.Client
@@ -65,10 +71,21 @@ func doMain() error {
                "dump-config",
                false,
                "write current configuration to stdout and exit")
-
+       getVersion := flags.Bool(
+               "version",
+               false,
+               "Print version information and exit.")
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunch-dispatch-slurm %s\n", version)
+               return nil
+       }
+
+       log.Printf("crunch-dispatch-slurm %s started", version)
+
        err := readConfig(&theConfig, *configPath)
        if err != nil {
                return err
@@ -147,6 +164,17 @@ func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueCheck
        }
 }
 
+func niceness(priority int) int {
+       if priority > 1000 {
+               priority = 1000
+       }
+       if priority < 0 {
+               priority = 0
+       }
+       // Niceness range 1-10000
+       return (1000 - priority) * 10
+}
+
 // sbatchCmd
 func sbatchFunc(container arvados.Container) *exec.Cmd {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
@@ -165,6 +193,7 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
@@ -177,9 +206,15 @@ func scancelFunc(container arvados.Container) *exec.Cmd {
        return exec.Command("scancel", "--name="+container.UUID)
 }
 
+// scontrolCmd
+func scontrolFunc(container arvados.Container) *exec.Cmd {
+       return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+}
+
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
 var scancelCmd = scancelFunc
+var scontrolCmd = scontrolFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
@@ -187,7 +222,12 @@ func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunch
 
        // Send a tiny script on stdin to execute the crunch-run
        // command (slurm requires this to be a #! script)
-       cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
+
+       // append() here avoids modifying crunchRunCommand's
+       // underlying array, which is shared with other goroutines.
+       args := append([]string(nil), crunchRunCommand...)
+       args = append(args, container.UUID)
+       cmd.Stdin = strings.NewReader(execScript(args))
 
        var stdout, stderr bytes.Buffer
        cmd.Stdout = &stdout
@@ -227,7 +267,7 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                log.Printf("Submitting container %s to slurm", ctr.UUID)
                if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
                        text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
-                       log.Printf(text)
+                       log.Print(text)
 
                        lr := arvadosclient.Dict{"log": arvadosclient.Dict{
                                "object_uuid": ctr.UUID,
@@ -273,6 +313,10 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
+                       } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
+                               // dynamically adjust priority
+                               log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
+                               scontrolUpdate(updated)
                        }
                }
        }
@@ -293,6 +337,21 @@ func scancel(ctr arvados.Container) {
        }
 }
 
+func scontrolUpdate(ctr arvados.Container) {
+       sqCheck.L.Lock()
+       cmd := scontrolCmd(ctr)
+       msg, err := cmd.CombinedOutput()
+       sqCheck.L.Unlock()
+
+       if err != nil {
+               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               time.Sleep(time.Second)
+       } else if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("Container %s priority is now %v, niceness is now %v",
+                       ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+       }
+}
+
 func readConfig(dst interface{}, path string) error {
        err := config.LoadFile(dst, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {