13558: Merge branch 'master' into wtsi-hgi-13558-debug-log-tag-req-id
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "log"
13         "sync"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 // Dispatcher struct
29 type Dispatcher struct {
30         Arv *arvadosclient.ArvadosClient
31
32         // Queue polling frequency
33         PollPeriod time.Duration
34
35         // Time to wait between successive attempts to run the same container
36         MinRetryPeriod time.Duration
37
38         // Func that implements the container lifecycle. Must be set
39         // to a non-nil DispatchFunc before calling Run().
40         RunContainer DispatchFunc
41
42         auth     arvados.APIClientAuthorization
43         mtx      sync.Mutex
44         trackers map[string]*runTracker
45         throttle throttle
46 }
47
48 // A DispatchFunc executes a container (if the container record is
49 // Locked) or resume monitoring an already-running container, and wait
50 // until that container exits.
51 //
52 // While the container runs, the DispatchFunc should listen for
53 // updated container records on the provided channel. When the channel
54 // closes, the DispatchFunc should stop the container if it's still
55 // running, and return.
56 //
57 // The DispatchFunc should not return until the container is finished.
58 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
59
60 // Run watches the API server's queue for containers that are either
61 // ready to run and available to lock, or are already locked by this
62 // dispatcher's token. When a new one appears, Run calls RunContainer
63 // in a new goroutine.
64 func (d *Dispatcher) Run(ctx context.Context) error {
65         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
66         if err != nil {
67                 return fmt.Errorf("error getting my token UUID: %v", err)
68         }
69
70         d.throttle.hold = d.MinRetryPeriod
71
72         poll := time.NewTicker(d.PollPeriod)
73         defer poll.Stop()
74
75         for {
76                 select {
77                 case <-poll.C:
78                         break
79                 case <-ctx.Done():
80                         return ctx.Err()
81                 }
82
83                 todo := make(map[string]*runTracker)
84                 d.mtx.Lock()
85                 // Make a copy of trackers
86                 for uuid, tracker := range d.trackers {
87                         todo[uuid] = tracker
88                 }
89                 d.mtx.Unlock()
90
91                 // Containers I currently own (Locked/Running)
92                 querySuccess := d.checkForUpdates([][]interface{}{
93                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
94
95                 // Containers I should try to dispatch
96                 querySuccess = d.checkForUpdates([][]interface{}{
97                         {"state", "=", Queued},
98                         {"priority", ">", "0"}}, todo) && querySuccess
99
100                 if !querySuccess {
101                         // There was an error in one of the previous queries,
102                         // we probably didn't get updates for all the
103                         // containers we should have.  Don't check them
104                         // individually because it may be expensive.
105                         continue
106                 }
107
108                 // Containers I know about but didn't fall into the
109                 // above two categories (probably Complete/Cancelled)
110                 var missed []string
111                 for uuid := range todo {
112                         missed = append(missed, uuid)
113                 }
114
115                 for len(missed) > 0 {
116                         var batch []string
117                         if len(missed) > 20 {
118                                 batch = missed[0:20]
119                                 missed = missed[20:]
120                         } else {
121                                 batch = missed
122                                 missed = missed[0:0]
123                         }
124                         querySuccess = d.checkForUpdates([][]interface{}{
125                                 {"uuid", "in", batch}}, todo) && querySuccess
126                 }
127
128                 if !querySuccess {
129                         // There was an error in one of the previous queries, we probably
130                         // didn't see all the containers we should have, so don't shut down
131                         // the missed containers.
132                         continue
133                 }
134
135                 // Containers that I know about that didn't show up in any
136                 // query should be let go.
137                 for uuid, tracker := range todo {
138                         log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
139                         tracker.close()
140                 }
141
142         }
143 }
144
145 // Start a runner in a new goroutine, and send the initial container
146 // record to its updates channel.
147 func (d *Dispatcher) start(c arvados.Container) *runTracker {
148         tracker := &runTracker{updates: make(chan arvados.Container, 1)}
149         tracker.updates <- c
150         go func() {
151                 d.RunContainer(d, c, tracker.updates)
152                 // RunContainer blocks for the lifetime of the container.  When
153                 // it returns, the tracker should delete itself.
154                 d.mtx.Lock()
155                 delete(d.trackers, c.UUID)
156                 d.mtx.Unlock()
157         }()
158         return tracker
159 }
160
161 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
162         params := arvadosclient.Dict{
163                 "filters": filters,
164                 "order":   []string{"priority desc"}}
165         offset := 0
166         for {
167                 params["offset"] = offset
168
169                 // This list variable must be a new one declared
170                 // inside the loop: otherwise, items in the API
171                 // response would get deep-merged into the items
172                 // loaded in previous iterations.
173                 var list arvados.ContainerList
174
175                 err := d.Arv.List("containers", params, &list)
176                 if err != nil {
177                         log.Printf("Error getting list of containers: %q", err)
178                         return false
179                 }
180                 d.checkListForUpdates(list.Items, todo)
181                 offset += len(list.Items)
182                 if len(list.Items) == 0 || list.ItemsAvailable <= offset {
183                         return true
184                 }
185         }
186 }
187
188 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
189         d.mtx.Lock()
190         defer d.mtx.Unlock()
191         if d.trackers == nil {
192                 d.trackers = make(map[string]*runTracker)
193         }
194
195         for _, c := range containers {
196                 tracker, alreadyTracking := d.trackers[c.UUID]
197                 delete(todo, c.UUID)
198
199                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
200                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
201                 } else if alreadyTracking {
202                         switch c.State {
203                         case Queued:
204                                 tracker.close()
205                         case Locked, Running:
206                                 tracker.update(c)
207                         case Cancelled, Complete:
208                                 tracker.close()
209                         }
210                 } else {
211                         switch c.State {
212                         case Queued:
213                                 if !d.throttle.Check(c.UUID) {
214                                         break
215                                 }
216                                 err := d.lock(c.UUID)
217                                 if err != nil {
218                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
219                                         break
220                                 }
221                                 c.State = Locked
222                                 d.trackers[c.UUID] = d.start(c)
223                         case Locked, Running:
224                                 if !d.throttle.Check(c.UUID) {
225                                         break
226                                 }
227                                 d.trackers[c.UUID] = d.start(c)
228                         case Cancelled, Complete:
229                                 // no-op (we already stopped monitoring)
230                         }
231                 }
232         }
233 }
234
235 // UpdateState makes an API call to change the state of a container.
236 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
237         err := d.Arv.Update("containers", uuid,
238                 arvadosclient.Dict{
239                         "container": arvadosclient.Dict{"state": state},
240                 }, nil)
241         if err != nil {
242                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
243         }
244         return err
245 }
246
247 // Lock makes the lock API call which updates the state of a container to Locked.
248 func (d *Dispatcher) lock(uuid string) error {
249         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
250 }
251
252 // Unlock makes the unlock API call which updates the state of a container to Queued.
253 func (d *Dispatcher) Unlock(uuid string) error {
254         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
255 }
256
257 // TrackContainer ensures a tracker is running for the given UUID,
258 // regardless of the current state of the container (except: if the
259 // container is locked by a different dispatcher, a tracker will not
260 // be started). If the container is not in Locked or Running state,
261 // the new tracker will close down immediately.
262 //
263 // This allows the dispatcher to put its own RunContainer func into a
264 // cleanup phase (for example, to kill local processes created by a
265 // prevous dispatch process that are still running even though the
266 // container state is final) without the risk of having multiple
267 // goroutines monitoring the same UUID.
268 func (d *Dispatcher) TrackContainer(uuid string) error {
269         var cntr arvados.Container
270         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
271         if err != nil {
272                 return err
273         }
274         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
275                 return nil
276         }
277
278         d.mtx.Lock()
279         defer d.mtx.Unlock()
280         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
281                 return nil
282         }
283         if d.trackers == nil {
284                 d.trackers = make(map[string]*runTracker)
285         }
286         d.trackers[uuid] = d.start(cntr)
287         switch cntr.State {
288         case Queued, Cancelled, Complete:
289                 d.trackers[uuid].close()
290         }
291         return nil
292 }
293
294 type runTracker struct {
295         closing bool
296         updates chan arvados.Container
297 }
298
299 func (tracker *runTracker) close() {
300         if !tracker.closing {
301                 close(tracker.updates)
302         }
303         tracker.closing = true
304 }
305
306 func (tracker *runTracker) update(c arvados.Container) {
307         if tracker.closing {
308                 return
309         }
310         select {
311         case <-tracker.updates:
312                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
313         default:
314         }
315         tracker.updates <- c
316 }