18947: Remove errant uses of runsu.sh.
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "bytes"
11         "context"
12         "fmt"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/dispatchcloud"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "github.com/sirupsen/logrus"
20 )
21
22 const (
23         Queued    = arvados.ContainerStateQueued
24         Locked    = arvados.ContainerStateLocked
25         Running   = arvados.ContainerStateRunning
26         Complete  = arvados.ContainerStateComplete
27         Cancelled = arvados.ContainerStateCancelled
28 )
29
30 type Logger interface {
31         Printf(string, ...interface{})
32         Warnf(string, ...interface{})
33         Debugf(string, ...interface{})
34 }
35
36 // Dispatcher struct
37 type Dispatcher struct {
38         Arv *arvadosclient.ArvadosClient
39
40         Logger Logger
41
42         // Batch size for container queries
43         BatchSize int
44
45         // Queue polling frequency
46         PollPeriod time.Duration
47
48         // Time to wait between successive attempts to run the same container
49         MinRetryPeriod time.Duration
50
51         // Func that implements the container lifecycle. Must be set
52         // to a non-nil DispatchFunc before calling Run().
53         RunContainer DispatchFunc
54
55         auth     arvados.APIClientAuthorization
56         mtx      sync.Mutex
57         trackers map[string]*runTracker
58         throttle throttle
59 }
60
61 // A DispatchFunc executes a container (if the container record is
62 // Locked) or resume monitoring an already-running container, and wait
63 // until that container exits.
64 //
65 // While the container runs, the DispatchFunc should listen for
66 // updated container records on the provided channel. When the channel
67 // closes, the DispatchFunc should stop the container if it's still
68 // running, and return.
69 //
70 // The DispatchFunc should not return until the container is finished.
71 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
72
73 // Run watches the API server's queue for containers that are either
74 // ready to run and available to lock, or are already locked by this
75 // dispatcher's token. When a new one appears, Run calls RunContainer
76 // in a new goroutine.
77 func (d *Dispatcher) Run(ctx context.Context) error {
78         if d.Logger == nil {
79                 d.Logger = logrus.StandardLogger()
80         }
81
82         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
83         if err != nil {
84                 return fmt.Errorf("error getting my token UUID: %v", err)
85         }
86
87         d.throttle.hold = d.MinRetryPeriod
88
89         poll := time.NewTicker(d.PollPeriod)
90         defer poll.Stop()
91
92         if d.BatchSize == 0 {
93                 d.BatchSize = 100
94         }
95
96         for {
97                 select {
98                 case <-poll.C:
99                         break
100                 case <-ctx.Done():
101                         return ctx.Err()
102                 }
103
104                 todo := make(map[string]*runTracker)
105                 d.mtx.Lock()
106                 // Make a copy of trackers
107                 for uuid, tracker := range d.trackers {
108                         todo[uuid] = tracker
109                 }
110                 d.mtx.Unlock()
111
112                 // Containers I currently own (Locked/Running)
113                 querySuccess := d.checkForUpdates([][]interface{}{
114                         {"locked_by_uuid", "=", d.auth.UUID}}, todo)
115
116                 // Containers I should try to dispatch
117                 querySuccess = d.checkForUpdates([][]interface{}{
118                         {"state", "=", Queued},
119                         {"priority", ">", "0"}}, todo) && querySuccess
120
121                 if !querySuccess {
122                         // There was an error in one of the previous queries,
123                         // we probably didn't get updates for all the
124                         // containers we should have.  Don't check them
125                         // individually because it may be expensive.
126                         continue
127                 }
128
129                 // Containers I know about but didn't fall into the
130                 // above two categories (probably Complete/Cancelled)
131                 var missed []string
132                 for uuid := range todo {
133                         missed = append(missed, uuid)
134                 }
135
136                 for len(missed) > 0 {
137                         var batch []string
138                         if len(missed) > 20 {
139                                 batch = missed[0:20]
140                                 missed = missed[20:]
141                         } else {
142                                 batch = missed
143                                 missed = missed[0:0]
144                         }
145                         querySuccess = d.checkForUpdates([][]interface{}{
146                                 {"uuid", "in", batch}}, todo) && querySuccess
147                 }
148
149                 if !querySuccess {
150                         // There was an error in one of the previous queries, we probably
151                         // didn't see all the containers we should have, so don't shut down
152                         // the missed containers.
153                         continue
154                 }
155
156                 // Containers that I know about that didn't show up in any
157                 // query should be let go.
158                 for uuid, tracker := range todo {
159                         d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
160                         tracker.close()
161                 }
162
163         }
164 }
165
166 // Start a runner in a new goroutine, and send the initial container
167 // record to its updates channel.
168 func (d *Dispatcher) start(c arvados.Container) *runTracker {
169         tracker := &runTracker{
170                 updates: make(chan arvados.Container, 1),
171                 logger:  d.Logger,
172         }
173         tracker.updates <- c
174         go func() {
175                 fallbackState := Queued
176                 err := d.RunContainer(d, c, tracker.updates)
177                 if err != nil {
178                         text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
179                         if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
180                                 fallbackState = Cancelled
181                                 var logBuf bytes.Buffer
182                                 fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
183                                 if len(err.AvailableTypes) == 0 {
184                                         fmt.Fprint(&logBuf, "No instance types are configured.\n")
185                                 } else {
186                                         fmt.Fprint(&logBuf, "Available instance types:\n")
187                                         for _, t := range err.AvailableTypes {
188                                                 fmt.Fprintf(&logBuf,
189                                                         "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
190                                                         t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
191                                         }
192                                 }
193                                 text = logBuf.String()
194                         }
195                         d.Logger.Printf("%s", text)
196                         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
197                                 "object_uuid": c.UUID,
198                                 "event_type":  "dispatch",
199                                 "properties":  map[string]string{"text": text}}}
200                         d.Arv.Create("logs", lr, nil)
201                 }
202                 // If checkListForUpdates() doesn't close the tracker
203                 // after 2 queue updates, try to move the container to
204                 // the fallback state, which should eventually work
205                 // and cause the tracker to close.
206                 updates := 0
207                 for upd := range tracker.updates {
208                         updates++
209                         if upd.State == Locked || upd.State == Running {
210                                 // Tracker didn't clean up before
211                                 // returning -- or this is the first
212                                 // update and it contains stale
213                                 // information from before
214                                 // RunContainer() returned.
215                                 if updates < 2 {
216                                         // Avoid generating confusing
217                                         // logs / API calls in the
218                                         // stale-info case.
219                                         continue
220                                 }
221                                 d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
222                                 d.UpdateState(c.UUID, fallbackState)
223                         }
224                 }
225         }()
226         return tracker
227 }
228
229 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
230         var countList arvados.ContainerList
231         params := arvadosclient.Dict{
232                 "filters": filters,
233                 "count":   "exact",
234                 "limit":   0,
235                 "order":   []string{"priority desc"}}
236         err := d.Arv.List("containers", params, &countList)
237         if err != nil {
238                 d.Logger.Warnf("error getting count of containers: %q", err)
239                 return false
240         }
241         itemsAvailable := countList.ItemsAvailable
242         params = arvadosclient.Dict{
243                 "filters": filters,
244                 "count":   "none",
245                 "limit":   d.BatchSize,
246                 "order":   []string{"priority desc"}}
247         offset := 0
248         for {
249                 params["offset"] = offset
250
251                 // This list variable must be a new one declared
252                 // inside the loop: otherwise, items in the API
253                 // response would get deep-merged into the items
254                 // loaded in previous iterations.
255                 var list arvados.ContainerList
256
257                 err := d.Arv.List("containers", params, &list)
258                 if err != nil {
259                         d.Logger.Warnf("error getting list of containers: %q", err)
260                         return false
261                 }
262                 d.checkListForUpdates(list.Items, todo)
263                 offset += len(list.Items)
264                 if len(list.Items) == 0 || itemsAvailable <= offset {
265                         return true
266                 }
267         }
268 }
269
270 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
271         d.mtx.Lock()
272         defer d.mtx.Unlock()
273         if d.trackers == nil {
274                 d.trackers = make(map[string]*runTracker)
275         }
276
277         for _, c := range containers {
278                 tracker, alreadyTracking := d.trackers[c.UUID]
279                 delete(todo, c.UUID)
280
281                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
282                         d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
283                 } else if alreadyTracking {
284                         switch c.State {
285                         case Queued, Cancelled, Complete:
286                                 d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
287                                 tracker.close()
288                                 delete(d.trackers, c.UUID)
289                         case Locked, Running:
290                                 d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
291                                 tracker.update(c)
292                         }
293                 } else {
294                         switch c.State {
295                         case Queued:
296                                 if !d.throttle.Check(c.UUID) {
297                                         break
298                                 }
299                                 err := d.lock(c.UUID)
300                                 if err != nil {
301                                         d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
302                                         break
303                                 }
304                                 c.State = Locked
305                                 d.trackers[c.UUID] = d.start(c)
306                         case Locked, Running:
307                                 if !d.throttle.Check(c.UUID) {
308                                         break
309                                 }
310                                 d.trackers[c.UUID] = d.start(c)
311                         case Cancelled, Complete:
312                                 // no-op (we already stopped monitoring)
313                         }
314                 }
315         }
316 }
317
318 // UpdateState makes an API call to change the state of a container.
319 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
320         err := d.Arv.Update("containers", uuid,
321                 arvadosclient.Dict{
322                         "container": arvadosclient.Dict{"state": state},
323                 }, nil)
324         if err != nil {
325                 d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
326         }
327         return err
328 }
329
330 // Lock makes the lock API call which updates the state of a container to Locked.
331 func (d *Dispatcher) lock(uuid string) error {
332         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
333 }
334
335 // Unlock makes the unlock API call which updates the state of a container to Queued.
336 func (d *Dispatcher) Unlock(uuid string) error {
337         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
338 }
339
340 // TrackContainer ensures a tracker is running for the given UUID,
341 // regardless of the current state of the container (except: if the
342 // container is locked by a different dispatcher, a tracker will not
343 // be started). If the container is not in Locked or Running state,
344 // the new tracker will close down immediately.
345 //
346 // This allows the dispatcher to put its own RunContainer func into a
347 // cleanup phase (for example, to kill local processes created by a
348 // prevous dispatch process that are still running even though the
349 // container state is final) without the risk of having multiple
350 // goroutines monitoring the same UUID.
351 func (d *Dispatcher) TrackContainer(uuid string) error {
352         var cntr arvados.Container
353         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
354         if err != nil {
355                 return err
356         }
357         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
358                 return nil
359         }
360
361         d.mtx.Lock()
362         defer d.mtx.Unlock()
363         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
364                 return nil
365         }
366         if d.trackers == nil {
367                 d.trackers = make(map[string]*runTracker)
368         }
369         d.trackers[uuid] = d.start(cntr)
370         switch cntr.State {
371         case Queued, Cancelled, Complete:
372                 d.trackers[uuid].close()
373         }
374         return nil
375 }
376
377 type runTracker struct {
378         closing bool
379         updates chan arvados.Container
380         logger  Logger
381 }
382
383 func (tracker *runTracker) close() {
384         if !tracker.closing {
385                 close(tracker.updates)
386         }
387         tracker.closing = true
388 }
389
390 func (tracker *runTracker) update(c arvados.Container) {
391         if tracker.closing {
392                 return
393         }
394         select {
395         case <-tracker.updates:
396                 tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
397         default:
398         }
399         tracker.updates <- c
400 }