// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "bytes"
"context"
"flag"
"fmt"
"log"
"math"
"os"
- "os/exec"
"regexp"
"strings"
"time"
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
+
+ slurm Slurm
}
func main() {
+ theConfig.slurm = &slurmCLI{}
err := doMain()
if err != nil {
log.Fatal(err)
return (1000 - priority) * 10
}
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
var disk int64
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
- return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
- return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+ return sbatchArgs
}
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
// submit builds a script from crunchRunCommand with container.UUID
// appended (via execScript) and submits it to slurm together with the
// arguments produced by sbatchArgs(container). It returns the error
// from the submission step.
//
// NOTE(review): this span looks like a patch hunk -- lines starting
// with "-" are the previous exec.Cmd-based implementation, lines
// starting with "+" are the replacement that goes through
// theConfig.slurm.
func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
- cmd := sbatchCmd(container)
-
- // Send a tiny script on stdin to execute the crunch-run
- // command (slurm requires this to be a #! script)
-
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
- args := append([]string(nil), crunchRunCommand...)
- args = append(args, container.UUID)
- cmd.Stdin = strings.NewReader(execScript(args))
-
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
+ crArgs := append([]string(nil), crunchRunCommand...)
+ crArgs = append(crArgs, container.UUID)
+ crScript := strings.NewReader(execScript(crArgs))
- // Mutex between squeue sync and running sbatch or scancel.
// sqCheck.L is held from here until submit returns (deferred
// Unlock), so the whole submission runs under the lock.
sqCheck.L.Lock()
defer sqCheck.L.Unlock()
- log.Printf("exec sbatch %+q", cmd.Args)
- err := cmd.Run()
-
- switch err.(type) {
- case nil:
- log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
- return nil
-
- case *exec.ExitError:
- dispatcher.Unlock(container.UUID)
- return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
- default:
- dispatcher.Unlock(container.UUID)
- return fmt.Errorf("exec failed: %v", err)
- }
// NOTE(review): the "-" lines above called
// dispatcher.Unlock(container.UUID) on failure; the "+" lines
// return the Batch error without doing so, and dispatcher is no
// longer used here -- confirm the caller unlocks the container.
+ sbArgs := sbatchArgs(container)
+ log.Printf("running sbatch %+q", sbArgs)
+ return theConfig.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
} else if updated.Priority == 0 {
log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
scancel(ctr)
- } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
- // dynamically adjust priority
- log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
- scontrolUpdate(updated)
+ } else {
+ renice(updated)
}
}
}
// scancel asks slurm to cancel the job for ctr, holding sqCheck.L
// around the call. It does not return an error: a failure is logged
// and followed by a one-second sleep; on success it logs a message if
// sqCheck still reports the container's UUID.
//
// NOTE(review): this span looks like a patch hunk -- "-" lines are
// the previous exec-based code, "+" lines are the replacement that
// calls theConfig.slurm.Cancel.
func scancel(ctr arvados.Container) {
sqCheck.L.Lock()
- cmd := scancelCmd(ctr)
- msg, err := cmd.CombinedOutput()
+ err := theConfig.slurm.Cancel(ctr.UUID)
sqCheck.L.Unlock()
if err != nil {
- log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+ log.Printf("scancel: %s", err)
// Presumably a pause to avoid retrying in a tight loop when
// the caller invokes scancel again -- TODO confirm.
time.Sleep(time.Second)
} else if sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s is still in squeue after scancel", ctr.UUID)
}
}
// renice brings the slurm nice value of ctr's job in line with
// niceness(ctr.Priority). It returns without doing anything when the
// value reported by sqCheck.GetNiceness already matches or is -1.
// Otherwise it calls theConfig.slurm.Renice while holding sqCheck.L.
// A failure is logged and followed by a one-second sleep; on success
// it logs the current priority and nice value if sqCheck still
// reports the container's UUID.
//
// NOTE(review): this span looks like a patch hunk -- "-" lines are
// the previous scontrolUpdate implementation, "+" lines are its
// replacement.
-func scontrolUpdate(ctr arvados.Container) {
+func renice(ctr arvados.Container) {
+ nice := niceness(ctr.Priority)
+ oldnice := sqCheck.GetNiceness(ctr.UUID)
// NOTE(review): -1 presumably means sqCheck has no nice value for
// this UUID (e.g. job not in the queue) -- confirm against
// GetNiceness.
+ if nice == oldnice || oldnice == -1 {
+ return
+ }
+ log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
sqCheck.L.Lock()
- cmd := scontrolCmd(ctr)
- msg, err := cmd.CombinedOutput()
+ err := theConfig.slurm.Renice(ctr.UUID, nice)
sqCheck.L.Unlock()
if err != nil {
- log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+ log.Printf("renice: %s", err)
time.Sleep(time.Second)
- } else if sqCheck.HasUUID(ctr.UUID) {
- log.Printf("Container %s priority is now %v, niceness is now %v",
+ return
+ }
// The nice value logged below is re-read from sqCheck rather than
// taken from the local nice variable.
+ if sqCheck.HasUUID(ctr.UUID) {
+ log.Printf("container %s has arvados priority %d, slurm nice %d",
ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
}
}