X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df591042778408d03d410d5c22a669d85652d1ea..21bf21abe900918a7882f7e43c102418e04159be:/services/crunch-dispatch-slurm/squeue.go?ds=sidebyside diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index ee79c6f774..20305ab90a 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -14,11 +14,14 @@ import ( "time" ) +const slurm15NiceLimit int64 = 10000 + type slurmJob struct { uuid string wantPriority int64 priority int64 // current slurm priority (incorporates nice value) nice int64 // current slurm nice value + hitNiceLimit bool } // Squeue implements asynchronous polling monitor of the SLURM queue using the @@ -30,7 +33,8 @@ type SqueueChecker struct { queue map[string]*slurmJob startOnce sync.Once done chan struct{} - sync.Cond + lock sync.RWMutex + notify sync.Cond } // HasUUID checks if a given container UUID is in the slurm queue. @@ -39,11 +43,11 @@ type SqueueChecker struct { func (sqc *SqueueChecker) HasUUID(uuid string) bool { sqc.startOnce.Do(sqc.start) - sqc.L.Lock() - defer sqc.L.Unlock() + sqc.lock.RLock() + defer sqc.lock.RUnlock() // block until next squeue broadcast signaling an update. - sqc.Wait() + sqc.notify.Wait() _, exists := sqc.queue[uuid] return exists } @@ -52,25 +56,30 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool { // container. func (sqc *SqueueChecker) SetPriority(uuid string, want int64) { sqc.startOnce.Do(sqc.start) - sqc.L.Lock() - defer sqc.L.Unlock() - job, ok := sqc.queue[uuid] - if !ok { + + sqc.lock.RLock() + job := sqc.queue[uuid] + if job == nil { // Wait in case the slurm job was just submitted and // will appear in the next squeue update. - sqc.Wait() - if job, ok = sqc.queue[uuid]; !ok { - return - } + sqc.notify.Wait() + job = sqc.queue[uuid] + } + needUpdate := job != nil && job.wantPriority != want + sqc.lock.RUnlock() + + if needUpdate { + sqc.lock.Lock() + job.wantPriority = want + sqc.lock.Unlock() } - job.wantPriority = want } // adjust slurm job nice values as needed to ensure slurm priority // order matches Arvados priority order. func (sqc *SqueueChecker) reniceAll() { - sqc.L.Lock() - defer sqc.L.Unlock() + sqc.lock.RLock() + defer sqc.lock.RUnlock() jobs := make([]*slurmJob, 0, len(sqc.queue)) for _, j := range sqc.queue { @@ -79,7 +88,7 @@ func (sqc *SqueueChecker) reniceAll() { // (perhaps it's not an Arvados job) continue } - if j.priority == 0 { + if j.priority <= 2*slurm15NiceLimit { // SLURM <= 15.x implements "hold" by setting // priority to 0. If we include held jobs // here, we'll end up trying to push other @@ -103,10 +112,18 @@ func (sqc *SqueueChecker) reniceAll() { }) renice := wantNice(jobs, sqc.PrioritySpread) for i, job := range jobs { - if renice[i] == job.nice { + niceNew := renice[i] + if job.hitNiceLimit && niceNew > slurm15NiceLimit { + niceNew = slurm15NiceLimit + } + if niceNew == job.nice { continue } - sqc.Slurm.Renice(job.uuid, renice[i]) + err := sqc.Slurm.Renice(job.uuid, niceNew) + if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") { + log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit) + job.hitNiceLimit = true + } } } @@ -122,12 +139,8 @@ func (sqc *SqueueChecker) Stop() { // queued). If it succeeds, it updates sqc.queue and wakes up any // goroutines that are waiting in HasUUID() or All(). func (sqc *SqueueChecker) check() { - // Mutex between squeue sync and running sbatch or scancel. This - // establishes a sequence so that squeue doesn't run concurrently with - // sbatch or scancel; the next update of squeue will occur only after - // sbatch or scancel has completed. - sqc.L.Lock() - defer sqc.L.Unlock() + sqc.lock.Lock() + defer sqc.lock.Unlock() cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"}) stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{} @@ -157,31 +170,41 @@ func (sqc *SqueueChecker) check() { replacing.nice = n newq[uuid] = replacing - if state == "PENDING" && reason == "BadConstraints" && p == 0 && replacing.wantPriority > 0 { + if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 { // When using SLURM 14.x or 15.x, our queued // jobs land in this state when "scontrol // reconfigure" invalidates their feature // constraints by clearing all node features. // They stay in this state even after the // features reappear, until we run "scontrol - // release {jobid}". + // release {jobid}". Priority is usually 0 in + // this state, but sometimes (due to a race + // with nice adjustments?) it's a small + // positive value. // // "scontrol release" is silent and successful // regardless of whether the features have // reappeared, so rather than second-guessing // whether SLURM is ready, we just keep trying // this until it works. + // + // "launch failed requeued held" seems to be + // another manifestation of this problem, + // resolved the same way. + log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason) sqc.Slurm.Release(uuid) + } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 { + log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason) } } sqc.queue = newq - sqc.Broadcast() + sqc.notify.Broadcast() } // Initialize, and start a goroutine to call check() once per // squeue.Period until terminated by calling Stop(). func (sqc *SqueueChecker) start() { - sqc.L = &sync.Mutex{} + sqc.notify.L = sqc.lock.RLocker() sqc.done = make(chan struct{}) go func() { ticker := time.NewTicker(sqc.Period) @@ -193,6 +216,15 @@ func (sqc *SqueueChecker) start() { case <-ticker.C: sqc.check() sqc.reniceAll() + select { + case <-ticker.C: + // If this iteration took + // longer than sqc.Period, + // consume the next tick and + // wait. Otherwise we would + // starve other goroutines. + default: + } } } }() @@ -202,9 +234,9 @@ func (sqc *SqueueChecker) start() { // names reported by squeue. func (sqc *SqueueChecker) All() []string { sqc.startOnce.Do(sqc.start) - sqc.L.Lock() - defer sqc.L.Unlock() - sqc.Wait() + sqc.lock.RLock() + defer sqc.lock.RUnlock() + sqc.notify.Wait() var uuids []string for u := range sqc.queue { uuids = append(uuids, u)