Merge branch '17853-fix-write-with-rlock' into main
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
17         "github.com/sirupsen/logrus"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 type Logger interface {
29         Printf(string, ...interface{})
30         Warnf(string, ...interface{})
31         Debugf(string, ...interface{})
32 }
33
34 // Dispatcher struct
35 type Dispatcher struct {
36         Arv *arvadosclient.ArvadosClient
37
38         Logger Logger
39
40         // Batch size for container queries
41         BatchSize int
42
43         // Queue polling frequency
44         PollPeriod time.Duration
45
46         // Time to wait between successive attempts to run the same container
47         MinRetryPeriod time.Duration
48
49         // Func that implements the container lifecycle. Must be set
50         // to a non-nil DispatchFunc before calling Run().
51         RunContainer DispatchFunc
52
53         auth     arvados.APIClientAuthorization
54         mtx      sync.Mutex
55         trackers map[string]*runTracker
56         throttle throttle
57 }
58
59 // A DispatchFunc executes a container (if the container record is
60 // Locked) or resume monitoring an already-running container, and wait
61 // until that container exits.
62 //
63 // While the container runs, the DispatchFunc should listen for
64 // updated container records on the provided channel. When the channel
65 // closes, the DispatchFunc should stop the container if it's still
66 // running, and return.
67 //
68 // The DispatchFunc should not return until the container is finished.
69 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
70
71 // Run watches the API server's queue for containers that are either
72 // ready to run and available to lock, or are already locked by this
73 // dispatcher's token. When a new one appears, Run calls RunContainer
74 // in a new goroutine.
75 func (d *Dispatcher) Run(ctx context.Context) error {
76         if d.Logger == nil {
77                 d.Logger = logrus.StandardLogger()
78         }
79
80         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
81         if err != nil {
82                 return fmt.Errorf("error getting my token UUID: %v", err)
83         }
84
85         d.throttle.hold = d.MinRetryPeriod
86
87         poll := time.NewTicker(d.PollPeriod)
88         defer poll.Stop()
89
90         if d.BatchSize == 0 {
91                 d.BatchSize = 100
92         }
93
94         for {
95                 select {
96                 case <-poll.C:
97                         break
98                 case <-ctx.Done():
99                         return ctx.Err()
100                 }
101
102                 todo := make(map[string]*runTracker)
103                 d.mtx.Lock()
104                 // Make a copy of trackers
105                 for uuid, tracker := range d.trackers {
106                         todo[uuid] = tracker
107                 }
108                 d.mtx.Unlock()
109
110                 // Containers I currently own (Locked/Running)
111                 querySuccess := d.checkForUpdates([][]interface{}{
112                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
113
114                 // Containers I should try to dispatch
115                 querySuccess = d.checkForUpdates([][]interface{}{
116                         {"state", "=", Queued},
117                         {"priority", ">", "0"}}, todo) && querySuccess
118
119                 if !querySuccess {
120                         // There was an error in one of the previous queries,
121                         // we probably didn't get updates for all the
122                         // containers we should have.  Don't check them
123                         // individually because it may be expensive.
124                         continue
125                 }
126
127                 // Containers I know about but didn't fall into the
128                 // above two categories (probably Complete/Cancelled)
129                 var missed []string
130                 for uuid := range todo {
131                         missed = append(missed, uuid)
132                 }
133
134                 for len(missed) > 0 {
135                         var batch []string
136                         if len(missed) > 20 {
137                                 batch = missed[0:20]
138                                 missed = missed[20:]
139                         } else {
140                                 batch = missed
141                                 missed = missed[0:0]
142                         }
143                         querySuccess = d.checkForUpdates([][]interface{}{
144                                 {"uuid", "in", batch}}, todo) && querySuccess
145                 }
146
147                 if !querySuccess {
148                         // There was an error in one of the previous queries, we probably
149                         // didn't see all the containers we should have, so don't shut down
150                         // the missed containers.
151                         continue
152                 }
153
154                 // Containers that I know about that didn't show up in any
155                 // query should be let go.
156                 for uuid, tracker := range todo {
157                         d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
158                         tracker.close()
159                 }
160
161         }
162 }
163
164 // Start a runner in a new goroutine, and send the initial container
165 // record to its updates channel.
166 func (d *Dispatcher) start(c arvados.Container) *runTracker {
167         tracker := &runTracker{
168                 updates: make(chan arvados.Container, 1),
169                 logger:  d.Logger,
170         }
171         tracker.updates <- c
172         go func() {
173                 d.RunContainer(d, c, tracker.updates)
174                 // RunContainer blocks for the lifetime of the container.  When
175                 // it returns, the tracker should delete itself.
176                 d.mtx.Lock()
177                 delete(d.trackers, c.UUID)
178                 d.mtx.Unlock()
179         }()
180         return tracker
181 }
182
183 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
184         var countList arvados.ContainerList
185         params := arvadosclient.Dict{
186                 "filters": filters,
187                 "count":   "exact",
188                 "limit":   0,
189                 "order":   []string{"priority desc"}}
190         err := d.Arv.List("containers", params, &countList)
191         if err != nil {
192                 d.Logger.Warnf("error getting count of containers: %q", err)
193                 return false
194         }
195         itemsAvailable := countList.ItemsAvailable
196         params = arvadosclient.Dict{
197                 "filters": filters,
198                 "count":   "none",
199                 "limit":   d.BatchSize,
200                 "order":   []string{"priority desc"}}
201         offset := 0
202         for {
203                 params["offset"] = offset
204
205                 // This list variable must be a new one declared
206                 // inside the loop: otherwise, items in the API
207                 // response would get deep-merged into the items
208                 // loaded in previous iterations.
209                 var list arvados.ContainerList
210
211                 err := d.Arv.List("containers", params, &list)
212                 if err != nil {
213                         d.Logger.Warnf("error getting list of containers: %q", err)
214                         return false
215                 }
216                 d.checkListForUpdates(list.Items, todo)
217                 offset += len(list.Items)
218                 if len(list.Items) == 0 || itemsAvailable <= offset {
219                         return true
220                 }
221         }
222 }
223
224 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
225         d.mtx.Lock()
226         defer d.mtx.Unlock()
227         if d.trackers == nil {
228                 d.trackers = make(map[string]*runTracker)
229         }
230
231         for _, c := range containers {
232                 tracker, alreadyTracking := d.trackers[c.UUID]
233                 delete(todo, c.UUID)
234
235                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
236                         d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
237                 } else if alreadyTracking {
238                         switch c.State {
239                         case Queued:
240                                 tracker.close()
241                         case Locked, Running:
242                                 tracker.update(c)
243                         case Cancelled, Complete:
244                                 tracker.close()
245                         }
246                 } else {
247                         switch c.State {
248                         case Queued:
249                                 if !d.throttle.Check(c.UUID) {
250                                         break
251                                 }
252                                 err := d.lock(c.UUID)
253                                 if err != nil {
254                                         d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
255                                         break
256                                 }
257                                 c.State = Locked
258                                 d.trackers[c.UUID] = d.start(c)
259                         case Locked, Running:
260                                 if !d.throttle.Check(c.UUID) {
261                                         break
262                                 }
263                                 d.trackers[c.UUID] = d.start(c)
264                         case Cancelled, Complete:
265                                 // no-op (we already stopped monitoring)
266                         }
267                 }
268         }
269 }
270
271 // UpdateState makes an API call to change the state of a container.
272 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
273         err := d.Arv.Update("containers", uuid,
274                 arvadosclient.Dict{
275                         "container": arvadosclient.Dict{"state": state},
276                 }, nil)
277         if err != nil {
278                 d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
279         }
280         return err
281 }
282
283 // Lock makes the lock API call which updates the state of a container to Locked.
284 func (d *Dispatcher) lock(uuid string) error {
285         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
286 }
287
288 // Unlock makes the unlock API call which updates the state of a container to Queued.
289 func (d *Dispatcher) Unlock(uuid string) error {
290         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
291 }
292
293 // TrackContainer ensures a tracker is running for the given UUID,
294 // regardless of the current state of the container (except: if the
295 // container is locked by a different dispatcher, a tracker will not
296 // be started). If the container is not in Locked or Running state,
297 // the new tracker will close down immediately.
298 //
299 // This allows the dispatcher to put its own RunContainer func into a
300 // cleanup phase (for example, to kill local processes created by a
301 // prevous dispatch process that are still running even though the
302 // container state is final) without the risk of having multiple
303 // goroutines monitoring the same UUID.
304 func (d *Dispatcher) TrackContainer(uuid string) error {
305         var cntr arvados.Container
306         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
307         if err != nil {
308                 return err
309         }
310         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
311                 return nil
312         }
313
314         d.mtx.Lock()
315         defer d.mtx.Unlock()
316         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
317                 return nil
318         }
319         if d.trackers == nil {
320                 d.trackers = make(map[string]*runTracker)
321         }
322         d.trackers[uuid] = d.start(cntr)
323         switch cntr.State {
324         case Queued, Cancelled, Complete:
325                 d.trackers[uuid].close()
326         }
327         return nil
328 }
329
330 type runTracker struct {
331         closing bool
332         updates chan arvados.Container
333         logger  Logger
334 }
335
336 func (tracker *runTracker) close() {
337         if !tracker.closing {
338                 close(tracker.updates)
339         }
340         tracker.closing = true
341 }
342
343 func (tracker *runTracker) update(c arvados.Container) {
344         if tracker.closing {
345                 return
346         }
347         select {
348         case <-tracker.updates:
349                 tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
350         default:
351         }
352         tracker.updates <- c
353 }