13959: Use logrus for crunch-dispatch-local logging.
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17         "github.com/Sirupsen/logrus"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 type Logger interface {
29         Printf(string, ...interface{})
30         Warnf(string, ...interface{})
31         Debugf(string, ...interface{})
32 }
33
34 // Dispatcher struct
35 type Dispatcher struct {
36         Arv *arvadosclient.ArvadosClient
37
38         Logger Logger
39
40         // Queue polling frequency
41         PollPeriod time.Duration
42
43         // Time to wait between successive attempts to run the same container
44         MinRetryPeriod time.Duration
45
46         // Func that implements the container lifecycle. Must be set
47         // to a non-nil DispatchFunc before calling Run().
48         RunContainer DispatchFunc
49
50         auth     arvados.APIClientAuthorization
51         mtx      sync.Mutex
52         trackers map[string]*runTracker
53         throttle throttle
54 }
55
56 // A DispatchFunc executes a container (if the container record is
57 // Locked) or resume monitoring an already-running container, and wait
58 // until that container exits.
59 //
60 // While the container runs, the DispatchFunc should listen for
61 // updated container records on the provided channel. When the channel
62 // closes, the DispatchFunc should stop the container if it's still
63 // running, and return.
64 //
65 // The DispatchFunc should not return until the container is finished.
66 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
67
68 // Run watches the API server's queue for containers that are either
69 // ready to run and available to lock, or are already locked by this
70 // dispatcher's token. When a new one appears, Run calls RunContainer
71 // in a new goroutine.
72 func (d *Dispatcher) Run(ctx context.Context) error {
73         if d.Logger == nil {
74                 d.Logger = logrus.StandardLogger()
75         }
76
77         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
78         if err != nil {
79                 return fmt.Errorf("error getting my token UUID: %v", err)
80         }
81
82         d.throttle.hold = d.MinRetryPeriod
83
84         poll := time.NewTicker(d.PollPeriod)
85         defer poll.Stop()
86
87         for {
88                 select {
89                 case <-poll.C:
90                         break
91                 case <-ctx.Done():
92                         return ctx.Err()
93                 }
94
95                 todo := make(map[string]*runTracker)
96                 d.mtx.Lock()
97                 // Make a copy of trackers
98                 for uuid, tracker := range d.trackers {
99                         todo[uuid] = tracker
100                 }
101                 d.mtx.Unlock()
102
103                 // Containers I currently own (Locked/Running)
104                 querySuccess := d.checkForUpdates([][]interface{}{
105                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
106
107                 // Containers I should try to dispatch
108                 querySuccess = d.checkForUpdates([][]interface{}{
109                         {"state", "=", Queued},
110                         {"priority", ">", "0"}}, todo) && querySuccess
111
112                 if !querySuccess {
113                         // There was an error in one of the previous queries,
114                         // we probably didn't get updates for all the
115                         // containers we should have.  Don't check them
116                         // individually because it may be expensive.
117                         continue
118                 }
119
120                 // Containers I know about but didn't fall into the
121                 // above two categories (probably Complete/Cancelled)
122                 var missed []string
123                 for uuid := range todo {
124                         missed = append(missed, uuid)
125                 }
126
127                 for len(missed) > 0 {
128                         var batch []string
129                         if len(missed) > 20 {
130                                 batch = missed[0:20]
131                                 missed = missed[20:]
132                         } else {
133                                 batch = missed
134                                 missed = missed[0:0]
135                         }
136                         querySuccess = d.checkForUpdates([][]interface{}{
137                                 {"uuid", "in", batch}}, todo) && querySuccess
138                 }
139
140                 if !querySuccess {
141                         // There was an error in one of the previous queries, we probably
142                         // didn't see all the containers we should have, so don't shut down
143                         // the missed containers.
144                         continue
145                 }
146
147                 // Containers that I know about that didn't show up in any
148                 // query should be let go.
149                 for uuid, tracker := range todo {
150                         d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
151                         tracker.close()
152                 }
153
154         }
155 }
156
157 // Start a runner in a new goroutine, and send the initial container
158 // record to its updates channel.
159 func (d *Dispatcher) start(c arvados.Container) *runTracker {
160         tracker := &runTracker{
161                 updates: make(chan arvados.Container, 1),
162                 logger:  d.Logger,
163         }
164         tracker.updates <- c
165         go func() {
166                 d.RunContainer(d, c, tracker.updates)
167                 // RunContainer blocks for the lifetime of the container.  When
168                 // it returns, the tracker should delete itself.
169                 d.mtx.Lock()
170                 delete(d.trackers, c.UUID)
171                 d.mtx.Unlock()
172         }()
173         return tracker
174 }
175
176 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
177         params := arvadosclient.Dict{
178                 "filters": filters,
179                 "order":   []string{"priority desc"}}
180         offset := 0
181         for {
182                 params["offset"] = offset
183
184                 // This list variable must be a new one declared
185                 // inside the loop: otherwise, items in the API
186                 // response would get deep-merged into the items
187                 // loaded in previous iterations.
188                 var list arvados.ContainerList
189
190                 err := d.Arv.List("containers", params, &list)
191                 if err != nil {
192                         d.Logger.Warnf("error getting list of containers: %q", err)
193                         return false
194                 }
195                 d.checkListForUpdates(list.Items, todo)
196                 offset += len(list.Items)
197                 if len(list.Items) == 0 || list.ItemsAvailable <= offset {
198                         return true
199                 }
200         }
201 }
202
203 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
204         d.mtx.Lock()
205         defer d.mtx.Unlock()
206         if d.trackers == nil {
207                 d.trackers = make(map[string]*runTracker)
208         }
209
210         for _, c := range containers {
211                 tracker, alreadyTracking := d.trackers[c.UUID]
212                 delete(todo, c.UUID)
213
214                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
215                         d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
216                 } else if alreadyTracking {
217                         switch c.State {
218                         case Queued:
219                                 tracker.close()
220                         case Locked, Running:
221                                 tracker.update(c)
222                         case Cancelled, Complete:
223                                 tracker.close()
224                         }
225                 } else {
226                         switch c.State {
227                         case Queued:
228                                 if !d.throttle.Check(c.UUID) {
229                                         break
230                                 }
231                                 err := d.lock(c.UUID)
232                                 if err != nil {
233                                         d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
234                                         break
235                                 }
236                                 c.State = Locked
237                                 d.trackers[c.UUID] = d.start(c)
238                         case Locked, Running:
239                                 if !d.throttle.Check(c.UUID) {
240                                         break
241                                 }
242                                 d.trackers[c.UUID] = d.start(c)
243                         case Cancelled, Complete:
244                                 // no-op (we already stopped monitoring)
245                         }
246                 }
247         }
248 }
249
250 // UpdateState makes an API call to change the state of a container.
251 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
252         err := d.Arv.Update("containers", uuid,
253                 arvadosclient.Dict{
254                         "container": arvadosclient.Dict{"state": state},
255                 }, nil)
256         if err != nil {
257                 d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
258         }
259         return err
260 }
261
262 // Lock makes the lock API call which updates the state of a container to Locked.
263 func (d *Dispatcher) lock(uuid string) error {
264         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
265 }
266
267 // Unlock makes the unlock API call which updates the state of a container to Queued.
268 func (d *Dispatcher) Unlock(uuid string) error {
269         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
270 }
271
272 // TrackContainer ensures a tracker is running for the given UUID,
273 // regardless of the current state of the container (except: if the
274 // container is locked by a different dispatcher, a tracker will not
275 // be started). If the container is not in Locked or Running state,
276 // the new tracker will close down immediately.
277 //
278 // This allows the dispatcher to put its own RunContainer func into a
279 // cleanup phase (for example, to kill local processes created by a
280 // prevous dispatch process that are still running even though the
281 // container state is final) without the risk of having multiple
282 // goroutines monitoring the same UUID.
283 func (d *Dispatcher) TrackContainer(uuid string) error {
284         var cntr arvados.Container
285         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
286         if err != nil {
287                 return err
288         }
289         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
290                 return nil
291         }
292
293         d.mtx.Lock()
294         defer d.mtx.Unlock()
295         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
296                 return nil
297         }
298         if d.trackers == nil {
299                 d.trackers = make(map[string]*runTracker)
300         }
301         d.trackers[uuid] = d.start(cntr)
302         switch cntr.State {
303         case Queued, Cancelled, Complete:
304                 d.trackers[uuid].close()
305         }
306         return nil
307 }
308
309 type runTracker struct {
310         closing bool
311         updates chan arvados.Container
312         logger  Logger
313 }
314
315 func (tracker *runTracker) close() {
316         if !tracker.closing {
317                 close(tracker.updates)
318         }
319         tracker.closing = true
320 }
321
322 func (tracker *runTracker) update(c arvados.Container) {
323         if tracker.closing {
324                 return
325         }
326         select {
327         case <-tracker.updates:
328                 tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
329         default:
330         }
331         tracker.updates <- c
332 }