17756: Add lsf dispatcher.
[arvados.git] / lib / lsf / lsfqueue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "sync"
9         "time"
10
11         "github.com/sirupsen/logrus"
12 )
13
14 type lsfqueue struct {
15         logger logrus.FieldLogger
16         period time.Duration
17         lsfcli *lsfcli
18
19         initOnce   sync.Once
20         mutex      sync.Mutex
21         needUpdate chan bool
22         updated    *sync.Cond
23         latest     map[string]bjobsEntry
24 }
25
26 // JobID waits for the next queue update (so even a job that was only
27 // submitted a nanosecond ago will show up) and then returns the LSF
28 // job ID corresponding to the given container UUID.
29 func (q *lsfqueue) JobID(uuid string) (int, bool) {
30         q.initOnce.Do(q.init)
31         q.mutex.Lock()
32         defer q.mutex.Unlock()
33         select {
34         case q.needUpdate <- true:
35         default:
36                 // an update is already pending
37         }
38         q.updated.Wait()
39         ent, ok := q.latest[uuid]
40         q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
41         return ent.id, ok
42 }
43
44 func (q *lsfqueue) SetPriority(uuid string, priority int64) {
45         q.initOnce.Do(q.init)
46         q.logger.Debug("SetPriority is not implemented")
47 }
48
49 func (q *lsfqueue) init() {
50         q.updated = sync.NewCond(&q.mutex)
51         q.needUpdate = make(chan bool, 1)
52         ticker := time.NewTicker(time.Second)
53         go func() {
54                 for range q.needUpdate {
55                         q.logger.Debug("running bjobs")
56                         ents, err := q.lsfcli.Bjobs()
57                         if err != nil {
58                                 q.logger.Warnf("bjobs: %s", err)
59                                 // Retry on the next tick, don't wait
60                                 // for another new call to JobID().
61                                 select {
62                                 case q.needUpdate <- true:
63                                 default:
64                                 }
65                                 <-ticker.C
66                                 continue
67                         }
68                         next := make(map[string]bjobsEntry, len(ents))
69                         for _, ent := range ents {
70                                 next[ent.name] = ent
71                         }
72                         q.mutex.Lock()
73                         q.latest = next
74                         q.updated.Broadcast()
75                         q.logger.Debugf("waking up waiters with latest %v", q.latest)
76                         q.mutex.Unlock()
77                         // Limit "bjobs" invocations to 1 per second
78                         <-ticker.C
79                 }
80         }()
81 }