13219: Checks for expired run time and cancel container if needed.
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "log"
13         "sync"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 // Dispatcher struct
29 type Dispatcher struct {
30         Arv *arvadosclient.ArvadosClient
31
32         // Queue polling frequency
33         PollPeriod time.Duration
34
35         // Time to wait between successive attempts to run the same container
36         MinRetryPeriod time.Duration
37
38         // Func that implements the container lifecycle. Must be set
39         // to a non-nil DispatchFunc before calling Run().
40         RunContainer DispatchFunc
41
42         auth     arvados.APIClientAuthorization
43         mtx      sync.Mutex
44         trackers map[string]*runTracker
45         throttle throttle
46 }
47
48 // A DispatchFunc executes a container (if the container record is
49 // Locked) or resume monitoring an already-running container, and wait
50 // until that container exits.
51 //
52 // While the container runs, the DispatchFunc should listen for
53 // updated container records on the provided channel. When the channel
54 // closes, the DispatchFunc should stop the container if it's still
55 // running, and return.
56 //
57 // The DispatchFunc should not return until the container is finished.
58 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
59
60 // Run watches the API server's queue for containers that are either
61 // ready to run and available to lock, or are already locked by this
62 // dispatcher's token. When a new one appears, Run calls RunContainer
63 // in a new goroutine.
64 func (d *Dispatcher) Run(ctx context.Context) error {
65         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
66         if err != nil {
67                 return fmt.Errorf("error getting my token UUID: %v", err)
68         }
69
70         d.throttle.hold = d.MinRetryPeriod
71
72         poll := time.NewTicker(d.PollPeriod)
73         defer poll.Stop()
74
75         for {
76                 select {
77                 case <-poll.C:
78                         break
79                 case <-ctx.Done():
80                         return ctx.Err()
81                 }
82
83                 todo := make(map[string]*runTracker)
84                 d.mtx.Lock()
85                 // Make a copy of trackers
86                 for uuid, tracker := range d.trackers {
87                         todo[uuid] = tracker
88                 }
89                 d.mtx.Unlock()
90
91                 // Containers I currently own (Locked/Running)
92                 querySuccess := d.checkForUpdates([][]interface{}{
93                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
94
95                 // Containers I should try to dispatch
96                 querySuccess = d.checkForUpdates([][]interface{}{
97                         {"state", "=", Queued},
98                         {"priority", ">", "0"}}, todo) && querySuccess
99
100                 if !querySuccess {
101                         // There was an error in one of the previous queries,
102                         // we probably didn't get updates for all the
103                         // containers we should have.  Don't check them
104                         // individually because it may be expensive.
105                         continue
106                 }
107
108                 // Containers I know about but didn't fall into the
109                 // above two categories (probably Complete/Cancelled)
110                 var missed []string
111                 for uuid := range todo {
112                         missed = append(missed, uuid)
113                 }
114
115                 for len(missed) > 0 {
116                         var batch []string
117                         if len(missed) > 20 {
118                                 batch = missed[0:20]
119                                 missed = missed[20:]
120                         } else {
121                                 batch = missed
122                                 missed = missed[0:0]
123                         }
124                         querySuccess = d.checkForUpdates([][]interface{}{
125                                 {"uuid", "in", batch}}, todo) && querySuccess
126                 }
127
128                 if !querySuccess {
129                         // There was an error in one of the previous queries, we probably
130                         // didn't see all the containers we should have, so don't shut down
131                         // the missed containers.
132                         continue
133                 }
134
135                 // Containers that I know about that didn't show up in any
136                 // query should be let go.
137                 for uuid, tracker := range todo {
138                         log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
139                         tracker.close()
140                 }
141
142         }
143 }
144
145 // Start a runner in a new goroutine, and send the initial container
146 // record to its updates channel.
147 func (d *Dispatcher) start(c arvados.Container) *runTracker {
148         tracker := &runTracker{updates: make(chan arvados.Container, 1)}
149         tracker.updates <- c
150         go func() {
151                 d.RunContainer(d, c, tracker.updates)
152                 // RunContainer blocks for the lifetime of the container.  When
153                 // it returns, the tracker should delete itself.
154                 d.mtx.Lock()
155                 delete(d.trackers, c.UUID)
156                 d.mtx.Unlock()
157         }()
158         return tracker
159 }
160
161 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
162         params := arvadosclient.Dict{
163                 "filters": filters,
164                 "order":   []string{"priority desc"}}
165
166         var list arvados.ContainerList
167         for offset, more := 0, true; more; offset += len(list.Items) {
168                 params["offset"] = offset
169                 err := d.Arv.List("containers", params, &list)
170                 if err != nil {
171                         log.Printf("Error getting list of containers: %q", err)
172                         return false
173                 }
174                 more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
175                 d.checkListForUpdates(list.Items, todo)
176         }
177         return true
178 }
179
180 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
181         d.mtx.Lock()
182         defer d.mtx.Unlock()
183         if d.trackers == nil {
184                 d.trackers = make(map[string]*runTracker)
185         }
186
187         for _, c := range containers {
188                 tracker, alreadyTracking := d.trackers[c.UUID]
189                 delete(todo, c.UUID)
190
191                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
192                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
193                 } else if alreadyTracking {
194                         switch c.State {
195                         case Queued:
196                                 tracker.close()
197                         case Locked, Running:
198                                 if c.SchedulingParameters.MaxRunTime > 0 {
199                                         maxRunTime := time.Duration(c.SchedulingParameters.MaxRunTime) * time.Second
200                                         if time.Since(c.StartedAt) >= maxRunTime {
201                                                 // Time's up, schedule container for cancellation
202                                                 c.Priority = 0
203                                         }
204                                 }
205                                 tracker.update(c)
206                         case Cancelled, Complete:
207                                 tracker.close()
208                         }
209                 } else {
210                         switch c.State {
211                         case Queued:
212                                 if !d.throttle.Check(c.UUID) {
213                                         break
214                                 }
215                                 err := d.lock(c.UUID)
216                                 if err != nil {
217                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
218                                         break
219                                 }
220                                 c.State = Locked
221                                 d.trackers[c.UUID] = d.start(c)
222                         case Locked, Running:
223                                 if !d.throttle.Check(c.UUID) {
224                                         break
225                                 }
226                                 d.trackers[c.UUID] = d.start(c)
227                         case Cancelled, Complete:
228                                 // no-op (we already stopped monitoring)
229                         }
230                 }
231         }
232 }
233
234 // UpdateState makes an API call to change the state of a container.
235 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
236         err := d.Arv.Update("containers", uuid,
237                 arvadosclient.Dict{
238                         "container": arvadosclient.Dict{"state": state},
239                 }, nil)
240         if err != nil {
241                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
242         }
243         return err
244 }
245
246 // Lock makes the lock API call which updates the state of a container to Locked.
247 func (d *Dispatcher) lock(uuid string) error {
248         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
249 }
250
251 // Unlock makes the unlock API call which updates the state of a container to Queued.
252 func (d *Dispatcher) Unlock(uuid string) error {
253         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
254 }
255
256 // TrackContainer ensures a tracker is running for the given UUID,
257 // regardless of the current state of the container (except: if the
258 // container is locked by a different dispatcher, a tracker will not
259 // be started). If the container is not in Locked or Running state,
260 // the new tracker will close down immediately.
261 //
262 // This allows the dispatcher to put its own RunContainer func into a
263 // cleanup phase (for example, to kill local processes created by a
264 // prevous dispatch process that are still running even though the
265 // container state is final) without the risk of having multiple
266 // goroutines monitoring the same UUID.
267 func (d *Dispatcher) TrackContainer(uuid string) error {
268         var cntr arvados.Container
269         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
270         if err != nil {
271                 return err
272         }
273         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
274                 return nil
275         }
276
277         d.mtx.Lock()
278         defer d.mtx.Unlock()
279         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
280                 return nil
281         }
282         if d.trackers == nil {
283                 d.trackers = make(map[string]*runTracker)
284         }
285         d.trackers[uuid] = d.start(cntr)
286         switch cntr.State {
287         case Queued, Cancelled, Complete:
288                 d.trackers[uuid].close()
289         }
290         return nil
291 }
292
293 type runTracker struct {
294         closing bool
295         updates chan arvados.Container
296 }
297
298 func (tracker *runTracker) close() {
299         if !tracker.closing {
300                 close(tracker.updates)
301         }
302         tracker.closing = true
303 }
304
305 func (tracker *runTracker) update(c arvados.Container) {
306         if tracker.closing {
307                 return
308         }
309         select {
310         case <-tracker.updates:
311                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
312         default:
313         }
314         tracker.updates <- c
315 }