Merge branch '13340-wb-keep-links'
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "log"
13         "sync"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 // Dispatcher struct
29 type Dispatcher struct {
30         Arv *arvadosclient.ArvadosClient
31
32         // Batch size for container queries
33         BatchSize int64
34
35         // Queue polling frequency
36         PollPeriod time.Duration
37
38         // Time to wait between successive attempts to run the same container
39         MinRetryPeriod time.Duration
40
41         // Func that implements the container lifecycle. Must be set
42         // to a non-nil DispatchFunc before calling Run().
43         RunContainer DispatchFunc
44
45         auth     arvados.APIClientAuthorization
46         mtx      sync.Mutex
47         trackers map[string]*runTracker
48         throttle throttle
49 }
50
51 // A DispatchFunc executes a container (if the container record is
52 // Locked) or resume monitoring an already-running container, and wait
53 // until that container exits.
54 //
55 // While the container runs, the DispatchFunc should listen for
56 // updated container records on the provided channel. When the channel
57 // closes, the DispatchFunc should stop the container if it's still
58 // running, and return.
59 //
60 // The DispatchFunc should not return until the container is finished.
61 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
62
63 // Run watches the API server's queue for containers that are either
64 // ready to run and available to lock, or are already locked by this
65 // dispatcher's token. When a new one appears, Run calls RunContainer
66 // in a new goroutine.
67 func (d *Dispatcher) Run(ctx context.Context) error {
68         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
69         if err != nil {
70                 return fmt.Errorf("error getting my token UUID: %v", err)
71         }
72
73         d.throttle.hold = d.MinRetryPeriod
74
75         poll := time.NewTicker(d.PollPeriod)
76         defer poll.Stop()
77
78         if d.BatchSize == 0 {
79                 d.BatchSize = 100
80         }
81
82         for {
83                 select {
84                 case <-poll.C:
85                         break
86                 case <-ctx.Done():
87                         return ctx.Err()
88                 }
89
90                 todo := make(map[string]*runTracker)
91                 d.mtx.Lock()
92                 // Make a copy of trackers
93                 for uuid, tracker := range d.trackers {
94                         todo[uuid] = tracker
95                 }
96                 d.mtx.Unlock()
97
98                 // Containers I currently own (Locked/Running)
99                 querySuccess := d.checkForUpdates([][]interface{}{
100                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
101
102                 // Containers I should try to dispatch
103                 querySuccess = d.checkForUpdates([][]interface{}{
104                         {"state", "=", Queued},
105                         {"priority", ">", "0"}}, todo) && querySuccess
106
107                 if !querySuccess {
108                         // There was an error in one of the previous queries,
109                         // we probably didn't get updates for all the
110                         // containers we should have.  Don't check them
111                         // individually because it may be expensive.
112                         continue
113                 }
114
115                 // Containers I know about but didn't fall into the
116                 // above two categories (probably Complete/Cancelled)
117                 var missed []string
118                 for uuid := range todo {
119                         missed = append(missed, uuid)
120                 }
121
122                 for len(missed) > 0 {
123                         var batch []string
124                         if len(missed) > 20 {
125                                 batch = missed[0:20]
126                                 missed = missed[20:]
127                         } else {
128                                 batch = missed
129                                 missed = missed[0:0]
130                         }
131                         querySuccess = d.checkForUpdates([][]interface{}{
132                                 {"uuid", "in", batch}}, todo) && querySuccess
133                 }
134
135                 if !querySuccess {
136                         // There was an error in one of the previous queries, we probably
137                         // didn't see all the containers we should have, so don't shut down
138                         // the missed containers.
139                         continue
140                 }
141
142                 // Containers that I know about that didn't show up in any
143                 // query should be let go.
144                 for uuid, tracker := range todo {
145                         log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
146                         tracker.close()
147                 }
148
149         }
150 }
151
152 // Start a runner in a new goroutine, and send the initial container
153 // record to its updates channel.
154 func (d *Dispatcher) start(c arvados.Container) *runTracker {
155         tracker := &runTracker{updates: make(chan arvados.Container, 1)}
156         tracker.updates <- c
157         go func() {
158                 d.RunContainer(d, c, tracker.updates)
159                 // RunContainer blocks for the lifetime of the container.  When
160                 // it returns, the tracker should delete itself.
161                 d.mtx.Lock()
162                 delete(d.trackers, c.UUID)
163                 d.mtx.Unlock()
164         }()
165         return tracker
166 }
167
168 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
169         var countList arvados.ContainerList
170         params := arvadosclient.Dict{
171                 "filters": filters,
172                 "count":   "exact",
173                 "limit":   0,
174                 "order":   []string{"priority desc"}}
175         err := d.Arv.List("containers", params, &countList)
176         if err != nil {
177                 log.Printf("error getting count of containers: %q", err)
178                 return false
179         }
180         itemsAvailable := countList.ItemsAvailable
181         params = arvadosclient.Dict{
182                 "filters": filters,
183                 "count":   "none",
184                 "limit":   d.BatchSize,
185                 "order":   []string{"priority desc"}}
186         offset := 0
187         for {
188                 params["offset"] = offset
189
190                 // This list variable must be a new one declared
191                 // inside the loop: otherwise, items in the API
192                 // response would get deep-merged into the items
193                 // loaded in previous iterations.
194                 var list arvados.ContainerList
195
196                 err := d.Arv.List("containers", params, &list)
197                 if err != nil {
198                         log.Printf("Error getting list of containers: %q", err)
199                         return false
200                 }
201                 d.checkListForUpdates(list.Items, todo)
202                 offset += len(list.Items)
203                 if len(list.Items) == 0 || itemsAvailable <= offset {
204                         return true
205                 }
206         }
207 }
208
209 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
210         d.mtx.Lock()
211         defer d.mtx.Unlock()
212         if d.trackers == nil {
213                 d.trackers = make(map[string]*runTracker)
214         }
215
216         for _, c := range containers {
217                 tracker, alreadyTracking := d.trackers[c.UUID]
218                 delete(todo, c.UUID)
219
220                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
221                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
222                 } else if alreadyTracking {
223                         switch c.State {
224                         case Queued:
225                                 tracker.close()
226                         case Locked, Running:
227                                 tracker.update(c)
228                         case Cancelled, Complete:
229                                 tracker.close()
230                         }
231                 } else {
232                         switch c.State {
233                         case Queued:
234                                 if !d.throttle.Check(c.UUID) {
235                                         break
236                                 }
237                                 err := d.lock(c.UUID)
238                                 if err != nil {
239                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
240                                         break
241                                 }
242                                 c.State = Locked
243                                 d.trackers[c.UUID] = d.start(c)
244                         case Locked, Running:
245                                 if !d.throttle.Check(c.UUID) {
246                                         break
247                                 }
248                                 d.trackers[c.UUID] = d.start(c)
249                         case Cancelled, Complete:
250                                 // no-op (we already stopped monitoring)
251                         }
252                 }
253         }
254 }
255
256 // UpdateState makes an API call to change the state of a container.
257 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
258         err := d.Arv.Update("containers", uuid,
259                 arvadosclient.Dict{
260                         "container": arvadosclient.Dict{"state": state},
261                 }, nil)
262         if err != nil {
263                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
264         }
265         return err
266 }
267
268 // Lock makes the lock API call which updates the state of a container to Locked.
269 func (d *Dispatcher) lock(uuid string) error {
270         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
271 }
272
273 // Unlock makes the unlock API call which updates the state of a container to Queued.
274 func (d *Dispatcher) Unlock(uuid string) error {
275         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
276 }
277
278 // TrackContainer ensures a tracker is running for the given UUID,
279 // regardless of the current state of the container (except: if the
280 // container is locked by a different dispatcher, a tracker will not
281 // be started). If the container is not in Locked or Running state,
282 // the new tracker will close down immediately.
283 //
284 // This allows the dispatcher to put its own RunContainer func into a
285 // cleanup phase (for example, to kill local processes created by a
286 // prevous dispatch process that are still running even though the
287 // container state is final) without the risk of having multiple
288 // goroutines monitoring the same UUID.
289 func (d *Dispatcher) TrackContainer(uuid string) error {
290         var cntr arvados.Container
291         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
292         if err != nil {
293                 return err
294         }
295         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
296                 return nil
297         }
298
299         d.mtx.Lock()
300         defer d.mtx.Unlock()
301         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
302                 return nil
303         }
304         if d.trackers == nil {
305                 d.trackers = make(map[string]*runTracker)
306         }
307         d.trackers[uuid] = d.start(cntr)
308         switch cntr.State {
309         case Queued, Cancelled, Complete:
310                 d.trackers[uuid].close()
311         }
312         return nil
313 }
314
315 type runTracker struct {
316         closing bool
317         updates chan arvados.Container
318 }
319
320 func (tracker *runTracker) close() {
321         if !tracker.closing {
322                 close(tracker.updates)
323         }
324         tracker.closing = true
325 }
326
327 func (tracker *runTracker) update(c arvados.Container) {
328         if tracker.closing {
329                 return
330         }
331         select {
332         case <-tracker.updates:
333                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
334         default:
335         }
336         tracker.updates <- c
337 }