17574: Merge branch 'main'
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "bytes"
11         "context"
12         "fmt"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/dispatchcloud"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "github.com/sirupsen/logrus"
20 )
21
22 const (
23         Queued    = arvados.ContainerStateQueued
24         Locked    = arvados.ContainerStateLocked
25         Running   = arvados.ContainerStateRunning
26         Complete  = arvados.ContainerStateComplete
27         Cancelled = arvados.ContainerStateCancelled
28 )
29
30 type Logger interface {
31         Printf(string, ...interface{})
32         Warnf(string, ...interface{})
33         Debugf(string, ...interface{})
34 }
35
36 // Dispatcher struct
37 type Dispatcher struct {
38         Arv *arvadosclient.ArvadosClient
39
40         Logger Logger
41
42         // Batch size for container queries
43         BatchSize int
44
45         // Queue polling frequency
46         PollPeriod time.Duration
47
48         // Time to wait between successive attempts to run the same container
49         MinRetryPeriod time.Duration
50
51         // Func that implements the container lifecycle. Must be set
52         // to a non-nil DispatchFunc before calling Run().
53         RunContainer DispatchFunc
54
55         auth     arvados.APIClientAuthorization
56         mtx      sync.Mutex
57         trackers map[string]*runTracker
58         throttle throttle
59 }
60
61 // A DispatchFunc executes a container (if the container record is
62 // Locked) or resume monitoring an already-running container, and wait
63 // until that container exits.
64 //
65 // While the container runs, the DispatchFunc should listen for
66 // updated container records on the provided channel. When the channel
67 // closes, the DispatchFunc should stop the container if it's still
68 // running, and return.
69 //
70 // The DispatchFunc should not return until the container is finished.
71 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
72
73 // Run watches the API server's queue for containers that are either
74 // ready to run and available to lock, or are already locked by this
75 // dispatcher's token. When a new one appears, Run calls RunContainer
76 // in a new goroutine.
77 func (d *Dispatcher) Run(ctx context.Context) error {
78         if d.Logger == nil {
79                 d.Logger = logrus.StandardLogger()
80         }
81
82         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
83         if err != nil {
84                 return fmt.Errorf("error getting my token UUID: %v", err)
85         }
86
87         d.throttle.hold = d.MinRetryPeriod
88
89         poll := time.NewTicker(d.PollPeriod)
90         defer poll.Stop()
91
92         if d.BatchSize == 0 {
93                 d.BatchSize = 100
94         }
95
96         for {
97                 select {
98                 case <-poll.C:
99                         break
100                 case <-ctx.Done():
101                         return ctx.Err()
102                 }
103
104                 todo := make(map[string]*runTracker)
105                 d.mtx.Lock()
106                 // Make a copy of trackers
107                 for uuid, tracker := range d.trackers {
108                         todo[uuid] = tracker
109                 }
110                 d.mtx.Unlock()
111
112                 // Containers I currently own (Locked/Running)
113                 querySuccess := d.checkForUpdates([][]interface{}{
114                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
115
116                 // Containers I should try to dispatch
117                 querySuccess = d.checkForUpdates([][]interface{}{
118                         {"state", "=", Queued},
119                         {"priority", ">", "0"}}, todo) && querySuccess
120
121                 if !querySuccess {
122                         // There was an error in one of the previous queries,
123                         // we probably didn't get updates for all the
124                         // containers we should have.  Don't check them
125                         // individually because it may be expensive.
126                         continue
127                 }
128
129                 // Containers I know about but didn't fall into the
130                 // above two categories (probably Complete/Cancelled)
131                 var missed []string
132                 for uuid := range todo {
133                         missed = append(missed, uuid)
134                 }
135
136                 for len(missed) > 0 {
137                         var batch []string
138                         if len(missed) > 20 {
139                                 batch = missed[0:20]
140                                 missed = missed[20:]
141                         } else {
142                                 batch = missed
143                                 missed = missed[0:0]
144                         }
145                         querySuccess = d.checkForUpdates([][]interface{}{
146                                 {"uuid", "in", batch}}, todo) && querySuccess
147                 }
148
149                 if !querySuccess {
150                         // There was an error in one of the previous queries, we probably
151                         // didn't see all the containers we should have, so don't shut down
152                         // the missed containers.
153                         continue
154                 }
155
156                 // Containers that I know about that didn't show up in any
157                 // query should be let go.
158                 for uuid, tracker := range todo {
159                         d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
160                         tracker.close()
161                 }
162
163         }
164 }
165
166 // Start a runner in a new goroutine, and send the initial container
167 // record to its updates channel.
168 func (d *Dispatcher) start(c arvados.Container) *runTracker {
169         tracker := &runTracker{
170                 updates: make(chan arvados.Container, 1),
171                 logger:  d.Logger,
172         }
173         tracker.updates <- c
174         go func() {
175                 err := d.RunContainer(d, c, tracker.updates)
176                 if err != nil {
177                         text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
178                         if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
179                                 var logBuf bytes.Buffer
180                                 fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
181                                 if len(err.AvailableTypes) == 0 {
182                                         fmt.Fprint(&logBuf, "No instance types are configured.\n")
183                                 } else {
184                                         fmt.Fprint(&logBuf, "Available instance types:\n")
185                                         for _, t := range err.AvailableTypes {
186                                                 fmt.Fprintf(&logBuf,
187                                                         "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
188                                                         t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
189                                         }
190                                 }
191                                 text = logBuf.String()
192                                 d.UpdateState(c.UUID, Cancelled)
193                         }
194                         d.Logger.Printf("%s", text)
195                         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
196                                 "object_uuid": c.UUID,
197                                 "event_type":  "dispatch",
198                                 "properties":  map[string]string{"text": text}}}
199                         d.Arv.Create("logs", lr, nil)
200                         d.Unlock(c.UUID)
201                 }
202
203                 d.mtx.Lock()
204                 delete(d.trackers, c.UUID)
205                 d.mtx.Unlock()
206         }()
207         return tracker
208 }
209
210 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
211         var countList arvados.ContainerList
212         params := arvadosclient.Dict{
213                 "filters": filters,
214                 "count":   "exact",
215                 "limit":   0,
216                 "order":   []string{"priority desc"}}
217         err := d.Arv.List("containers", params, &countList)
218         if err != nil {
219                 d.Logger.Warnf("error getting count of containers: %q", err)
220                 return false
221         }
222         itemsAvailable := countList.ItemsAvailable
223         params = arvadosclient.Dict{
224                 "filters": filters,
225                 "count":   "none",
226                 "limit":   d.BatchSize,
227                 "order":   []string{"priority desc"}}
228         offset := 0
229         for {
230                 params["offset"] = offset
231
232                 // This list variable must be a new one declared
233                 // inside the loop: otherwise, items in the API
234                 // response would get deep-merged into the items
235                 // loaded in previous iterations.
236                 var list arvados.ContainerList
237
238                 err := d.Arv.List("containers", params, &list)
239                 if err != nil {
240                         d.Logger.Warnf("error getting list of containers: %q", err)
241                         return false
242                 }
243                 d.checkListForUpdates(list.Items, todo)
244                 offset += len(list.Items)
245                 if len(list.Items) == 0 || itemsAvailable <= offset {
246                         return true
247                 }
248         }
249 }
250
251 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
252         d.mtx.Lock()
253         defer d.mtx.Unlock()
254         if d.trackers == nil {
255                 d.trackers = make(map[string]*runTracker)
256         }
257
258         for _, c := range containers {
259                 tracker, alreadyTracking := d.trackers[c.UUID]
260                 delete(todo, c.UUID)
261
262                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
263                         d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
264                 } else if alreadyTracking {
265                         switch c.State {
266                         case Queued:
267                                 tracker.close()
268                         case Locked, Running:
269                                 tracker.update(c)
270                         case Cancelled, Complete:
271                                 tracker.close()
272                         }
273                 } else {
274                         switch c.State {
275                         case Queued:
276                                 if !d.throttle.Check(c.UUID) {
277                                         break
278                                 }
279                                 err := d.lock(c.UUID)
280                                 if err != nil {
281                                         d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
282                                         break
283                                 }
284                                 c.State = Locked
285                                 d.trackers[c.UUID] = d.start(c)
286                         case Locked, Running:
287                                 if !d.throttle.Check(c.UUID) {
288                                         break
289                                 }
290                                 d.trackers[c.UUID] = d.start(c)
291                         case Cancelled, Complete:
292                                 // no-op (we already stopped monitoring)
293                         }
294                 }
295         }
296 }
297
298 // UpdateState makes an API call to change the state of a container.
299 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
300         err := d.Arv.Update("containers", uuid,
301                 arvadosclient.Dict{
302                         "container": arvadosclient.Dict{"state": state},
303                 }, nil)
304         if err != nil {
305                 d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
306         }
307         return err
308 }
309
310 // Lock makes the lock API call which updates the state of a container to Locked.
311 func (d *Dispatcher) lock(uuid string) error {
312         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
313 }
314
315 // Unlock makes the unlock API call which updates the state of a container to Queued.
316 func (d *Dispatcher) Unlock(uuid string) error {
317         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
318 }
319
320 // TrackContainer ensures a tracker is running for the given UUID,
321 // regardless of the current state of the container (except: if the
322 // container is locked by a different dispatcher, a tracker will not
323 // be started). If the container is not in Locked or Running state,
324 // the new tracker will close down immediately.
325 //
326 // This allows the dispatcher to put its own RunContainer func into a
327 // cleanup phase (for example, to kill local processes created by a
328 // prevous dispatch process that are still running even though the
329 // container state is final) without the risk of having multiple
330 // goroutines monitoring the same UUID.
331 func (d *Dispatcher) TrackContainer(uuid string) error {
332         var cntr arvados.Container
333         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
334         if err != nil {
335                 return err
336         }
337         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
338                 return nil
339         }
340
341         d.mtx.Lock()
342         defer d.mtx.Unlock()
343         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
344                 return nil
345         }
346         if d.trackers == nil {
347                 d.trackers = make(map[string]*runTracker)
348         }
349         d.trackers[uuid] = d.start(cntr)
350         switch cntr.State {
351         case Queued, Cancelled, Complete:
352                 d.trackers[uuid].close()
353         }
354         return nil
355 }
356
357 type runTracker struct {
358         closing bool
359         updates chan arvados.Container
360         logger  Logger
361 }
362
363 func (tracker *runTracker) close() {
364         if !tracker.closing {
365                 close(tracker.updates)
366         }
367         tracker.closing = true
368 }
369
370 func (tracker *runTracker) update(c arvados.Container) {
371         if tracker.closing {
372                 return
373         }
374         select {
375         case <-tracker.updates:
376                 tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
377         default:
378         }
379         tracker.updates <- c
380 }