Merge branch '21941-keep-web-link' refs #21941
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "bytes"
11         "context"
12         "fmt"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/dispatchcloud"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "github.com/sirupsen/logrus"
20 )
21
22 const (
23         Queued    = arvados.ContainerStateQueued
24         Locked    = arvados.ContainerStateLocked
25         Running   = arvados.ContainerStateRunning
26         Complete  = arvados.ContainerStateComplete
27         Cancelled = arvados.ContainerStateCancelled
28 )
29
30 type Logger interface {
31         Printf(string, ...interface{})
32         Warnf(string, ...interface{})
33         Debugf(string, ...interface{})
34 }
35
36 // Dispatcher struct
37 type Dispatcher struct {
38         Arv *arvadosclient.ArvadosClient
39
40         Logger Logger
41
42         // Batch size for container queries
43         BatchSize int
44
45         // Queue polling frequency
46         PollPeriod time.Duration
47
48         // Time to wait between successive attempts to run the same container
49         MinRetryPeriod time.Duration
50
51         // Func that implements the container lifecycle. Must be set
52         // to a non-nil DispatchFunc before calling Run().
53         RunContainer DispatchFunc
54
55         auth     arvados.APIClientAuthorization
56         mtx      sync.Mutex
57         trackers map[string]*runTracker
58         throttle throttle
59 }
60
61 // A DispatchFunc executes a container (if the container record is
62 // Locked) or resume monitoring an already-running container, and wait
63 // until that container exits.
64 //
65 // While the container runs, the DispatchFunc should listen for
66 // updated container records on the provided channel. When the channel
67 // closes, the DispatchFunc should stop the container if it's still
68 // running, and return.
69 //
70 // The DispatchFunc should not return until the container is finished.
71 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
72
73 // Run watches the API server's queue for containers that are either
74 // ready to run and available to lock, or are already locked by this
75 // dispatcher's token. When a new one appears, Run calls RunContainer
76 // in a new goroutine.
77 func (d *Dispatcher) Run(ctx context.Context) error {
78         if d.Logger == nil {
79                 d.Logger = logrus.StandardLogger()
80         }
81
82         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
83         if err != nil {
84                 return fmt.Errorf("error getting my token UUID: %v", err)
85         }
86
87         d.throttle.hold = d.MinRetryPeriod
88
89         poll := time.NewTicker(d.PollPeriod)
90         defer poll.Stop()
91
92         if d.BatchSize == 0 {
93                 d.BatchSize = 100
94         }
95
96         for {
97                 select {
98                 case <-poll.C:
99                         break
100                 case <-ctx.Done():
101                         d.mtx.Lock()
102                         defer d.mtx.Unlock()
103                         for _, tracker := range d.trackers {
104                                 tracker.close()
105                         }
106                         return ctx.Err()
107                 }
108
109                 todo := make(map[string]*runTracker)
110                 d.mtx.Lock()
111                 // Make a copy of trackers
112                 for uuid, tracker := range d.trackers {
113                         todo[uuid] = tracker
114                 }
115                 d.mtx.Unlock()
116
117                 // Containers I currently own (Locked/Running)
118                 querySuccess := d.checkForUpdates([][]interface{}{
119                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
120
121                 // Containers I should try to dispatch
122                 querySuccess = d.checkForUpdates([][]interface{}{
123                         {"state", "=", Queued},
124                         {"priority", ">", "0"}}, todo) && querySuccess
125
126                 if !querySuccess {
127                         // There was an error in one of the previous queries,
128                         // we probably didn't get updates for all the
129                         // containers we should have.  Don't check them
130                         // individually because it may be expensive.
131                         continue
132                 }
133
134                 // Containers I know about but didn't fall into the
135                 // above two categories (probably Complete/Cancelled)
136                 var missed []string
137                 for uuid := range todo {
138                         missed = append(missed, uuid)
139                 }
140
141                 for len(missed) > 0 {
142                         var batch []string
143                         if len(missed) > 20 {
144                                 batch = missed[0:20]
145                                 missed = missed[20:]
146                         } else {
147                                 batch = missed
148                                 missed = missed[0:0]
149                         }
150                         querySuccess = d.checkForUpdates([][]interface{}{
151                                 {"uuid", "in", batch}}, todo) && querySuccess
152                 }
153
154                 if !querySuccess {
155                         // There was an error in one of the previous queries, we probably
156                         // didn't see all the containers we should have, so don't shut down
157                         // the missed containers.
158                         continue
159                 }
160
161                 // Containers that I know about that didn't show up in any
162                 // query should be let go.
163                 for uuid, tracker := range todo {
164                         d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
165                         tracker.close()
166                 }
167
168         }
169 }
170
171 // Start a runner in a new goroutine, and send the initial container
172 // record to its updates channel.
173 func (d *Dispatcher) start(c arvados.Container) *runTracker {
174         tracker := &runTracker{
175                 updates: make(chan arvados.Container, 1),
176                 logger:  d.Logger,
177         }
178         tracker.updates <- c
179         go func() {
180                 fallbackState := Queued
181                 err := d.RunContainer(d, c, tracker.updates)
182                 if err != nil {
183                         text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
184                         if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
185                                 fallbackState = Cancelled
186                                 var logBuf bytes.Buffer
187                                 fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
188                                 if len(err.AvailableTypes) == 0 {
189                                         fmt.Fprint(&logBuf, "No instance types are configured.\n")
190                                 } else {
191                                         fmt.Fprint(&logBuf, "Available instance types:\n")
192                                         for _, t := range err.AvailableTypes {
193                                                 fmt.Fprintf(&logBuf,
194                                                         "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
195                                                         t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
196                                         }
197                                 }
198                                 text = logBuf.String()
199                         }
200                         d.Logger.Printf("%s", text)
201                         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
202                                 "object_uuid": c.UUID,
203                                 "event_type":  "dispatch",
204                                 "properties":  map[string]string{"text": text}}}
205                         d.Arv.Create("logs", lr, nil)
206                 }
207                 // If checkListForUpdates() doesn't close the tracker
208                 // after 2 queue updates, try to move the container to
209                 // the fallback state, which should eventually work
210                 // and cause the tracker to close.
211                 updates := 0
212                 for upd := range tracker.updates {
213                         updates++
214                         if upd.State == Locked || upd.State == Running {
215                                 // Tracker didn't clean up before
216                                 // returning -- or this is the first
217                                 // update and it contains stale
218                                 // information from before
219                                 // RunContainer() returned.
220                                 if updates < 2 {
221                                         // Avoid generating confusing
222                                         // logs / API calls in the
223                                         // stale-info case.
224                                         continue
225                                 }
226                                 d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
227                                 d.UpdateState(c.UUID, fallbackState)
228                         }
229                 }
230         }()
231         return tracker
232 }
233
234 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
235         var countList arvados.ContainerList
236         params := arvadosclient.Dict{
237                 "filters": filters,
238                 "count":   "exact",
239                 "limit":   0,
240                 "order":   []string{"priority desc"}}
241         err := d.Arv.List("containers", params, &countList)
242         if err != nil {
243                 d.Logger.Warnf("error getting count of containers: %q", err)
244                 return false
245         }
246         itemsAvailable := countList.ItemsAvailable
247         params = arvadosclient.Dict{
248                 "filters": filters,
249                 "count":   "none",
250                 "limit":   d.BatchSize,
251                 "order":   []string{"priority desc"}}
252         offset := 0
253         for {
254                 params["offset"] = offset
255
256                 // This list variable must be a new one declared
257                 // inside the loop: otherwise, items in the API
258                 // response would get deep-merged into the items
259                 // loaded in previous iterations.
260                 var list arvados.ContainerList
261
262                 err := d.Arv.List("containers", params, &list)
263                 if err != nil {
264                         d.Logger.Warnf("error getting list of containers: %q", err)
265                         return false
266                 }
267                 d.checkListForUpdates(list.Items, todo)
268                 offset += len(list.Items)
269                 if len(list.Items) == 0 || itemsAvailable <= offset {
270                         return true
271                 }
272         }
273 }
274
275 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
276         d.mtx.Lock()
277         defer d.mtx.Unlock()
278         if d.trackers == nil {
279                 d.trackers = make(map[string]*runTracker)
280         }
281
282         for _, c := range containers {
283                 tracker, alreadyTracking := d.trackers[c.UUID]
284                 delete(todo, c.UUID)
285
286                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
287                         d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
288                 } else if alreadyTracking {
289                         switch c.State {
290                         case Queued, Cancelled, Complete:
291                                 d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
292                                 tracker.close()
293                                 delete(d.trackers, c.UUID)
294                         case Locked, Running:
295                                 d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
296                                 tracker.update(c)
297                         }
298                 } else {
299                         switch c.State {
300                         case Queued:
301                                 if !d.throttle.Check(c.UUID) {
302                                         break
303                                 }
304                                 err := d.lock(c.UUID)
305                                 if err != nil {
306                                         d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
307                                         break
308                                 }
309                                 c.State = Locked
310                                 d.trackers[c.UUID] = d.start(c)
311                         case Locked, Running:
312                                 if !d.throttle.Check(c.UUID) {
313                                         break
314                                 }
315                                 d.trackers[c.UUID] = d.start(c)
316                         case Cancelled, Complete:
317                                 // no-op (we already stopped monitoring)
318                         }
319                 }
320         }
321 }
322
323 // UpdateState makes an API call to change the state of a container.
324 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
325         err := d.Arv.Update("containers", uuid,
326                 arvadosclient.Dict{
327                         "container": arvadosclient.Dict{"state": state},
328                 }, nil)
329         if err != nil {
330                 d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
331         }
332         return err
333 }
334
335 // Lock makes the lock API call which updates the state of a container to Locked.
336 func (d *Dispatcher) lock(uuid string) error {
337         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
338 }
339
340 // Unlock makes the unlock API call which updates the state of a container to Queued.
341 func (d *Dispatcher) Unlock(uuid string) error {
342         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
343 }
344
345 // TrackContainer ensures a tracker is running for the given UUID,
346 // regardless of the current state of the container (except: if the
347 // container is locked by a different dispatcher, a tracker will not
348 // be started). If the container is not in Locked or Running state,
349 // the new tracker will close down immediately.
350 //
351 // This allows the dispatcher to put its own RunContainer func into a
352 // cleanup phase (for example, to kill local processes created by a
353 // prevous dispatch process that are still running even though the
354 // container state is final) without the risk of having multiple
355 // goroutines monitoring the same UUID.
356 func (d *Dispatcher) TrackContainer(uuid string) error {
357         var cntr arvados.Container
358         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
359         if err != nil {
360                 return err
361         }
362         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
363                 return nil
364         }
365
366         d.mtx.Lock()
367         defer d.mtx.Unlock()
368         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
369                 return nil
370         }
371         if d.trackers == nil {
372                 d.trackers = make(map[string]*runTracker)
373         }
374         d.trackers[uuid] = d.start(cntr)
375         switch cntr.State {
376         case Queued, Cancelled, Complete:
377                 d.trackers[uuid].close()
378         }
379         return nil
380 }
381
382 type runTracker struct {
383         closing bool
384         updates chan arvados.Container
385         logger  Logger
386 }
387
388 func (tracker *runTracker) close() {
389         if !tracker.closing {
390                 close(tracker.updates)
391         }
392         tracker.closing = true
393 }
394
395 func (tracker *runTracker) update(c arvados.Container) {
396         if tracker.closing {
397                 return
398         }
399         select {
400         case <-tracker.updates:
401                 tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
402         default:
403         }
404         tracker.updates <- c
405 }