21700: Install Bundler system-wide in Rails postinst
[arvados.git] / lib / dispatchcloud / container / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "errors"
9         "io"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
18 // Stop fetching queued containers after this many of the highest
19 // priority non-supervisor containers. Reduces API load when queue is
20 // long. This also limits how quickly a large batch of queued
21 // containers can be started, which improves reliability under high
22 // load at the cost of increased under light load.
23 const queuedContainersTarget = 100
24
25 type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
26
27 // An APIClient performs Arvados API requests. It is typically an
28 // *arvados.Client.
29 type APIClient interface {
30         RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
31 }
32
33 // A QueueEnt is an entry in the queue, consisting of a container
34 // record and the instance type that should be used to run it.
35 type QueueEnt struct {
36         // The container to run. Only the UUID, State, Priority,
37         // RuntimeConstraints, ContainerImage, SchedulingParameters,
38         // and CreatedAt fields are populated.
39         Container     arvados.Container      `json:"container"`
40         InstanceTypes []arvados.InstanceType `json:"instance_types"`
41         FirstSeenAt   time.Time              `json:"first_seen_at"`
42 }
43
44 // String implements fmt.Stringer by returning the queued container's
45 // UUID.
46 func (c *QueueEnt) String() string {
47         return c.Container.UUID
48 }
49
50 // A Queue is an interface to an Arvados cluster's container
51 // database. It presents only the containers that are eligible to be
52 // run by, are already being run by, or have recently been run by the
53 // present dispatcher.
54 //
55 // The Entries, Get, and Forget methods do not block: they return
56 // immediately, using cached data.
57 //
58 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
59 // return only after the operation has completed.
60 //
61 // A Queue's Update method should be called periodically to keep the
62 // cache up to date.
63 type Queue struct {
64         logger     logrus.FieldLogger
65         chooseType typeChooser
66         client     APIClient
67
68         auth    *arvados.APIClientAuthorization
69         current map[string]QueueEnt
70         updated time.Time
71         mtx     sync.Mutex
72
73         // Methods that modify the Queue (like Lock) add the affected
74         // container UUIDs to dontupdate. When applying a batch of
75         // updates received from the network, anything appearing in
76         // dontupdate is skipped, in case the received update has
77         // already been superseded by the locally initiated change.
78         // When no network update is in progress, this protection is
79         // not needed, and dontupdate is nil.
80         dontupdate map[string]struct{}
81
82         // active notification subscribers (see Subscribe)
83         subscribers map[<-chan struct{}]chan struct{}
84 }
85
86 // NewQueue returns a new Queue. When a new container appears in the
87 // Arvados cluster's queue during Update, chooseType will be called to
88 // assign an appropriate arvados.InstanceType for the queue entry.
89 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
90         cq := &Queue{
91                 logger:      logger,
92                 chooseType:  chooseType,
93                 client:      client,
94                 current:     map[string]QueueEnt{},
95                 subscribers: map[<-chan struct{}]chan struct{}{},
96         }
97         if reg != nil {
98                 go cq.runMetrics(reg)
99         }
100         return cq
101 }
102
103 // Subscribe returns a channel that becomes ready to receive when an
104 // entry in the Queue is updated.
105 //
106 //      ch := q.Subscribe()
107 //      defer q.Unsubscribe(ch)
108 //      for range ch {
109 //              // ...
110 //      }
111 func (cq *Queue) Subscribe() <-chan struct{} {
112         cq.mtx.Lock()
113         defer cq.mtx.Unlock()
114         ch := make(chan struct{}, 1)
115         cq.subscribers[ch] = ch
116         return ch
117 }
118
119 // Unsubscribe stops sending updates to the given channel. See
120 // Subscribe.
121 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
122         cq.mtx.Lock()
123         defer cq.mtx.Unlock()
124         delete(cq.subscribers, ch)
125 }
126
127 // Caller must have lock.
128 func (cq *Queue) notify() {
129         for _, ch := range cq.subscribers {
130                 select {
131                 case ch <- struct{}{}:
132                 default:
133                 }
134         }
135 }
136
137 // Forget drops the specified container from the cache. It should be
138 // called on finalized containers to avoid leaking memory over
139 // time. It is a no-op if the indicated container is not in a
140 // finalized state.
141 func (cq *Queue) Forget(uuid string) {
142         cq.mtx.Lock()
143         defer cq.mtx.Unlock()
144         ctr := cq.current[uuid].Container
145         if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
146                 cq.delEnt(uuid, ctr.State)
147         }
148 }
149
150 // Get returns the (partial) Container record for the specified
151 // container. Like a map lookup, its second return value is false if
152 // the specified container is not in the Queue.
153 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
154         cq.mtx.Lock()
155         defer cq.mtx.Unlock()
156         ctr, ok := cq.current[uuid]
157         if !ok {
158                 return arvados.Container{}, false
159         }
160         return ctr.Container, true
161 }
162
163 // Entries returns all cache entries, keyed by container UUID.
164 //
165 // The returned threshold indicates the maximum age of any cached data
166 // returned in the map. This makes it possible for a scheduler to
167 // determine correctly the outcome of a remote process that updates
168 // container state. It must first wait for the remote process to exit,
169 // then wait for the Queue to start and finish its next Update --
170 // i.e., it must wait until threshold > timeProcessExited.
171 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
172         cq.mtx.Lock()
173         defer cq.mtx.Unlock()
174         entries = make(map[string]QueueEnt, len(cq.current))
175         for uuid, ctr := range cq.current {
176                 entries[uuid] = ctr
177         }
178         threshold = cq.updated
179         return
180 }
181
182 // Update refreshes the cache from the Arvados API. It adds newly
183 // queued containers, and updates the state of previously queued
184 // containers.
185 func (cq *Queue) Update() error {
186         cq.mtx.Lock()
187         cq.dontupdate = map[string]struct{}{}
188         updateStarted := time.Now()
189         cq.mtx.Unlock()
190
191         next, err := cq.poll()
192         if err != nil {
193                 return err
194         }
195
196         cq.mtx.Lock()
197         defer cq.mtx.Unlock()
198         for uuid, ctr := range next {
199                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
200                         // Don't clobber a local update that happened
201                         // after we started polling.
202                         continue
203                 }
204                 if cur, ok := cq.current[uuid]; !ok {
205                         cq.addEnt(uuid, *ctr)
206                 } else {
207                         cur.Container = *ctr
208                         cq.current[uuid] = cur
209                 }
210         }
211         for uuid, ent := range cq.current {
212                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
213                         // Don't expunge an entry that was
214                         // added/updated locally after we started
215                         // polling.
216                         continue
217                 } else if _, stillpresent := next[uuid]; !stillpresent {
218                         // Expunge an entry that no longer appears in
219                         // the poll response (evidently it's
220                         // cancelled, completed, deleted, or taken by
221                         // a different dispatcher).
222                         cq.delEnt(uuid, ent.Container.State)
223                 }
224         }
225         cq.dontupdate = nil
226         cq.updated = updateStarted
227         cq.notify()
228         return nil
229 }
230
231 // Caller must have lock.
232 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
233         cq.logger.WithFields(logrus.Fields{
234                 "ContainerUUID": uuid,
235                 "State":         state,
236         }).Info("dropping container from queue")
237         delete(cq.current, uuid)
238 }
239
240 // Caller must have lock.
241 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
242         logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
243         // We didn't ask for the Mounts field when polling
244         // controller/RailsAPI, because it can be expensive on the
245         // Rails side, and most of the time we already have it.  But
246         // this is the first time we're seeing this container, so we
247         // need to fetch mounts in order to choose an instance type.
248         err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{
249                 Select: []string{"mounts"},
250         })
251         if err != nil {
252                 logger.WithError(err).Warn("error getting mounts")
253                 return
254         }
255         types, err := cq.chooseType(&ctr)
256
257         // Avoid wasting memory on a large Mounts attr (we don't need
258         // it after choosing type).
259         ctr.Mounts = nil
260
261         if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
262                 // We assume here that any chooseType error is a hard
263                 // error: it wouldn't help to try again, or to leave
264                 // it for a different dispatcher process to attempt.
265                 errorString := err.Error()
266                 logger.WithError(err).Warn("cancel container with no suitable instance type")
267                 go func() {
268                         if ctr.State == arvados.ContainerStateQueued {
269                                 // Can't set runtime error without
270                                 // locking first.
271                                 err := cq.Lock(ctr.UUID)
272                                 if err != nil {
273                                         logger.WithError(err).Warn("lock failed")
274                                         return
275                                         // ...and try again on the
276                                         // next Update, if the problem
277                                         // still exists.
278                                 }
279                         }
280                         var err error
281                         defer func() {
282                                 if err == nil {
283                                         return
284                                 }
285                                 // On failure, check current container
286                                 // state, and don't log the error if
287                                 // the failure came from losing a
288                                 // race.
289                                 var latest arvados.Container
290                                 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
291                                 if latest.State == arvados.ContainerStateCancelled {
292                                         return
293                                 }
294                                 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
295                         }()
296                         err = cq.setRuntimeError(ctr.UUID, errorString)
297                         if err != nil {
298                                 return
299                         }
300                         err = cq.Cancel(ctr.UUID)
301                         if err != nil {
302                                 return
303                         }
304                 }()
305                 return
306         }
307         typeNames := ""
308         for _, it := range types {
309                 if typeNames != "" {
310                         typeNames += ", "
311                 }
312                 typeNames += it.Name
313         }
314         cq.logger.WithFields(logrus.Fields{
315                 "ContainerUUID": ctr.UUID,
316                 "State":         ctr.State,
317                 "Priority":      ctr.Priority,
318                 "InstanceTypes": typeNames,
319         }).Info("adding container to queue")
320         cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
321 }
322
323 // Lock acquires the dispatch lock for the given container.
324 func (cq *Queue) Lock(uuid string) error {
325         return cq.apiUpdate(uuid, "lock")
326 }
327
328 // Unlock releases the dispatch lock for the given container.
329 func (cq *Queue) Unlock(uuid string) error {
330         return cq.apiUpdate(uuid, "unlock")
331 }
332
333 // setRuntimeError sets runtime_status["error"] to the given value.
334 // Container should already have state==Locked or Running.
335 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
336         return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
337                 "container": {
338                         "runtime_status": {
339                                 "error": errorString,
340                         },
341                 },
342         })
343 }
344
345 // Cancel cancels the given container.
346 func (cq *Queue) Cancel(uuid string) error {
347         var resp arvados.Container
348         err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
349                 "container": {"state": arvados.ContainerStateCancelled},
350         })
351         if err != nil {
352                 return err
353         }
354         cq.updateWithResp(uuid, resp)
355         return nil
356 }
357
358 func (cq *Queue) apiUpdate(uuid, action string) error {
359         var resp arvados.Container
360         err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
361         if err != nil {
362                 return err
363         }
364         cq.updateWithResp(uuid, resp)
365         return nil
366 }
367
368 // Update the local queue with the response received from a
369 // state-changing API request (lock/unlock/cancel).
370 func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
371         cq.mtx.Lock()
372         defer cq.mtx.Unlock()
373         if cq.dontupdate != nil {
374                 cq.dontupdate[uuid] = struct{}{}
375         }
376         ent, ok := cq.current[uuid]
377         if !ok {
378                 // Container is not in queue (e.g., it was not added
379                 // because there is no suitable instance type, and
380                 // we're just locking/updating it in order to set an
381                 // error message). No need to add it, and we don't
382                 // necessarily have enough information to add it here
383                 // anyway because lock/unlock responses don't include
384                 // runtime_constraints.
385                 return
386         }
387         ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
388         cq.current[uuid] = ent
389         cq.notify()
390 }
391
392 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
393         cq.mtx.Lock()
394         size := len(cq.current)
395         auth := cq.auth
396         cq.mtx.Unlock()
397
398         if auth == nil {
399                 auth = &arvados.APIClientAuthorization{}
400                 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
401                 if err != nil {
402                         return nil, err
403                 }
404                 cq.mtx.Lock()
405                 cq.auth = auth
406                 cq.mtx.Unlock()
407         }
408
409         next := make(map[string]*arvados.Container, size)
410         apply := func(updates []arvados.Container) {
411                 for _, upd := range updates {
412                         if next[upd.UUID] == nil {
413                                 next[upd.UUID] = &arvados.Container{}
414                         }
415                         *next[upd.UUID] = upd
416                 }
417         }
418         selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"}
419         limitParam := 1000
420
421         mine, err := cq.fetchAll(arvados.ResourceListParams{
422                 Select:  selectParam,
423                 Order:   "uuid",
424                 Limit:   &limitParam,
425                 Count:   "none",
426                 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
427         }, 0)
428         if err != nil {
429                 return nil, err
430         }
431         apply(mine)
432
433         avail, err := cq.fetchAll(arvados.ResourceListParams{
434                 Select:  selectParam,
435                 Order:   "priority desc",
436                 Limit:   &limitParam,
437                 Count:   "none",
438                 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
439         }, queuedContainersTarget)
440         if err != nil {
441                 return nil, err
442         }
443         apply(avail)
444
445         // Check for containers that we already know about but weren't
446         // returned by any of the above queries, and fetch them
447         // explicitly by UUID. If they're in a final state we can drop
448         // them, but otherwise we need to apply updates, e.g.,
449         //
450         // - Queued container priority has been reduced
451         // - Locked container has been requeued with lower priority
452         missing := map[string]bool{}
453         cq.mtx.Lock()
454         for uuid, ent := range cq.current {
455                 if next[uuid] == nil &&
456                         ent.Container.State != arvados.ContainerStateCancelled &&
457                         ent.Container.State != arvados.ContainerStateComplete {
458                         missing[uuid] = true
459                 }
460         }
461         cq.mtx.Unlock()
462
463         for len(missing) > 0 {
464                 var batch []string
465                 for uuid := range missing {
466                         batch = append(batch, uuid)
467                         if len(batch) == 20 {
468                                 break
469                         }
470                 }
471                 filters := []arvados.Filter{{"uuid", "in", batch}}
472                 ended, err := cq.fetchAll(arvados.ResourceListParams{
473                         Select:  selectParam,
474                         Order:   "uuid",
475                         Count:   "none",
476                         Filters: filters,
477                 }, 0)
478                 if err != nil {
479                         return nil, err
480                 }
481                 apply(ended)
482                 if len(ended) == 0 {
483                         // This is the only case where we can conclude
484                         // a container has been deleted from the
485                         // database. A short (but non-zero) page, on
486                         // the other hand, can be caused by a response
487                         // size limit.
488                         for _, uuid := range batch {
489                                 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
490                                 delete(missing, uuid)
491                                 cq.mtx.Lock()
492                                 cq.delEnt(uuid, cq.current[uuid].Container.State)
493                                 cq.mtx.Unlock()
494                         }
495                         continue
496                 }
497                 for _, ctr := range ended {
498                         if _, ok := missing[ctr.UUID]; !ok {
499                                 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
500                                 cq.logger.WithFields(logrus.Fields{
501                                         "ContainerUUID": ctr.UUID,
502                                         "Filters":       filters,
503                                 }).Error(msg)
504                                 return nil, errors.New(msg)
505                         }
506                         delete(missing, ctr.UUID)
507                 }
508         }
509         return next, nil
510 }
511
512 // Fetch all pages of containers.
513 //
514 // Except: if maxNonSuper>0, stop fetching more pages after receving
515 // that many non-supervisor containers. Along with {Order: "priority
516 // desc"}, this enables fetching enough high priority scheduling-ready
517 // containers to make progress, without necessarily fetching the
518 // entire queue.
519 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) {
520         var results []arvados.Container
521         params := initialParams
522         params.Offset = 0
523         nonSuper := 0
524         for {
525                 // This list variable must be a new one declared
526                 // inside the loop: otherwise, items in the API
527                 // response would get deep-merged into the items
528                 // loaded in previous iterations.
529                 var list arvados.ContainerList
530
531                 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
532                 if err != nil {
533                         return nil, err
534                 }
535                 if len(list.Items) == 0 {
536                         break
537                 }
538
539                 // Conserve memory by deleting mounts that aren't
540                 // relevant to choosing the instance type.
541                 for _, c := range list.Items {
542                         for path, mnt := range c.Mounts {
543                                 if mnt.Kind != "tmp" {
544                                         delete(c.Mounts, path)
545                                 }
546                         }
547                         if !c.SchedulingParameters.Supervisor {
548                                 nonSuper++
549                         }
550                 }
551
552                 results = append(results, list.Items...)
553                 if maxNonSuper > 0 && nonSuper >= maxNonSuper {
554                         break
555                 } else if params.Order == "uuid" {
556                         params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
557                 } else {
558                         params.Offset += len(list.Items)
559                 }
560         }
561         return results, nil
562 }
563
564 func (cq *Queue) runMetrics(reg *prometheus.Registry) {
565         mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
566                 Namespace: "arvados",
567                 Subsystem: "dispatchcloud",
568                 Name:      "queue_entries",
569                 Help:      "Number of active container entries in the controller database.",
570         }, []string{"state", "instance_type"})
571         reg.MustRegister(mEntries)
572
573         type entKey struct {
574                 state arvados.ContainerState
575                 inst  string
576         }
577         count := map[entKey]int{}
578
579         ch := cq.Subscribe()
580         defer cq.Unsubscribe(ch)
581         for range ch {
582                 for k := range count {
583                         count[k] = 0
584                 }
585                 ents, _ := cq.Entries()
586                 for _, ent := range ents {
587                         count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
588                 }
589                 for k, v := range count {
590                         mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
591                 }
592         }
593 }