1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
11 "github.com/sirupsen/logrus"
14 type lsfqueue struct {
15 logger logrus.FieldLogger
23 latest map[string]bjobsEntry
26 // JobID waits for the next queue update (so even a job that was only
27 // submitted a nanosecond ago will show up) and then returns the LSF
28 // job ID corresponding to the given container UUID.
29 func (q *lsfqueue) JobID(uuid string) (int, bool) {
32 defer q.mutex.Unlock()
34 case q.needUpdate <- true:
36 // an update is already pending
39 ent, ok := q.latest[uuid]
40 q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
44 func (q *lsfqueue) SetPriority(uuid string, priority int64) {
46 q.logger.Debug("SetPriority is not implemented")
49 func (q *lsfqueue) init() {
50 q.updated = sync.NewCond(&q.mutex)
51 q.needUpdate = make(chan bool, 1)
52 ticker := time.NewTicker(time.Second)
54 for range q.needUpdate {
55 q.logger.Debug("running bjobs")
56 ents, err := q.lsfcli.Bjobs()
58 q.logger.Warnf("bjobs: %s", err)
59 // Retry on the next tick, don't wait
60 // for another new call to JobID().
62 case q.needUpdate <- true:
68 next := make(map[string]bjobsEntry, len(ents))
69 for _, ent := range ents {
75 q.logger.Debugf("waking up waiters with latest %v", q.latest)
77 // Limit "bjobs" invocations to 1 per second