X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ffa1fd1fdf584c71e248e9bb7d523f788a517510..7f0f12c40238f3eb12a51877a755cf22357e0767:/lib/lsf/lsfqueue.go diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go index 65f38690c3..60f01640a0 100644 --- a/lib/lsf/lsfqueue.go +++ b/lib/lsf/lsfqueue.go @@ -16,29 +16,30 @@ type lsfqueue struct { period time.Duration lsfcli *lsfcli - initOnce sync.Once - mutex sync.Mutex - needUpdate chan bool - updated *sync.Cond - latest map[string]bjobsEntry + initOnce sync.Once + mutex sync.Mutex + nextReady chan (<-chan struct{}) + updated *sync.Cond + latest map[string]bjobsEntry } -// JobID waits for the next queue update (so even a job that was only +// Lookup waits for the next queue update (so even a job that was only // submitted a nanosecond ago will show up) and then returns the LSF -// job ID corresponding to the given container UUID. -func (q *lsfqueue) JobID(uuid string) (int, bool) { - q.initOnce.Do(q.init) - q.mutex.Lock() - defer q.mutex.Unlock() - select { - case q.needUpdate <- true: - default: - // an update is already pending +// queue information corresponding to the given container UUID. +func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) { + ent, ok := q.getNext()[uuid] + return ent, ok +} + +// All waits for the next queue update, then returns the names of all +// jobs in the queue. Used by checkLsfQueueForOrphans(). +func (q *lsfqueue) All() []string { + latest := q.getNext() + names := make([]string, 0, len(latest)) + for name := range latest { + names = append(names, name) } - q.updated.Wait() - ent, ok := q.latest[uuid] - q.logger.Debugf("JobID(%q) == %d", uuid, ent.id) - return ent.id, ok + return names } func (q *lsfqueue) SetPriority(uuid string, priority int64) { @@ -46,36 +47,62 @@ func (q *lsfqueue) SetPriority(uuid string, priority int64) { q.logger.Debug("SetPriority is not implemented") } +func (q *lsfqueue) getNext() map[string]bjobsEntry { + q.initOnce.Do(q.init) + <-(<-q.nextReady) + q.mutex.Lock() + defer q.mutex.Unlock() + return q.latest +} + func (q *lsfqueue) init() { q.updated = sync.NewCond(&q.mutex) - q.needUpdate = make(chan bool, 1) - ticker := time.NewTicker(time.Second) + q.nextReady = make(chan (<-chan struct{})) + ticker := time.NewTicker(q.period) go func() { - for range q.needUpdate { - q.logger.Debug("running bjobs") - ents, err := q.lsfcli.Bjobs() - if err != nil { - q.logger.Warnf("bjobs: %s", err) - // Retry on the next tick, don't wait - // for another new call to JobID(). + for range ticker.C { + // Send a new "next update ready" channel to + // the next goroutine that wants one (and any + // others that have already queued up since + // the first one started waiting). + // + // Below, when we get a new update, we'll + // signal that to the other goroutines by + // closing the ready chan. + ready := make(chan struct{}) + q.nextReady <- ready + for { select { - case q.needUpdate <- true: + case q.nextReady <- ready: + continue default: } + break + } + // Run bjobs repeatedly if needed, until we + // get valid output. + var ents []bjobsEntry + for { + q.logger.Debug("running bjobs") + var err error + ents, err = q.lsfcli.Bjobs() + if err == nil { + break + } + q.logger.Warnf("bjobs: %s", err) <-ticker.C - continue } next := make(map[string]bjobsEntry, len(ents)) for _, ent := range ents { - next[ent.name] = ent + next[ent.Name] = ent } + // Replace q.latest and notify all the + // goroutines that the "next update" they + // asked for is now ready. q.mutex.Lock() q.latest = next - q.updated.Broadcast() - q.logger.Debugf("waking up waiters with latest %v", q.latest) q.mutex.Unlock() - // Limit "bjobs" invocations to 1 per second - <-ticker.C + close(ready) } }() }