//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"bytes"
"fmt"
- "log"
"sort"
"strings"
"sync"
"time"
)
+const slurm15NiceLimit int64 = 10000
+
type slurmJob struct {
uuid string
wantPriority int64
priority int64 // current slurm priority (incorporates nice value)
nice int64 // current slurm nice value
+ hitNiceLimit bool
}
-// Squeue implements asynchronous polling monitor of the SLURM queue using the
-// command 'squeue'.
+// SqueueChecker implements asynchronous polling monitor of the SLURM queue
+// using the command 'squeue'.
type SqueueChecker struct {
+ Logger logger
Period time.Duration
PrioritySpread int64
Slurm Slurm
queue map[string]*slurmJob
startOnce sync.Once
done chan struct{}
- sync.Cond
+ lock sync.RWMutex
+ notify sync.Cond
}
// HasUUID checks if a given container UUID is in the slurm queue.
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
// block until next squeue broadcast signaling an update.
- sqc.Wait()
+ sqc.notify.Wait()
_, exists := sqc.queue[uuid]
return exists
}
// SetPriority sets or updates the desired (Arvados) priority for a
// container.
func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
- sqc.L.Lock()
- defer sqc.L.Unlock()
- if _, ok := sqc.queue[uuid]; !ok {
+ sqc.startOnce.Do(sqc.start)
+
+ sqc.lock.RLock()
+ job := sqc.queue[uuid]
+ if job == nil {
// Wait in case the slurm job was just submitted and
// will appear in the next squeue update.
- sqc.Wait()
- if _, ok = sqc.queue[uuid]; !ok {
- return
- }
+ sqc.notify.Wait()
+ job = sqc.queue[uuid]
+ }
+ needUpdate := job != nil && job.wantPriority != want
+ sqc.lock.RUnlock()
+
+ if needUpdate {
+ sqc.lock.Lock()
+ job.wantPriority = want
+ sqc.lock.Unlock()
}
- sqc.queue[uuid].wantPriority = want
}
// adjust slurm job nice values as needed to ensure slurm priority
// order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
- sqc.L.Lock()
- defer sqc.L.Unlock()
-
+ // This is slow (it shells out to scontrol many times) and no
+ // other goroutines update sqc.queue or any of the job fields
+ // we use here, so we don't acquire a lock.
jobs := make([]*slurmJob, 0, len(sqc.queue))
for _, j := range sqc.queue {
if j.wantPriority == 0 {
// (perhaps it's not an Arvados job)
continue
}
+ if j.priority <= 2*slurm15NiceLimit {
+ // SLURM <= 15.x implements "hold" by setting
+ // priority to 0. If we include held jobs
+ // here, we'll end up trying to push other
+ // jobs below them using negative priority,
+ // which won't help anything.
+ continue
+ }
jobs = append(jobs, j)
}
sort.Slice(jobs, func(i, j int) bool {
- return jobs[i].wantPriority > jobs[j].wantPriority
+ if jobs[i].wantPriority != jobs[j].wantPriority {
+ return jobs[i].wantPriority > jobs[j].wantPriority
+ }
+ // break ties with container uuid --
+ // otherwise, the ordering would change from
+ // one interval to the next, and we'd do many
+ // pointless slurm queue rearrangements.
+ return jobs[i].uuid > jobs[j].uuid
})
renice := wantNice(jobs, sqc.PrioritySpread)
for i, job := range jobs {
- if renice[i] == job.nice {
+ niceNew := renice[i]
+ if job.hitNiceLimit && niceNew > slurm15NiceLimit {
+ niceNew = slurm15NiceLimit
+ }
+ if niceNew == job.nice {
continue
}
- log.Printf("updating slurm priority for %q: nice %d => %d", job.uuid, job.nice, renice[i])
- sqc.Slurm.Renice(job.uuid, renice[i])
+ err := sqc.Slurm.Renice(job.uuid, niceNew)
+ if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
+ sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+ job.hitNiceLimit = true
+ }
}
}
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
- // Mutex between squeue sync and running sbatch or scancel. This
- // establishes a sequence so that squeue doesn't run concurrently with
- // sbatch or scancel; the next update of squeue will occur only after
- // sbatch or scancel has completed.
- sqc.L.Lock()
- defer sqc.L.Unlock()
-
- cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
+ cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
- log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+ sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
return
}
if line == "" {
continue
}
- var uuid string
+ var uuid, state, reason string
var n, p int64
- if _, err := fmt.Sscan(line, &uuid, &n, &p); err != nil {
- log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+ if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
+ sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
continue
}
- newq[uuid] = &slurmJob{
- uuid: uuid,
- priority: p,
- nice: n,
+
+ // No other goroutines write to jobs' priority or nice
+ // fields, so we can read and write them without
+ // locks.
+ replacing, ok := sqc.queue[uuid]
+ if !ok {
+ replacing = &slurmJob{uuid: uuid}
+ }
+ replacing.priority = p
+ replacing.nice = n
+ newq[uuid] = replacing
+
+ if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 {
+ // When using SLURM 14.x or 15.x, our queued
+ // jobs land in this state when "scontrol
+ // reconfigure" invalidates their feature
+ // constraints by clearing all node features.
+ // They stay in this state even after the
+ // features reappear, until we run "scontrol
+ // release {jobid}". Priority is usually 0 in
+ // this state, but sometimes (due to a race
+ // with nice adjustments?) it's a small
+ // positive value.
+ //
+ // "scontrol release" is silent and successful
+ // regardless of whether the features have
+ // reappeared, so rather than second-guessing
+ // whether SLURM is ready, we just keep trying
+ // this until it works.
+ //
+ // "launch failed requeued held" seems to be
+ // another manifestation of this problem,
+ // resolved the same way.
+ sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+ sqc.Slurm.Release(uuid)
+ } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
+ sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
+ sqc.lock.Lock()
sqc.queue = newq
- sqc.Broadcast()
+ sqc.lock.Unlock()
+ sqc.notify.Broadcast()
}
// Initialize, and start a goroutine to call check() once per
// squeue.Period until terminated by calling Stop().
func (sqc *SqueueChecker) start() {
- sqc.L = &sync.Mutex{}
+ sqc.notify.L = sqc.lock.RLocker()
sqc.done = make(chan struct{})
go func() {
ticker := time.NewTicker(sqc.Period)
case <-ticker.C:
sqc.check()
sqc.reniceAll()
+ select {
+ case <-ticker.C:
+ // If this iteration took
+ // longer than sqc.Period,
+ // consume the next tick and
+ // wait. Otherwise we would
+ // starve other goroutines.
+ default:
+ }
}
}
}()
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
- sqc.Wait()
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
+ sqc.notify.Wait()
var uuids []string
for u := range sqc.queue {
uuids = append(uuids, u)