Refactor the multi-host salt install page.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index 4067d27d5a43b8b603ed3e8aa4db07ebad66721d..d4e41ed1fb2c5dd4f114dfa7d5493dcd79b28015 100644 (file)
@@ -2,12 +2,11 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package dispatchslurm
 
 import (
        "bytes"
        "fmt"
-       "log"
        "sort"
        "strings"
        "sync"
@@ -24,16 +23,18 @@ type slurmJob struct {
        hitNiceLimit bool
 }
 
-// Squeue implements asynchronous polling monitor of the SLURM queue using the
-// command 'squeue'.
+// SqueueChecker implements asynchronous polling monitor of the SLURM queue
+// using the command 'squeue'.
 type SqueueChecker struct {
+       Logger         logger
        Period         time.Duration
        PrioritySpread int64
        Slurm          Slurm
        queue          map[string]*slurmJob
        startOnce      sync.Once
        done           chan struct{}
-       sync.Cond
+       lock           sync.RWMutex
+       notify         sync.Cond
 }
 
 // HasUUID checks if a given container UUID is in the slurm queue.
@@ -42,11 +43,11 @@ type SqueueChecker struct {
 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
        sqc.startOnce.Do(sqc.start)
 
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
 
        // block until next squeue broadcast signaling an update.
-       sqc.Wait()
+       sqc.notify.Wait()
        _, exists := sqc.queue[uuid]
        return exists
 }
@@ -55,26 +56,31 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 // container.
 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
        sqc.startOnce.Do(sqc.start)
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
-       job, ok := sqc.queue[uuid]
-       if !ok {
+
+       sqc.lock.RLock()
+       job := sqc.queue[uuid]
+       if job == nil {
                // Wait in case the slurm job was just submitted and
                // will appear in the next squeue update.
-               sqc.Wait()
-               if job, ok = sqc.queue[uuid]; !ok {
-                       return
-               }
+               sqc.notify.Wait()
+               job = sqc.queue[uuid]
+       }
+       needUpdate := job != nil && job.wantPriority != want
+       sqc.lock.RUnlock()
+
+       if needUpdate {
+               sqc.lock.Lock()
+               job.wantPriority = want
+               sqc.lock.Unlock()
        }
-       job.wantPriority = want
 }
 
 // adjust slurm job nice values as needed to ensure slurm priority
 // order matches Arvados priority order.
 func (sqc *SqueueChecker) reniceAll() {
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
-
+       // This is slow (it shells out to scontrol many times) and no
+       // other goroutines update sqc.queue or any of the job fields
+       // we use here, so we don't acquire a lock.
        jobs := make([]*slurmJob, 0, len(sqc.queue))
        for _, j := range sqc.queue {
                if j.wantPriority == 0 {
@@ -82,7 +88,7 @@ func (sqc *SqueueChecker) reniceAll() {
                        // (perhaps it's not an Arvados job)
                        continue
                }
-               if j.priority == 0 {
+               if j.priority <= 2*slurm15NiceLimit {
                        // SLURM <= 15.x implements "hold" by setting
                        // priority to 0. If we include held jobs
                        // here, we'll end up trying to push other
@@ -96,13 +102,12 @@ func (sqc *SqueueChecker) reniceAll() {
        sort.Slice(jobs, func(i, j int) bool {
                if jobs[i].wantPriority != jobs[j].wantPriority {
                        return jobs[i].wantPriority > jobs[j].wantPriority
-               } else {
-                       // break ties with container uuid --
-                       // otherwise, the ordering would change from
-                       // one interval to the next, and we'd do many
-                       // pointless slurm queue rearrangements.
-                       return jobs[i].uuid > jobs[j].uuid
                }
+               // break ties with container uuid --
+               // otherwise, the ordering would change from
+               // one interval to the next, and we'd do many
+               // pointless slurm queue rearrangements.
+               return jobs[i].uuid > jobs[j].uuid
        })
        renice := wantNice(jobs, sqc.PrioritySpread)
        for i, job := range jobs {
@@ -115,7 +120,7 @@ func (sqc *SqueueChecker) reniceAll() {
                }
                err := sqc.Slurm.Renice(job.uuid, niceNew)
                if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
-                       log.Printf("container %q clamping nice values at %d, priority order will not be correct", job.uuid, slurm15NiceLimit)
+                       sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
                        job.hitNiceLimit = true
                }
        }
@@ -133,18 +138,11 @@ func (sqc *SqueueChecker) Stop() {
 // queued). If it succeeds, it updates sqc.queue and wakes up any
 // goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
-       // Mutex between squeue sync and running sbatch or scancel.  This
-       // establishes a sequence so that squeue doesn't run concurrently with
-       // sbatch or scancel; the next update of squeue will occur only after
-       // sbatch or scancel has completed.
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
-
        cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
-               log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+               sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
                return
        }
 
@@ -157,9 +155,13 @@ func (sqc *SqueueChecker) check() {
                var uuid, state, reason string
                var n, p int64
                if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
-                       log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+                       sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
                        continue
                }
+
+               // No other goroutines write to jobs' priority or nice
+               // fields, so we can read and write them without
+               // locks.
                replacing, ok := sqc.queue[uuid]
                if !ok {
                        replacing = &slurmJob{uuid: uuid}
@@ -168,14 +170,17 @@ func (sqc *SqueueChecker) check() {
                replacing.nice = n
                newq[uuid] = replacing
 
-               if state == "PENDING" && ((reason == "BadConstraints" && p == 0) || reason == "launch failed requeued held") && replacing.wantPriority > 0 {
+               if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 {
                        // When using SLURM 14.x or 15.x, our queued
                        // jobs land in this state when "scontrol
                        // reconfigure" invalidates their feature
                        // constraints by clearing all node features.
                        // They stay in this state even after the
                        // features reappear, until we run "scontrol
-                       // release {jobid}".
+                       // release {jobid}". Priority is usually 0 in
+                       // this state, but sometimes (due to a race
+                       // with nice adjustments?) it's a small
+                       // positive value.
                        //
                        // "scontrol release" is silent and successful
                        // regardless of whether the features have
@@ -186,20 +191,22 @@ func (sqc *SqueueChecker) check() {
                        // "launch failed requeued held" seems to be
                        // another manifestation of this problem,
                        // resolved the same way.
-                       log.Printf("releasing held job %q", uuid)
+                       sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
                        sqc.Slurm.Release(uuid)
-               } else if p < 1<<20 && replacing.wantPriority > 0 {
-                       log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+               } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
+                       sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
                }
        }
+       sqc.lock.Lock()
        sqc.queue = newq
-       sqc.Broadcast()
+       sqc.lock.Unlock()
+       sqc.notify.Broadcast()
 }
 
 // Initialize, and start a goroutine to call check() once per
 // squeue.Period until terminated by calling Stop().
 func (sqc *SqueueChecker) start() {
-       sqc.L = &sync.Mutex{}
+       sqc.notify.L = sqc.lock.RLocker()
        sqc.done = make(chan struct{})
        go func() {
                ticker := time.NewTicker(sqc.Period)
@@ -211,6 +218,15 @@ func (sqc *SqueueChecker) start() {
                        case <-ticker.C:
                                sqc.check()
                                sqc.reniceAll()
+                               select {
+                               case <-ticker.C:
+                                       // If this iteration took
+                                       // longer than sqc.Period,
+                                       // consume the next tick and
+                                       // wait. Otherwise we would
+                                       // starve other goroutines.
+                               default:
+                               }
                        }
                }
        }()
@@ -220,9 +236,9 @@ func (sqc *SqueueChecker) start() {
 // names reported by squeue.
 func (sqc *SqueueChecker) All() []string {
        sqc.startOnce.Do(sqc.start)
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
-       sqc.Wait()
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
+       sqc.notify.Wait()
        var uuids []string
        for u := range sqc.queue {
                uuids = append(uuids, u)