Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index 5eeb32e62fd071326661da8b0c365d85799dccbd..3c89103f38dbc1cd094047d173c10bfe0b28f08e 100644 (file)
@@ -80,11 +80,11 @@ func doMain() error {
 
        // Print version information if requested
        if *getVersion {
-               fmt.Printf("Version: %s\n", version)
-               os.Exit(0)
+               fmt.Printf("crunch-dispatch-slurm %s\n", version)
+               return nil
        }
 
-       log.Printf("crunch-dispatch-slurm %q started", version)
+       log.Printf("crunch-dispatch-slurm %s started", version)
 
        err := readConfig(&theConfig, *configPath)
        if err != nil {
@@ -164,6 +164,17 @@ func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueCheck
        }
 }
 
+func niceness(priority int) int {
+       if priority > 1000 {
+               priority = 1000
+       }
+       if priority < 0 {
+               priority = 0
+       }
+       // Niceness range 1-10000
+       return (1000 - priority) * 10
+}
+
 // sbatchCmd
 func sbatchFunc(container arvados.Container) *exec.Cmd {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
@@ -182,6 +193,7 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
@@ -194,9 +206,15 @@ func scancelFunc(container arvados.Container) *exec.Cmd {
        return exec.Command("scancel", "--name="+container.UUID)
 }
 
+// scontrolCmd
+func scontrolFunc(container arvados.Container) *exec.Cmd {
+       return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+}
+
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
 var scancelCmd = scancelFunc
+var scontrolCmd = scontrolFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
@@ -204,7 +222,12 @@ func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunch
 
        // Send a tiny script on stdin to execute the crunch-run
        // command (slurm requires this to be a #! script)
-       cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
+
+       // append() here avoids modifying crunchRunCommand's
+       // underlying array, which is shared with other goroutines.
+       args := append([]string(nil), crunchRunCommand...)
+       args = append(args, container.UUID)
+       cmd.Stdin = strings.NewReader(execScript(args))
 
        var stdout, stderr bytes.Buffer
        cmd.Stdout = &stdout
@@ -290,6 +313,10 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
+                       } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
+                               // dynamically adjust priority
+                               log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
+                               scontrolUpdate(updated)
                        }
                }
        }
@@ -310,6 +337,21 @@ func scancel(ctr arvados.Container) {
        }
 }
 
+func scontrolUpdate(ctr arvados.Container) {
+       sqCheck.L.Lock()
+       cmd := scontrolCmd(ctr)
+       msg, err := cmd.CombinedOutput()
+       sqCheck.L.Unlock()
+
+       if err != nil {
+               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               time.Sleep(time.Second)
+       } else if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("Container %s priority is now %v, niceness is now %v",
+                       ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+       }
+}
+
 func readConfig(dst interface{}, path string) error {
        err := config.LoadFile(dst, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {