+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "bytes"
"context"
"flag"
"fmt"
"log"
"math"
"os"
- "os/exec"
+ "regexp"
"strings"
"time"
"github.com/coreos/go-systemd/daemon"
)
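+// version is the string reported by the -version flag; release builds
+// typically override it at link time (e.g. -ldflags "-X main.version=...").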
+var version = "dev"
+
// Config used by crunch-dispatch-slurm
type Config struct {
Client arvados.Client
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
+
+ slurm Slurm
}
func main() {
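+ // Use the real slurm CLI by default; tests can substitute a stub
+ // that implements the Slurm interface.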
+ theConfig.slurm = &slurmCLI{}
err := doMain()
if err != nil {
log.Fatal(err)
"dump-config",
false,
"write current configuration to stdout and exit")
-
+ getVersion := flags.Bool(
+ "version",
+ false,
+ "Print version information and exit.")
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("crunch-dispatch-slurm %s\n", version)
+ return nil
+ }
+
+ log.Printf("crunch-dispatch-slurm %s started", version)
+
err := readConfig(&theConfig, *configPath)
if err != nil {
return err
log.Printf("Error notifying init daemon: %v", err)
}
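+ // Clean up slurm jobs left behind by a previous dispatch process
+ // (see checkSqueueForOrphans below).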
+ go checkSqueueForOrphans(dispatcher, sqCheck)
+
return dispatcher.Run(context.Background())
}
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
- memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
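+// containerUuidPattern matches Arvados container UUIDs, e.g.
+// "zzzzz-dz642-xxxxxxxxxxxxxxx": a 5-character cluster prefix, the
+// "dz642" container type infix, and a 15-character suffix.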
+var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+
+// Check the next squeue report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel slurm
+// jobs started by a previous dispatch process that never released
+// their slurm allocations even though their container states are
+// Cancelled or Complete. See https://dev.arvados.org/issues/10979
+func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
+ for _, uuid := range sqCheck.All() {
+ if !containerUuidPattern.MatchString(uuid) {
+ continue
+ }
+ err := dispatcher.TrackContainer(uuid)
+ if err != nil {
+ log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
+ }
+ }
+}
+
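+// niceness maps an Arvados container priority (0-1000, higher is more
+// urgent) onto a slurm nice value (0-10000, lower runs sooner):
+// priority 1000 yields nice 0, priority 0 yields nice 10000.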
+func niceness(priority int) int {
+ if priority > 1000 {
+ priority = 1000
+ }
+ if priority < 0 {
+ priority = 0
+ }
+ // Niceness range 0-10000
+ return (1000 - priority) * 10
+}
+
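+// sbatchArgs derives the sbatch arguments (memory, CPUs, scratch
+// space, niceness, partitions) from a container's runtime constraints
+// and scheduling parameters.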
+func sbatchArgs(container arvados.Container) []string {
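+ // Request enough memory for the container's RAM limit plus its
+ // Keep cache, converted from bytes to mebibytes for sbatch.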
+ mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
+
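+ // Reserve scratch space equal to the total capacity of the
+ // container's "tmp" mounts, likewise converted to mebibytes.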
+ var disk int64
+ for _, m := range container.Mounts {
+ if m.Kind == "tmp" {
+ disk += m.Capacity
+ }
+ }
+ disk = int64(math.Ceil(float64(disk) / float64(1048576)))
var sbatchArgs []string
- sbatchArgs = append(sbatchArgs, "--share")
sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- return exec.Command("sbatch", sbatchArgs...)
+ return sbatchArgs
}
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
- return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
- cmd := sbatchCmd(container)
+ // append() here avoids modifying crunchRunCommand's
+ // underlying array, which is shared with other goroutines.
+ crArgs := append([]string(nil), crunchRunCommand...)
+ crArgs = append(crArgs, container.UUID)
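+ // slurm requires the submitted job to be a #! script, so wrap
+ // the crunch-run invocation with execScript.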
+ crScript := strings.NewReader(execScript(crArgs))
- // Send a tiny script on stdin to execute the crunch-run
- // command (slurm requires this to be a #! script)
- cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
-
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
-
// Mutex between squeue sync and running sbatch or scancel.
sqCheck.L.Lock()
defer sqCheck.L.Unlock()
- log.Printf("exec sbatch %+q", cmd.Args)
- err := cmd.Run()
-
- switch err.(type) {
- case nil:
- log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
- return nil
-
- case *exec.ExitError:
- dispatcher.Unlock(container.UUID)
- return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
- default:
- dispatcher.Unlock(container.UUID)
- return fmt.Errorf("exec failed: %v", err)
- }
+ sbArgs := sbatchArgs(container)
+ log.Printf("running sbatch %+q", sbArgs)
+ return theConfig.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
- log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ log.Print(text)
+
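+ // Also record the failure as a "dispatch" event log on the API
+ // server, where it is visible alongside the container.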
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": ctr.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
+ disp.Arv.Create("logs", lr, nil)
+
disp.Unlock(ctr.UUID)
return
}
}
- log.Printf("Start monitoring container %s", ctr.UUID)
+ log.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
defer log.Printf("Done monitoring container %s", ctr.UUID)
// If the container disappears from the slurm queue, there is
} else if updated.Priority == 0 {
log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
scancel(ctr)
+ } else {
+ renice(updated)
}
}
}
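+// scancel asks slurm to cancel the job named after the container's
+// UUID, then warns if the job is still listed in squeue.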
func scancel(ctr arvados.Container) {
sqCheck.L.Lock()
- cmd := scancelCmd(ctr)
- msg, err := cmd.CombinedOutput()
+ err := theConfig.slurm.Cancel(ctr.UUID)
sqCheck.L.Unlock()
if err != nil {
- log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+ log.Printf("scancel: %s", err)
time.Sleep(time.Second)
} else if sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s is still in squeue after scancel", ctr.UUID)
}
}
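+// renice adjusts the slurm job's nice value to match the container's
+// current Arvados priority.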
+func renice(ctr arvados.Container) {
+ nice := niceness(ctr.Priority)
+ oldnice := sqCheck.GetNiceness(ctr.UUID)
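+ // GetNiceness reports -1 when the job is not in the latest
+ // squeue report; in that case there is nothing to update yet.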
+ if nice == oldnice || oldnice == -1 {
+ return
+ }
+ log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
+ sqCheck.L.Lock()
+ err := theConfig.slurm.Renice(ctr.UUID, nice)
+ sqCheck.L.Unlock()
+
+ if err != nil {
+ log.Printf("renice: %s", err)
+ time.Sleep(time.Second)
+ return
+ }
+ if sqCheck.HasUUID(ctr.UUID) {
+ log.Printf("container %s has arvados priority %d, slurm nice %d",
+ ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+ }
+}
+
func readConfig(dst interface{}, path string) error {
err := config.LoadFile(dst, path)
if err != nil && os.IsNotExist(err) && path == defaultConfigPath {