+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
// Dispatcher service for Crunch that submits containers to the slurm queue.
"math"
"os"
"os/exec"
+ "regexp"
"strings"
"time"
log.Printf("Error notifying init daemon: %v", err)
}
+ go checkSqueueForOrphans(dispatcher, sqCheck)
+
return dispatcher.Run(context.Background())
}
+// containerUuidPattern matches a whole string consisting of five
+// lowercase alphanumerics, the literal "-dz642-", and fifteen
+// lowercase alphanumerics. checkSqueueForOrphans uses it to decide
+// which squeue entries are passed to TrackContainer.
+var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+
+// checkSqueueForOrphans walks the entries returned by sqCheck.All()
+// and calls dispatcher.TrackContainer for each one that matches
+// containerUuidPattern; entries that do not match are ignored.
+//
+// The purpose is to get a chance to cancel slurm jobs that were
+// started by a previous dispatch process and never released their
+// slurm allocations even though their container states are Cancelled
+// or Complete. See https://dev.arvados.org/issues/10979
+//
+// A TrackContainer failure is logged and does not stop the scan.
+func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
+	for _, jobName := range sqCheck.All() {
+		if containerUuidPattern.MatchString(jobName) {
+			if err := dispatcher.TrackContainer(jobName); err != nil {
+				log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", jobName, err)
+			}
+		}
+	}
+}
+
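// sbatchFunc builds the *exec.Cmd, with sbatch arguments derived from
// the given container, that is used to submit the container to slurm.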
func sbatchFunc(container arvados.Container) *exec.Cmd {
- memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
+ mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
+
+ var disk int64
+ for _, m := range container.Mounts {
+ if m.Kind == "tmp" {
+ disk += m.Capacity
+ }
+ }
+ disk = int64(math.Ceil(float64(disk) / float64(1048576)))
var sbatchArgs []string
- sbatchArgs = append(sbatchArgs, "--share")
sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
- log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ log.Print(text)
+
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": ctr.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
+ disp.Arv.Create("logs", lr, nil)
+
disp.Unlock(ctr.UUID)
return
}
}
- log.Printf("Start monitoring container %s", ctr.UUID)
+ log.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
defer log.Printf("Done monitoring container %s", ctr.UUID)
// If the container disappears from the slurm queue, there is