Merge branch '12479-wb-structured-vocabulary'
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "log"
13         "sync"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 // Dispatcher struct
29 type Dispatcher struct {
30         Arv *arvadosclient.ArvadosClient
31
32         // Queue polling frequency
33         PollPeriod time.Duration
34
35         // Time to wait between successive attempts to run the same container
36         MinRetryPeriod time.Duration
37
38         // Func that implements the container lifecycle. Must be set
39         // to a non-nil DispatchFunc before calling Run().
40         RunContainer DispatchFunc
41
42         auth     arvados.APIClientAuthorization
43         mtx      sync.Mutex
44         trackers map[string]*runTracker
45         throttle throttle
46 }
47
48 // A DispatchFunc executes a container (if the container record is
49 // Locked) or resume monitoring an already-running container, and wait
50 // until that container exits.
51 //
52 // While the container runs, the DispatchFunc should listen for
53 // updated container records on the provided channel. When the channel
54 // closes, the DispatchFunc should stop the container if it's still
55 // running, and return.
56 //
57 // The DispatchFunc should not return until the container is finished.
58 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
59
60 // Run watches the API server's queue for containers that are either
61 // ready to run and available to lock, or are already locked by this
62 // dispatcher's token. When a new one appears, Run calls RunContainer
63 // in a new goroutine.
64 func (d *Dispatcher) Run(ctx context.Context) error {
65         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
66         if err != nil {
67                 return fmt.Errorf("error getting my token UUID: %v", err)
68         }
69
70         d.throttle.hold = d.MinRetryPeriod
71
72         poll := time.NewTicker(d.PollPeriod)
73         defer poll.Stop()
74
75         for {
76                 select {
77                 case <-poll.C:
78                         break
79                 case <-ctx.Done():
80                         return ctx.Err()
81                 }
82
83                 todo := make(map[string]*runTracker)
84                 d.mtx.Lock()
85                 // Make a copy of trackers
86                 for uuid, tracker := range d.trackers {
87                         todo[uuid] = tracker
88                 }
89                 d.mtx.Unlock()
90
91                 // Containers I currently own (Locked/Running)
92                 querySuccess := d.checkForUpdates([][]interface{}{
93                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
94
95                 // Containers I should try to dispatch
96                 querySuccess = d.checkForUpdates([][]interface{}{
97                         {"state", "=", Queued},
98                         {"priority", ">", "0"}}, todo) && querySuccess
99
100                 if !querySuccess {
101                         // There was an error in one of the previous queries,
102                         // we probably didn't get updates for all the
103                         // containers we should have.  Don't check them
104                         // individually because it may be expensive.
105                         continue
106                 }
107
108                 // Containers I know about but didn't fall into the
109                 // above two categories (probably Complete/Cancelled)
110                 var missed []string
111                 for uuid := range todo {
112                         missed = append(missed, uuid)
113                 }
114
115                 for len(missed) > 0 {
116                         var batch []string
117                         if len(missed) > 20 {
118                                 batch = missed[0:20]
119                                 missed = missed[20:]
120                         } else {
121                                 batch = missed
122                                 missed = missed[0:0]
123                         }
124                         querySuccess = d.checkForUpdates([][]interface{}{
125                                 {"uuid", "in", batch}}, todo) && querySuccess
126                 }
127
128                 if !querySuccess {
129                         // There was an error in one of the previous queries, we probably
130                         // didn't see all the containers we should have, so don't shut down
131                         // the missed containers.
132                         continue
133                 }
134
135                 // Containers that I know about that didn't show up in any
136                 // query should be let go.
137                 for uuid, tracker := range todo {
138                         log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
139                         tracker.close()
140                 }
141
142         }
143 }
144
145 // Start a runner in a new goroutine, and send the initial container
146 // record to its updates channel.
147 func (d *Dispatcher) start(c arvados.Container) *runTracker {
148         tracker := &runTracker{updates: make(chan arvados.Container, 1)}
149         tracker.updates <- c
150         go func() {
151                 d.RunContainer(d, c, tracker.updates)
152                 // RunContainer blocks for the lifetime of the container.  When
153                 // it returns, the tracker should delete itself.
154                 d.mtx.Lock()
155                 delete(d.trackers, c.UUID)
156                 d.mtx.Unlock()
157         }()
158         return tracker
159 }
160
161 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
162         params := arvadosclient.Dict{
163                 "filters": filters,
164                 "order":   []string{"priority desc"}}
165
166         var list arvados.ContainerList
167         for offset, more := 0, true; more; offset += len(list.Items) {
168                 params["offset"] = offset
169                 err := d.Arv.List("containers", params, &list)
170                 if err != nil {
171                         log.Printf("Error getting list of containers: %q", err)
172                         return false
173                 }
174                 more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
175                 d.checkListForUpdates(list.Items, todo)
176         }
177         return true
178 }
179
180 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
181         d.mtx.Lock()
182         defer d.mtx.Unlock()
183         if d.trackers == nil {
184                 d.trackers = make(map[string]*runTracker)
185         }
186
187         for _, c := range containers {
188                 tracker, alreadyTracking := d.trackers[c.UUID]
189                 delete(todo, c.UUID)
190
191                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
192                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
193                 } else if alreadyTracking {
194                         switch c.State {
195                         case Queued:
196                                 tracker.close()
197                         case Locked, Running:
198                                 tracker.update(c)
199                         case Cancelled, Complete:
200                                 tracker.close()
201                         }
202                 } else {
203                         switch c.State {
204                         case Queued:
205                                 if !d.throttle.Check(c.UUID) {
206                                         break
207                                 }
208                                 err := d.lock(c.UUID)
209                                 if err != nil {
210                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
211                                         break
212                                 }
213                                 c.State = Locked
214                                 d.trackers[c.UUID] = d.start(c)
215                         case Locked, Running:
216                                 if !d.throttle.Check(c.UUID) {
217                                         break
218                                 }
219                                 d.trackers[c.UUID] = d.start(c)
220                         case Cancelled, Complete:
221                                 // no-op (we already stopped monitoring)
222                         }
223                 }
224         }
225 }
226
227 // UpdateState makes an API call to change the state of a container.
228 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
229         err := d.Arv.Update("containers", uuid,
230                 arvadosclient.Dict{
231                         "container": arvadosclient.Dict{"state": state},
232                 }, nil)
233         if err != nil {
234                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
235         }
236         return err
237 }
238
239 // Lock makes the lock API call which updates the state of a container to Locked.
240 func (d *Dispatcher) lock(uuid string) error {
241         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
242 }
243
244 // Unlock makes the unlock API call which updates the state of a container to Queued.
245 func (d *Dispatcher) Unlock(uuid string) error {
246         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
247 }
248
249 // TrackContainer ensures a tracker is running for the given UUID,
250 // regardless of the current state of the container (except: if the
251 // container is locked by a different dispatcher, a tracker will not
252 // be started). If the container is not in Locked or Running state,
253 // the new tracker will close down immediately.
254 //
255 // This allows the dispatcher to put its own RunContainer func into a
256 // cleanup phase (for example, to kill local processes created by a
257 // prevous dispatch process that are still running even though the
258 // container state is final) without the risk of having multiple
259 // goroutines monitoring the same UUID.
260 func (d *Dispatcher) TrackContainer(uuid string) error {
261         var cntr arvados.Container
262         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
263         if err != nil {
264                 return err
265         }
266         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
267                 return nil
268         }
269
270         d.mtx.Lock()
271         defer d.mtx.Unlock()
272         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
273                 return nil
274         }
275         if d.trackers == nil {
276                 d.trackers = make(map[string]*runTracker)
277         }
278         d.trackers[uuid] = d.start(cntr)
279         switch cntr.State {
280         case Queued, Cancelled, Complete:
281                 d.trackers[uuid].close()
282         }
283         return nil
284 }
285
286 type runTracker struct {
287         closing bool
288         updates chan arvados.Container
289 }
290
291 func (tracker *runTracker) close() {
292         if !tracker.closing {
293                 close(tracker.updates)
294         }
295         tracker.closing = true
296 }
297
298 func (tracker *runTracker) update(c arvados.Container) {
299         if tracker.closing {
300                 return
301         }
302         select {
303         case <-tracker.updates:
304                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
305         default:
306         }
307         tracker.updates <- c
308 }