17756: Rewrite lsfqueue "wait for next update".
author Tom Clegg <tom@curii.com>
Fri, 2 Jul 2021 21:07:27 +0000 (17:07 -0400)
committer Tom Clegg <tom@curii.com>
Sun, 4 Jul 2021 17:31:56 +0000 (13:31 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/lsf/lsfqueue.go

index 65f38690c3a29638167af313387975d581851449..3c4fc4cb8cf6bc72e4cb1768041cd329e68eded1 100644 (file)
@@ -16,66 +16,93 @@ type lsfqueue struct {
        period time.Duration
        lsfcli *lsfcli
 
-       initOnce   sync.Once
-       mutex      sync.Mutex
-       needUpdate chan bool
-       updated    *sync.Cond
-       latest     map[string]bjobsEntry
+       initOnce  sync.Once
+       mutex     sync.Mutex
+       nextReady chan (<-chan struct{})
+       updated   *sync.Cond
+       latest    map[string]bjobsEntry
 }
 
 // JobID waits for the next queue update (so even a job that was only
 // submitted a nanosecond ago will show up) and then returns the LSF
 // job ID corresponding to the given container UUID.
 func (q *lsfqueue) JobID(uuid string) (int, bool) {
-       q.initOnce.Do(q.init)
-       q.mutex.Lock()
-       defer q.mutex.Unlock()
-       select {
-       case q.needUpdate <- true:
-       default:
-               // an update is already pending
-       }
-       q.updated.Wait()
-       ent, ok := q.latest[uuid]
-       q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
+       ent, ok := q.getNext()[uuid]
        return ent.id, ok
 }
 
+// All waits for the next queue update, then returns the names of all
+// jobs in the queue. Used by checkLsfQueueForOrphans().
+func (q *lsfqueue) All() []string {
+       latest := q.getNext()
+       names := make([]string, 0, len(latest))
+       for name := range latest {
+               names = append(names, name)
+       }
+       return names
+}
+
+// SetPriority is not implemented for LSF. It only makes sure the
+// background poller has been started (via q.initOnce) and logs a debug
+// message; the uuid and priority arguments are ignored.
 func (q *lsfqueue) SetPriority(uuid string, priority int64) {
         q.initOnce.Do(q.init)
         q.logger.Debug("SetPriority is not implemented")
 }
 
+func (q *lsfqueue) getNext() map[string]bjobsEntry {
+       q.initOnce.Do(q.init)
+       <-(<-q.nextReady)
+       q.mutex.Lock()
+       defer q.mutex.Unlock()
+       return q.latest
+}
+
 func (q *lsfqueue) init() {
        q.updated = sync.NewCond(&q.mutex)
-       q.needUpdate = make(chan bool, 1)
+       q.nextReady = make(chan (<-chan struct{}))
        ticker := time.NewTicker(time.Second)
        go func() {
-               for range q.needUpdate {
-                       q.logger.Debug("running bjobs")
-                       ents, err := q.lsfcli.Bjobs()
-                       if err != nil {
-                               q.logger.Warnf("bjobs: %s", err)
-                               // Retry on the next tick, don't wait
-                               // for another new call to JobID().
+               for range ticker.C {
+                       // Send a new "next update ready" channel to
+                       // the next goroutine that wants one (and any
+                       // others that have already queued up since
+                       // the first one started waiting).
+                       //
+                       // Below, when we get a new update, we'll
+                       // signal that to the other goroutines by
+                       // closing the ready chan.
+                       ready := make(chan struct{})
+                       q.nextReady <- ready
+                       for {
                                select {
-                               case q.needUpdate <- true:
+                               case q.nextReady <- ready:
+                                       continue
                                default:
                                }
+                               break
+                       }
+                       // Run bjobs repeatedly if needed, until we
+                       // get valid output.
+                       var ents []bjobsEntry
+                       for {
+                               q.logger.Debug("running bjobs")
+                               var err error
+                               ents, err = q.lsfcli.Bjobs()
+                               if err == nil {
+                                       break
+                               }
+                               q.logger.Warnf("bjobs: %s", err)
                                <-ticker.C
-                               continue
                        }
                        next := make(map[string]bjobsEntry, len(ents))
                        for _, ent := range ents {
                                next[ent.name] = ent
                        }
+                       // Replace q.latest and notify all the
+                       // goroutines that the "next update" they
+                       // asked for is now ready.
                        q.mutex.Lock()
                        q.latest = next
-                       q.updated.Broadcast()
-                       q.logger.Debugf("waking up waiters with latest %v", q.latest)
                        q.mutex.Unlock()
-                       // Limit "bjobs" invocations to 1 per second
-                       <-ticker.C
+                       close(ready)
                }
        }()
 }