1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
11 "github.com/sirupsen/logrus"
14 type lsfqueue struct {
15 logger logrus.FieldLogger
21 nextReady chan (<-chan struct{})
23 latest map[string]bjobsEntry
26 // Lookup waits for the next queue update (so even a job that was only
27 // submitted a nanosecond ago will show up) and then returns the LSF
28 // queue information corresponding to the given container UUID.
29 func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) {
30 ent, ok := q.getNext()[uuid]
34 // All waits for the next queue update, then returns the names of all
35 // jobs in the queue. Used by checkLsfQueueForOrphans().
36 func (q *lsfqueue) All() []string {
38 names := make([]string, 0, len(latest))
39 for name := range latest {
40 names = append(names, name)
45 func (q *lsfqueue) SetPriority(uuid string, priority int64) {
47 q.logger.Debug("SetPriority is not implemented")
50 func (q *lsfqueue) getNext() map[string]bjobsEntry {
54 defer q.mutex.Unlock()
58 func (q *lsfqueue) init() {
59 q.updated = sync.NewCond(&q.mutex)
60 q.nextReady = make(chan (<-chan struct{}))
61 ticker := time.NewTicker(q.period)
64 // Send a new "next update ready" channel to
65 // the next goroutine that wants one (and any
66 // others that have already queued up since
67 // the first one started waiting).
69 // Below, when we get a new update, we'll
70 // signal that to the other goroutines by
71 // closing the ready chan.
72 ready := make(chan struct{})
76 case q.nextReady <- ready:
82 // Run bjobs repeatedly if needed, until we
86 q.logger.Debug("running bjobs")
88 ents, err = q.lsfcli.Bjobs()
92 q.logger.Warnf("bjobs: %s", err)
95 next := make(map[string]bjobsEntry, len(ents))
96 for _, ent := range ents {
99 // Replace q.latest and notify all the
100 // goroutines that the "next update" they
101 // asked for is now ready.