X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0eb72b526bf8bbb011551ecf019f604e17a534f1..7499f61a2912cfdb1a316808fafa6e6ee77ee2e0:/services/crunch-dispatch-slurm/squeue.go diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index d22e02597b..eae21e62b6 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -6,28 +6,36 @@ package main import ( "bytes" - "log" - "os/exec" + "fmt" + "sort" "strings" "sync" "time" ) -// Squeue implements asynchronous polling monitor of the SLURM queue using the -// command 'squeue'. -type SqueueChecker struct { - Period time.Duration - uuids map[string]bool - startOnce sync.Once - done chan struct{} - sync.Cond -} +const slurm15NiceLimit int64 = 10000 -func squeueFunc() *exec.Cmd { - return exec.Command("squeue", "--all", "--format=%j") +type slurmJob struct { + uuid string + wantPriority int64 + priority int64 // current slurm priority (incorporates nice value) + nice int64 // current slurm nice value + hitNiceLimit bool } -var squeueCmd = squeueFunc +// SqueueChecker implements asynchronous polling monitor of the SLURM queue +// using the command 'squeue'. +type SqueueChecker struct { + Logger logger + Period time.Duration + PrioritySpread int64 + Slurm Slurm + queue map[string]*slurmJob + startOnce sync.Once + done chan struct{} + lock sync.RWMutex + notify sync.Cond +} // HasUUID checks if a given container UUID is in the slurm queue. // This does not run squeue directly, but instead blocks until woken @@ -35,12 +43,87 @@ var squeueCmd = squeueFunc func (sqc *SqueueChecker) HasUUID(uuid string) bool { sqc.startOnce.Do(sqc.start) - sqc.L.Lock() - defer sqc.L.Unlock() + sqc.lock.RLock() + defer sqc.lock.RUnlock() // block until next squeue broadcast signaling an update. - sqc.Wait() - return sqc.uuids[uuid] + sqc.notify.Wait() + _, exists := sqc.queue[uuid] + return exists +} + +// SetPriority sets or updates the desired (Arvados) priority for a +// container. +func (sqc *SqueueChecker) SetPriority(uuid string, want int64) { + sqc.startOnce.Do(sqc.start) + + sqc.lock.RLock() + job := sqc.queue[uuid] + if job == nil { + // Wait in case the slurm job was just submitted and + // will appear in the next squeue update. + sqc.notify.Wait() + job = sqc.queue[uuid] + } + needUpdate := job != nil && job.wantPriority != want + sqc.lock.RUnlock() + + if needUpdate { + sqc.lock.Lock() + job.wantPriority = want + sqc.lock.Unlock() + } +} + +// adjust slurm job nice values as needed to ensure slurm priority +// order matches Arvados priority order. +func (sqc *SqueueChecker) reniceAll() { + // This is slow (it shells out to scontrol many times) and no + // other goroutines update sqc.queue or any of the job fields + // we use here, so we don't acquire a lock. + jobs := make([]*slurmJob, 0, len(sqc.queue)) + for _, j := range sqc.queue { + if j.wantPriority == 0 { + // SLURM job with unknown Arvados priority + // (perhaps it's not an Arvados job) + continue + } + if j.priority <= 2*slurm15NiceLimit { + // SLURM <= 15.x implements "hold" by setting + // priority to 0. If we include held jobs + // here, we'll end up trying to push other + // jobs below them using negative priority, + // which won't help anything. + continue + } + jobs = append(jobs, j) + } + + sort.Slice(jobs, func(i, j int) bool { + if jobs[i].wantPriority != jobs[j].wantPriority { + return jobs[i].wantPriority > jobs[j].wantPriority + } + // break ties with container uuid -- + // otherwise, the ordering would change from + // one interval to the next, and we'd do many + // pointless slurm queue rearrangements. + return jobs[i].uuid > jobs[j].uuid + }) + renice := wantNice(jobs, sqc.PrioritySpread) + for i, job := range jobs { + niceNew := renice[i] + if job.hitNiceLimit && niceNew > slurm15NiceLimit { + niceNew = slurm15NiceLimit + } + if niceNew == job.nice { + continue + } + err := sqc.Slurm.Renice(job.uuid, niceNew) + if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") { + sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit) + job.hitNiceLimit = true + } + } } // Stop stops the squeue monitoring goroutine. Do not call HasUUID @@ -52,36 +135,78 @@ func (sqc *SqueueChecker) Stop() { } // check gets the names of jobs in the SLURM queue (running and -// queued). If it succeeds, it updates squeue.uuids and wakes up any +// queued). If it succeeds, it updates sqc.queue and wakes up any // goroutines that are waiting in HasUUID() or All(). func (sqc *SqueueChecker) check() { - // Mutex between squeue sync and running sbatch or scancel. This - // establishes a sequence so that squeue doesn't run concurrently with - // sbatch or scancel; the next update of squeue will occur only after - // sbatch or scancel has completed. - sqc.L.Lock() - defer sqc.L.Unlock() - - cmd := squeueCmd() + cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"}) stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{} cmd.Stdout, cmd.Stderr = stdout, stderr if err := cmd.Run(); err != nil { - log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String()) + sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String()) return } - uuids := strings.Split(stdout.String(), "\n") - sqc.uuids = make(map[string]bool, len(uuids)) - for _, uuid := range uuids { - sqc.uuids[uuid] = true + lines := strings.Split(stdout.String(), "\n") + newq := make(map[string]*slurmJob, len(lines)) + for _, line := range lines { + if line == "" { + continue + } + var uuid, state, reason string + var n, p int64 + if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil { + sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line) + continue + } + + // No other goroutines write to jobs' priority or nice + // fields, so we can read and write them without + // locks. + replacing, ok := sqc.queue[uuid] + if !ok { + replacing = &slurmJob{uuid: uuid} + } + replacing.priority = p + replacing.nice = n + newq[uuid] = replacing + + if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 { + // When using SLURM 14.x or 15.x, our queued + // jobs land in this state when "scontrol + // reconfigure" invalidates their feature + // constraints by clearing all node features. + // They stay in this state even after the + // features reappear, until we run "scontrol + // release {jobid}". Priority is usually 0 in + // this state, but sometimes (due to a race + // with nice adjustments?) it's a small + // positive value. + // + // "scontrol release" is silent and successful + // regardless of whether the features have + // reappeared, so rather than second-guessing + // whether SLURM is ready, we just keep trying + // this until it works. + // + // "launch failed requeued held" seems to be + // another manifestation of this problem, + // resolved the same way. + sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason) + sqc.Slurm.Release(uuid) + } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 { + sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason) + } } - sqc.Broadcast() + sqc.lock.Lock() + sqc.queue = newq + sqc.lock.Unlock() + sqc.notify.Broadcast() } // Initialize, and start a goroutine to call check() once per // squeue.Period until terminated by calling Stop(). func (sqc *SqueueChecker) start() { - sqc.L = &sync.Mutex{} + sqc.notify.L = sqc.lock.RLocker() sqc.done = make(chan struct{}) go func() { ticker := time.NewTicker(sqc.Period) @@ -92,6 +217,16 @@ func (sqc *SqueueChecker) start() { return case <-ticker.C: sqc.check() + sqc.reniceAll() + select { + case <-ticker.C: + // If this iteration took + // longer than sqc.Period, + // consume the next tick and + // wait. Otherwise we would + // starve other goroutines. + default: + } } } }() @@ -101,12 +236,12 @@ func (sqc *SqueueChecker) start() { // names reported by squeue. func (sqc *SqueueChecker) All() []string { sqc.startOnce.Do(sqc.start) - sqc.L.Lock() - defer sqc.L.Unlock() - sqc.Wait() + sqc.lock.RLock() + defer sqc.lock.RUnlock() + sqc.notify.Wait() var uuids []string - for uuid := range sqc.uuids { - uuids = append(uuids, uuid) + for u := range sqc.queue { + uuids = append(uuids, u) } return uuids }