Merge branch '13752-index-all-filenames'
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index 24a056264475bd26d2ec65e4b5825302903af806..20305ab90abe91b150ae71a7749fd39c8e529548 100644 (file)
@@ -33,7 +33,8 @@ type SqueueChecker struct {
        queue          map[string]*slurmJob
        startOnce      sync.Once
        done           chan struct{}
-       sync.Cond
+       lock           sync.RWMutex
+       notify         sync.Cond
 }
 
 // HasUUID checks if a given container UUID is in the slurm queue.
@@ -42,11 +43,11 @@ type SqueueChecker struct {
 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
        sqc.startOnce.Do(sqc.start)
 
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
 
        // block until next squeue broadcast signaling an update.
-       sqc.Wait()
+       sqc.notify.Wait()
        _, exists := sqc.queue[uuid]
        return exists
 }
@@ -55,25 +56,30 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 // container.
 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
        sqc.startOnce.Do(sqc.start)
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
-       job, ok := sqc.queue[uuid]
-       if !ok {
+
+       sqc.lock.RLock()
+       job := sqc.queue[uuid]
+       if job == nil {
                // Wait in case the slurm job was just submitted and
                // will appear in the next squeue update.
-               sqc.Wait()
-               if job, ok = sqc.queue[uuid]; !ok {
-                       return
-               }
+               sqc.notify.Wait()
+               job = sqc.queue[uuid]
+       }
+       needUpdate := job != nil && job.wantPriority != want
+       sqc.lock.RUnlock()
+
+       if needUpdate {
+               sqc.lock.Lock()
+               job.wantPriority = want
+               sqc.lock.Unlock()
        }
-       job.wantPriority = want
 }
 
 // adjust slurm job nice values as needed to ensure slurm priority
 // order matches Arvados priority order.
 func (sqc *SqueueChecker) reniceAll() {
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
 
        jobs := make([]*slurmJob, 0, len(sqc.queue))
        for _, j := range sqc.queue {
@@ -133,12 +139,8 @@ func (sqc *SqueueChecker) Stop() {
 // queued). If it succeeds, it updates sqc.queue and wakes up any
 // goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
-       // Mutex between squeue sync and running sbatch or scancel.  This
-       // establishes a sequence so that squeue doesn't run concurrently with
-       // sbatch or scancel; the next update of squeue will occur only after
-       // sbatch or scancel has completed.
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
+       sqc.lock.Lock()
+       defer sqc.lock.Unlock()
 
        cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
@@ -191,18 +193,18 @@ func (sqc *SqueueChecker) check() {
                        // resolved the same way.
                        log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
                        sqc.Slurm.Release(uuid)
-               } else if p < 1<<20 && replacing.wantPriority > 0 {
+               } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
                        log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
                }
        }
        sqc.queue = newq
-       sqc.Broadcast()
+       sqc.notify.Broadcast()
 }
 
 // Initialize, and start a goroutine to call check() once per
 // squeue.Period until terminated by calling Stop().
 func (sqc *SqueueChecker) start() {
-       sqc.L = &sync.Mutex{}
+       sqc.notify.L = sqc.lock.RLocker()
        sqc.done = make(chan struct{})
        go func() {
                ticker := time.NewTicker(sqc.Period)
@@ -214,6 +216,15 @@ func (sqc *SqueueChecker) start() {
                        case <-ticker.C:
                                sqc.check()
                                sqc.reniceAll()
+                               select {
+                               case <-ticker.C:
+                                       // If this iteration took
+                                       // longer than sqc.Period,
+                                       // consume the next tick and
+                                       // wait. Otherwise we would
+                                       // starve other goroutines.
+                               default:
+                               }
                        }
                }
        }()
@@ -223,9 +234,9 @@ func (sqc *SqueueChecker) start() {
 // names reported by squeue.
 func (sqc *SqueueChecker) All() []string {
        sqc.startOnce.Do(sqc.start)
-       sqc.L.Lock()
-       defer sqc.L.Unlock()
-       sqc.Wait()
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
+       sqc.notify.Wait()
        var uuids []string
        for u := range sqc.queue {
                uuids = append(uuids, u)