Merge branch '21703-collection-update-lock'
[arvados.git] / lib / lsf / lsfqueue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lsf
6
7 import (
8         "sync"
9         "time"
10
11         "github.com/sirupsen/logrus"
12 )
13
14 type lsfqueue struct {
15         logger logrus.FieldLogger
16         period time.Duration
17         lsfcli *lsfcli
18
19         initOnce  sync.Once
20         mutex     sync.Mutex
21         nextReady chan (<-chan struct{})
22         updated   *sync.Cond
23         latest    map[string]bjobsEntry
24 }
25
26 // Lookup waits for the next queue update (so even a job that was only
27 // submitted a nanosecond ago will show up) and then returns the LSF
28 // queue information corresponding to the given container UUID.
29 func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) {
30         ent, ok := q.getNext()[uuid]
31         return ent, ok
32 }
33
34 // All waits for the next queue update, then returns the names of all
35 // jobs in the queue. Used by checkLsfQueueForOrphans().
36 func (q *lsfqueue) All() []string {
37         latest := q.getNext()
38         names := make([]string, 0, len(latest))
39         for name := range latest {
40                 names = append(names, name)
41         }
42         return names
43 }
44
45 func (q *lsfqueue) SetPriority(uuid string, priority int64) {
46         q.initOnce.Do(q.init)
47         q.logger.Debug("SetPriority is not implemented")
48 }
49
50 func (q *lsfqueue) getNext() map[string]bjobsEntry {
51         q.initOnce.Do(q.init)
52         <-(<-q.nextReady)
53         q.mutex.Lock()
54         defer q.mutex.Unlock()
55         return q.latest
56 }
57
58 func (q *lsfqueue) init() {
59         q.updated = sync.NewCond(&q.mutex)
60         q.nextReady = make(chan (<-chan struct{}))
61         ticker := time.NewTicker(q.period)
62         go func() {
63                 for range ticker.C {
64                         // Send a new "next update ready" channel to
65                         // the next goroutine that wants one (and any
66                         // others that have already queued up since
67                         // the first one started waiting).
68                         //
69                         // Below, when we get a new update, we'll
70                         // signal that to the other goroutines by
71                         // closing the ready chan.
72                         ready := make(chan struct{})
73                         q.nextReady <- ready
74                         for {
75                                 select {
76                                 case q.nextReady <- ready:
77                                         continue
78                                 default:
79                                 }
80                                 break
81                         }
82                         // Run bjobs repeatedly if needed, until we
83                         // get valid output.
84                         var ents []bjobsEntry
85                         for {
86                                 q.logger.Debug("running bjobs")
87                                 var err error
88                                 ents, err = q.lsfcli.Bjobs()
89                                 if err == nil {
90                                         break
91                                 }
92                                 q.logger.Warnf("bjobs: %s", err)
93                                 <-ticker.C
94                         }
95                         next := make(map[string]bjobsEntry, len(ents))
96                         for _, ent := range ents {
97                                 next[ent.Name] = ent
98                         }
99                         // Replace q.latest and notify all the
100                         // goroutines that the "next update" they
101                         // asked for is now ready.
102                         q.mutex.Lock()
103                         q.latest = next
104                         q.mutex.Unlock()
105                         close(ready)
106                 }
107         }()
108 }