X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1586823b65c7ec7656626e491a31f3f9516a4a56..54e5ad11af9ee6804c908e49249edf87de7b35dd:/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index cca8b3f150..ae2ca58421 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -1,16 +1,18 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-	"bytes"
 	"context"
 	"flag"
 	"fmt"
 	"log"
 	"math"
 	"os"
-	"os/exec"
 	"regexp"
 	"strings"
 	"time"
@@ -22,6 +24,8 @@ import (
 	"github.com/coreos/go-systemd/daemon"
 )
 
+var version = "dev"
+
 // Config used by crunch-dispatch-slurm
 type Config struct {
 	Client arvados.Client
@@ -37,9 +41,12 @@ type Config struct {
 
 	// Minimum time between two attempts to run the same container
 	MinRetryPeriod arvados.Duration
+
+	slurm Slurm
 }
 
 func main() {
+	theConfig.slurm = &slurmCLI{}
 	err := doMain()
 	if err != nil {
 		log.Fatal(err)
@@ -65,10 +72,21 @@ func doMain() error {
 		"dump-config",
 		false,
 		"write current configuration to stdout and exit")
-
+	getVersion := flags.Bool(
+		"version",
+		false,
+		"Print version information and exit.")
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
+	// Print version information if requested
+	if *getVersion {
+		fmt.Printf("crunch-dispatch-slurm %s\n", version)
+		return nil
+	}
+
+	log.Printf("crunch-dispatch-slurm %s started", version)
+
 	err := readConfig(&theConfig, *configPath)
 	if err != nil {
 		return err
@@ -147,8 +165,18 @@ func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueCheck
 	}
 }
 
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func niceness(priority int) int {
+	if priority > 1000 {
+		priority = 1000
+	}
+	if priority < 0 {
+		priority = 0
+	}
+	// Niceness range 1-10000
+	return (1000 - priority) * 10
+}
+
+func sbatchArgs(container arvados.Container) []string {
 	mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
 	var disk int64
@@ -165,54 +193,27 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
 	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
 	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
 	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
+	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
 	if len(container.SchedulingParameters.Partitions) > 0 {
 		sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
 	}
-	return exec.Command("sbatch", sbatchArgs...)
+	return sbatchArgs
 }
 
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
-	return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-
-// Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
-	cmd := sbatchCmd(container)
-
-	// Send a tiny script on stdin to execute the crunch-run
-	// command (slurm requires this to be a #! script)
-	cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
-
-	var stdout, stderr bytes.Buffer
-	cmd.Stdout = &stdout
-	cmd.Stderr = &stderr
+	// append() here avoids modifying crunchRunCommand's
+	// underlying array, which is shared with other goroutines.
+	crArgs := append([]string(nil), crunchRunCommand...)
+	crArgs = append(crArgs, container.UUID)
+	crScript := strings.NewReader(execScript(crArgs))
 
-	// Mutex between squeue sync and running sbatch or scancel.
 	sqCheck.L.Lock()
 	defer sqCheck.L.Unlock()
 
-	log.Printf("exec sbatch %+q", cmd.Args)
-	err := cmd.Run()
-
-	switch err.(type) {
-	case nil:
-		log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
-		return nil
-
-	case *exec.ExitError:
-		dispatcher.Unlock(container.UUID)
-		return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
-	default:
-		dispatcher.Unlock(container.UUID)
-		return fmt.Errorf("exec failed: %v", err)
-	}
+	sbArgs := sbatchArgs(container)
+	log.Printf("running sbatch %+q", sbArgs)
+	return theConfig.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
@@ -226,13 +227,21 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 	if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
 		log.Printf("Submitting container %s to slurm", ctr.UUID)
 		if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
-			log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+			text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+			log.Print(text)
+
+			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+				"object_uuid": ctr.UUID,
+				"event_type":  "dispatch",
+				"properties":  map[string]string{"text": text}}}
+			disp.Arv.Create("logs", lr, nil)
+
 			disp.Unlock(ctr.UUID)
 			return
 		}
 	}
 
-	log.Printf("Start monitoring container %s", ctr.UUID)
+	log.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
 	defer log.Printf("Done monitoring container %s", ctr.UUID)
 
 	// If the container disappears from the slurm queue, there is
@@ -265,6 +274,8 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 			} else if updated.Priority == 0 {
 				log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
 				scancel(ctr)
+			} else {
+				renice(updated)
 			}
 		}
 	}
@@ -272,12 +283,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 
 func scancel(ctr arvados.Container) {
 	sqCheck.L.Lock()
-	cmd := scancelCmd(ctr)
-	msg, err := cmd.CombinedOutput()
+	err := theConfig.slurm.Cancel(ctr.UUID)
 	sqCheck.L.Unlock()
 
 	if err != nil {
-		log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+		log.Printf("scancel: %s", err)
 		time.Sleep(time.Second)
 	} else if sqCheck.HasUUID(ctr.UUID) {
 		log.Printf("container %s is still in squeue after scancel", ctr.UUID)
@@ -285,6 +295,28 @@ func scancel(ctr arvados.Container) {
 	}
 }
 
+func renice(ctr arvados.Container) {
+	nice := niceness(ctr.Priority)
+	oldnice := sqCheck.GetNiceness(ctr.UUID)
+	if nice == oldnice || oldnice == -1 {
+		return
+	}
+	log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
+	sqCheck.L.Lock()
+	err := theConfig.slurm.Renice(ctr.UUID, nice)
+	sqCheck.L.Unlock()
+
+	if err != nil {
+		log.Printf("renice: %s", err)
+		time.Sleep(time.Second)
+		return
+	}
+	if sqCheck.HasUUID(ctr.UUID) {
+		log.Printf("container %s has arvados priority %d, slurm nice %d",
+			ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+	}
+}
+
 func readConfig(dst interface{}, path string) error {
 	err := config.LoadFile(dst, path)
 	if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
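The Slurm interface and slurmCLI type that this diff starts calling (theConfig.slurm.Batch, .Cancel, .Renice) are defined in a separate file added by the same commit, so their definitions do not appear above. Below is a minimal sketch of that seam, with method signatures inferred from the call sites in this diff; the scontrol invocation in Renice and the run helper are assumptions for illustration, not code taken from the commit.

// Sketch only: signatures inferred from the call sites in
// crunch-dispatch-slurm.go; the authoritative definitions live in a
// separate file (slurm.go) in the same commit.
package main

import (
	"fmt"
	"io"
	"os/exec"
)

// Slurm is the seam between the dispatcher and the slurm CLI tools,
// replacing the old sbatchCmd/scancelCmd function variables.
type Slurm interface {
	Batch(script io.Reader, args []string) error
	Cancel(name string) error
	Renice(name string, nice int) error
}

// slurmCLI shells out to the real slurm commands.
type slurmCLI struct{}

func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
	// sbatch reads the job script (here, the crunch-run wrapper
	// produced by execScript) on stdin.
	return scli.run(script, "sbatch", args)
}

func (scli *slurmCLI) Cancel(name string) error {
	// Jobs are submitted with --job-name=<container UUID>, so they
	// can be cancelled by name.
	return scli.run(nil, "scancel", []string{"--name=" + name})
}

func (scli *slurmCLI) Renice(name string, nice int) error {
	// Assumed invocation: update a queued job's nice value by name.
	return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
}

// run is a hypothetical shared helper: execute one slurm command and
// fold its combined output into any error, so callers like submit()
// and scancel() only have an error to log.
func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
	cmd := exec.Command(prog, args...)
	cmd.Stdin = stdin
	out, err := cmd.CombinedOutput()
	if err != nil {
		err = fmt.Errorf("%s: %s (output: %q)", prog, err, out)
	}
	return err
}

An interface here gives tests a single substitution point: a test can assign a fake implementation to theConfig.slurm instead of overriding package-level command builders, which is why submit() shrinks to building sbatch arguments and handing the script to theConfig.slurm.Batch.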