Merge branch '13788-concurrent-map-write'
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "log"
13         "sync"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 // Dispatcher struct
29 type Dispatcher struct {
30         Arv *arvadosclient.ArvadosClient
31
32         // Queue polling frequency
33         PollPeriod time.Duration
34
35         // Time to wait between successive attempts to run the same container
36         MinRetryPeriod time.Duration
37
38         // Func that implements the container lifecycle. Must be set
39         // to a non-nil DispatchFunc before calling Run().
40         RunContainer DispatchFunc
41
42         auth     arvados.APIClientAuthorization
43         mtx      sync.Mutex
44         trackers map[string]*runTracker
45         throttle throttle
46 }
47
48 // A DispatchFunc executes a container (if the container record is
49 // Locked) or resume monitoring an already-running container, and wait
50 // until that container exits.
51 //
52 // While the container runs, the DispatchFunc should listen for
53 // updated container records on the provided channel. When the channel
54 // closes, the DispatchFunc should stop the container if it's still
55 // running, and return.
56 //
57 // The DispatchFunc should not return until the container is finished.
58 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
59
60 // Run watches the API server's queue for containers that are either
61 // ready to run and available to lock, or are already locked by this
62 // dispatcher's token. When a new one appears, Run calls RunContainer
63 // in a new goroutine.
64 func (d *Dispatcher) Run(ctx context.Context) error {
65         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
66         if err != nil {
67                 return fmt.Errorf("error getting my token UUID: %v", err)
68         }
69
70         d.throttle.hold = d.MinRetryPeriod
71
72         poll := time.NewTicker(d.PollPeriod)
73         defer poll.Stop()
74
75         for {
76                 select {
77                 case <-poll.C:
78                         break
79                 case <-ctx.Done():
80                         return ctx.Err()
81                 }
82
83                 todo := make(map[string]*runTracker)
84                 d.mtx.Lock()
85                 // Make a copy of trackers
86                 for uuid, tracker := range d.trackers {
87                         todo[uuid] = tracker
88                 }
89                 d.mtx.Unlock()
90
91                 // Containers I currently own (Locked/Running)
92                 querySuccess := d.checkForUpdates([][]interface{}{
93                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
94
95                 // Containers I should try to dispatch
96                 querySuccess = d.checkForUpdates([][]interface{}{
97                         {"state", "=", Queued},
98                         {"priority", ">", "0"}}, todo) && querySuccess
99
100                 if !querySuccess {
101                         // There was an error in one of the previous queries,
102                         // we probably didn't get updates for all the
103                         // containers we should have.  Don't check them
104                         // individually because it may be expensive.
105                         continue
106                 }
107
108                 // Containers I know about but didn't fall into the
109                 // above two categories (probably Complete/Cancelled)
110                 var missed []string
111                 for uuid := range todo {
112                         missed = append(missed, uuid)
113                 }
114
115                 for len(missed) > 0 {
116                         var batch []string
117                         if len(missed) > 20 {
118                                 batch = missed[0:20]
119                                 missed = missed[20:]
120                         } else {
121                                 batch = missed
122                                 missed = missed[0:0]
123                         }
124                         querySuccess = d.checkForUpdates([][]interface{}{
125                                 {"uuid", "in", batch}}, todo) && querySuccess
126                 }
127
128                 if !querySuccess {
129                         // There was an error in one of the previous queries, we probably
130                         // didn't see all the containers we should have, so don't shut down
131                         // the missed containers.
132                         continue
133                 }
134
135                 // Containers that I know about that didn't show up in any
136                 // query should be let go.
137                 for uuid, tracker := range todo {
138                         log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
139                         tracker.close()
140                 }
141
142         }
143 }
144
145 // Start a runner in a new goroutine, and send the initial container
146 // record to its updates channel.
147 func (d *Dispatcher) start(c arvados.Container) *runTracker {
148         tracker := &runTracker{updates: make(chan arvados.Container, 1)}
149         tracker.updates <- c
150         go func() {
151                 d.RunContainer(d, c, tracker.updates)
152                 // RunContainer blocks for the lifetime of the container.  When
153                 // it returns, the tracker should delete itself.
154                 d.mtx.Lock()
155                 delete(d.trackers, c.UUID)
156                 d.mtx.Unlock()
157         }()
158         return tracker
159 }
160
161 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
162         params := arvadosclient.Dict{
163                 "filters": filters,
164                 "order":   []string{"priority desc"}}
165         offset := 0
166         for {
167                 params["offset"] = offset
168                 var list arvados.ContainerList
169                 err := d.Arv.List("containers", params, &list)
170                 if err != nil {
171                         log.Printf("Error getting list of containers: %q", err)
172                         return false
173                 }
174                 d.checkListForUpdates(list.Items, todo)
175                 offset += len(list.Items)
176                 if len(list.Items) == 0 || list.ItemsAvailable <= offset {
177                         return true
178                 }
179         }
180 }
181
182 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
183         d.mtx.Lock()
184         defer d.mtx.Unlock()
185         if d.trackers == nil {
186                 d.trackers = make(map[string]*runTracker)
187         }
188
189         for _, c := range containers {
190                 tracker, alreadyTracking := d.trackers[c.UUID]
191                 delete(todo, c.UUID)
192
193                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
194                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
195                 } else if alreadyTracking {
196                         switch c.State {
197                         case Queued:
198                                 tracker.close()
199                         case Locked, Running:
200                                 tracker.update(c)
201                         case Cancelled, Complete:
202                                 tracker.close()
203                         }
204                 } else {
205                         switch c.State {
206                         case Queued:
207                                 if !d.throttle.Check(c.UUID) {
208                                         break
209                                 }
210                                 err := d.lock(c.UUID)
211                                 if err != nil {
212                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
213                                         break
214                                 }
215                                 c.State = Locked
216                                 d.trackers[c.UUID] = d.start(c)
217                         case Locked, Running:
218                                 if !d.throttle.Check(c.UUID) {
219                                         break
220                                 }
221                                 d.trackers[c.UUID] = d.start(c)
222                         case Cancelled, Complete:
223                                 // no-op (we already stopped monitoring)
224                         }
225                 }
226         }
227 }
228
229 // UpdateState makes an API call to change the state of a container.
230 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
231         err := d.Arv.Update("containers", uuid,
232                 arvadosclient.Dict{
233                         "container": arvadosclient.Dict{"state": state},
234                 }, nil)
235         if err != nil {
236                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
237         }
238         return err
239 }
240
241 // Lock makes the lock API call which updates the state of a container to Locked.
242 func (d *Dispatcher) lock(uuid string) error {
243         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
244 }
245
246 // Unlock makes the unlock API call which updates the state of a container to Queued.
247 func (d *Dispatcher) Unlock(uuid string) error {
248         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
249 }
250
251 // TrackContainer ensures a tracker is running for the given UUID,
252 // regardless of the current state of the container (except: if the
253 // container is locked by a different dispatcher, a tracker will not
254 // be started). If the container is not in Locked or Running state,
255 // the new tracker will close down immediately.
256 //
257 // This allows the dispatcher to put its own RunContainer func into a
258 // cleanup phase (for example, to kill local processes created by a
259 // prevous dispatch process that are still running even though the
260 // container state is final) without the risk of having multiple
261 // goroutines monitoring the same UUID.
262 func (d *Dispatcher) TrackContainer(uuid string) error {
263         var cntr arvados.Container
264         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
265         if err != nil {
266                 return err
267         }
268         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
269                 return nil
270         }
271
272         d.mtx.Lock()
273         defer d.mtx.Unlock()
274         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
275                 return nil
276         }
277         if d.trackers == nil {
278                 d.trackers = make(map[string]*runTracker)
279         }
280         d.trackers[uuid] = d.start(cntr)
281         switch cntr.State {
282         case Queued, Cancelled, Complete:
283                 d.trackers[uuid].close()
284         }
285         return nil
286 }
287
288 type runTracker struct {
289         closing bool
290         updates chan arvados.Container
291 }
292
293 func (tracker *runTracker) close() {
294         if !tracker.closing {
295                 close(tracker.updates)
296         }
297         tracker.closing = true
298 }
299
300 func (tracker *runTracker) update(c arvados.Container) {
301         if tracker.closing {
302                 return
303         }
304         select {
305         case <-tracker.updates:
306                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
307         default:
308         }
309         tracker.updates <- c
310 }