]> git.arvados.org - arvados.git/blob - lib/dispatchcloud/container/queue.go
Merge branch '23009-multiselect-bug' into main. Closes #23009
[arvados.git] / lib / dispatchcloud / container / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "errors"
9         "io"
10         "strings"
11         "sync"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "github.com/prometheus/client_golang/prometheus"
16         "github.com/sirupsen/logrus"
17 )
18
19 // Stop fetching queued containers after this many of the highest
20 // priority non-supervisor containers. Reduces API load when queue is
21 // long. This also limits how quickly a large batch of queued
22 // containers can be started, which improves reliability under high
23 // load at the cost of increased under light load.
24 const queuedContainersTarget = 100
25
26 type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
27
28 // An APIClient performs Arvados API requests. It is typically an
29 // *arvados.Client.
30 type APIClient interface {
31         RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
32 }
33
34 // A QueueEnt is an entry in the queue, consisting of a container
35 // record and the instance type that should be used to run it.
36 type QueueEnt struct {
37         // The container to run. Only the UUID, State, Priority,
38         // RuntimeConstraints, ContainerImage, SchedulingParameters,
39         // and CreatedAt fields are populated.
40         Container     arvados.Container      `json:"container"`
41         InstanceTypes []arvados.InstanceType `json:"instance_types"`
42         FirstSeenAt   time.Time              `json:"first_seen_at"`
43 }
44
45 // String implements fmt.Stringer by returning the queued container's
46 // UUID.
47 func (c *QueueEnt) String() string {
48         return c.Container.UUID
49 }
50
51 // A Queue is an interface to an Arvados cluster's container
52 // database. It presents only the containers that are eligible to be
53 // run by, are already being run by, or have recently been run by the
54 // present dispatcher.
55 //
56 // The Entries, Get, and Forget methods do not block: they return
57 // immediately, using cached data.
58 //
59 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
60 // return only after the operation has completed.
61 //
62 // A Queue's Update method should be called periodically to keep the
63 // cache up to date.
64 type Queue struct {
65         logger     logrus.FieldLogger
66         chooseType typeChooser
67         client     APIClient
68
69         auth    *arvados.APIClientAuthorization
70         current map[string]QueueEnt
71         updated time.Time
72         mtx     sync.Mutex
73
74         // Methods that modify the Queue (like Lock) add the affected
75         // container UUIDs to dontupdate. When applying a batch of
76         // updates received from the network, anything appearing in
77         // dontupdate is skipped, in case the received update has
78         // already been superseded by the locally initiated change.
79         // When no network update is in progress, this protection is
80         // not needed, and dontupdate is nil.
81         dontupdate map[string]struct{}
82
83         // active notification subscribers (see Subscribe)
84         subscribers map[<-chan struct{}]chan struct{}
85 }
86
87 // NewQueue returns a new Queue. When a new container appears in the
88 // Arvados cluster's queue during Update, chooseType will be called to
89 // assign an appropriate arvados.InstanceType for the queue entry.
90 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
91         cq := &Queue{
92                 logger:      logger,
93                 chooseType:  chooseType,
94                 client:      client,
95                 current:     map[string]QueueEnt{},
96                 subscribers: map[<-chan struct{}]chan struct{}{},
97         }
98         if reg != nil {
99                 go cq.runMetrics(reg)
100         }
101         return cq
102 }
103
104 // Subscribe returns a channel that becomes ready to receive when an
105 // entry in the Queue is updated.
106 //
107 //      ch := q.Subscribe()
108 //      defer q.Unsubscribe(ch)
109 //      for range ch {
110 //              // ...
111 //      }
112 func (cq *Queue) Subscribe() <-chan struct{} {
113         cq.mtx.Lock()
114         defer cq.mtx.Unlock()
115         ch := make(chan struct{}, 1)
116         cq.subscribers[ch] = ch
117         return ch
118 }
119
120 // Unsubscribe stops sending updates to the given channel. See
121 // Subscribe.
122 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
123         cq.mtx.Lock()
124         defer cq.mtx.Unlock()
125         delete(cq.subscribers, ch)
126 }
127
128 // Caller must have lock.
129 func (cq *Queue) notify() {
130         for _, ch := range cq.subscribers {
131                 select {
132                 case ch <- struct{}{}:
133                 default:
134                 }
135         }
136 }
137
138 // Forget drops the specified container from the cache. It should be
139 // called on finalized containers to avoid leaking memory over
140 // time. It is a no-op if the indicated container is not in a
141 // finalized state.
142 func (cq *Queue) Forget(uuid string) {
143         cq.mtx.Lock()
144         defer cq.mtx.Unlock()
145         ctr := cq.current[uuid].Container
146         if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
147                 cq.delEnt(uuid, ctr.State)
148         }
149 }
150
151 // Get returns the (partial) Container record for the specified
152 // container. Like a map lookup, its second return value is false if
153 // the specified container is not in the Queue.
154 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
155         cq.mtx.Lock()
156         defer cq.mtx.Unlock()
157         ctr, ok := cq.current[uuid]
158         if !ok {
159                 return arvados.Container{}, false
160         }
161         return ctr.Container, true
162 }
163
164 // Entries returns all cache entries, keyed by container UUID.
165 //
166 // The returned threshold indicates the maximum age of any cached data
167 // returned in the map. This makes it possible for a scheduler to
168 // determine correctly the outcome of a remote process that updates
169 // container state. It must first wait for the remote process to exit,
170 // then wait for the Queue to start and finish its next Update --
171 // i.e., it must wait until threshold > timeProcessExited.
172 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
173         cq.mtx.Lock()
174         defer cq.mtx.Unlock()
175         entries = make(map[string]QueueEnt, len(cq.current))
176         for uuid, ctr := range cq.current {
177                 entries[uuid] = ctr
178         }
179         threshold = cq.updated
180         return
181 }
182
183 // Update refreshes the cache from the Arvados API. It adds newly
184 // queued containers, and updates the state of previously queued
185 // containers.
186 func (cq *Queue) Update() error {
187         cq.mtx.Lock()
188         cq.dontupdate = map[string]struct{}{}
189         updateStarted := time.Now()
190         cq.mtx.Unlock()
191
192         next, err := cq.poll()
193         if err != nil {
194                 return err
195         }
196
197         cq.mtx.Lock()
198         defer cq.mtx.Unlock()
199         for uuid, ctr := range next {
200                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
201                         // Don't clobber a local update that happened
202                         // after we started polling.
203                         continue
204                 }
205                 if cur, ok := cq.current[uuid]; !ok {
206                         cq.addEnt(uuid, *ctr)
207                 } else {
208                         cur.Container = *ctr
209                         cq.current[uuid] = cur
210                 }
211         }
212         for uuid, ent := range cq.current {
213                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
214                         // Don't expunge an entry that was
215                         // added/updated locally after we started
216                         // polling.
217                         continue
218                 } else if _, stillpresent := next[uuid]; !stillpresent {
219                         // Expunge an entry that no longer appears in
220                         // the poll response (evidently it's
221                         // cancelled, completed, deleted, or taken by
222                         // a different dispatcher).
223                         cq.delEnt(uuid, ent.Container.State)
224                 }
225         }
226         cq.dontupdate = nil
227         cq.updated = updateStarted
228         cq.notify()
229         return nil
230 }
231
232 // Caller must have lock.
233 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
234         cq.logger.WithFields(logrus.Fields{
235                 "ContainerUUID": uuid,
236                 "State":         state,
237         }).Info("dropping container from queue")
238         delete(cq.current, uuid)
239 }
240
241 // Caller must have lock.
242 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
243         logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
244         // We didn't ask for the Mounts field when polling
245         // controller/RailsAPI, because it can be expensive on the
246         // Rails side, and most of the time we already have it.  But
247         // this is the first time we're seeing this container, so we
248         // need to fetch mounts in order to choose an instance type.
249         err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{
250                 Select: []string{"mounts"},
251         })
252         if err != nil {
253                 logger.WithError(err).Warn("error getting mounts")
254                 if strings.Contains(err.Error(), "json: cannot unmarshal") {
255                         // see https://dev.arvados.org/issues/21314
256                         go cq.cancelUnsatisfiableContainer(ctr, "error getting mounts from container record: "+err.Error())
257                 }
258                 return
259         }
260         types, err := cq.chooseType(&ctr)
261
262         // Avoid wasting memory on a large Mounts attr (we don't need
263         // it after choosing type).
264         ctr.Mounts = nil
265
266         if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
267                 // We assume here that any chooseType error is a hard
268                 // error: it wouldn't help to try again, or to leave
269                 // it for a different dispatcher process to attempt.
270                 logger.WithError(err).Warn("cancel container with no suitable instance type")
271                 go cq.cancelUnsatisfiableContainer(ctr, err.Error())
272                 return
273         }
274         typeNames := ""
275         for _, it := range types {
276                 if typeNames != "" {
277                         typeNames += ", "
278                 }
279                 typeNames += it.Name
280         }
281         cq.logger.WithFields(logrus.Fields{
282                 "ContainerUUID": ctr.UUID,
283                 "State":         ctr.State,
284                 "Priority":      ctr.Priority,
285                 "InstanceTypes": typeNames,
286         }).Info("adding container to queue")
287         cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
288 }
289
290 func (cq *Queue) cancelUnsatisfiableContainer(ctr arvados.Container, errorString string) {
291         logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
292         if ctr.State == arvados.ContainerStateQueued {
293                 // Can't set runtime error without locking first.
294                 err := cq.Lock(ctr.UUID)
295                 if err != nil {
296                         logger.WithError(err).Warn("lock failed")
297                         return
298                         // ...and try again on the next Update, if the
299                         // problem still exists.
300                 }
301         }
302         var err error
303         defer func() {
304                 if err == nil {
305                         return
306                 }
307                 // On failure, check current container state, and
308                 // don't log the error if the failure came from losing
309                 // a race.
310                 var latest arvados.Container
311                 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
312                 if latest.State == arvados.ContainerStateCancelled {
313                         return
314                 }
315                 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
316         }()
317         err = cq.setRuntimeError(ctr.UUID, errorString)
318         if err != nil {
319                 return
320         }
321         err = cq.Cancel(ctr.UUID)
322         if err != nil {
323                 return
324         }
325 }
326
327 // Lock acquires the dispatch lock for the given container.
328 func (cq *Queue) Lock(uuid string) error {
329         return cq.apiUpdate(uuid, "lock")
330 }
331
332 // Unlock releases the dispatch lock for the given container.
333 func (cq *Queue) Unlock(uuid string) error {
334         return cq.apiUpdate(uuid, "unlock")
335 }
336
337 // setRuntimeError sets runtime_status["error"] to the given value.
338 // Container should already have state==Locked or Running.
339 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
340         return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
341                 "container": {
342                         "runtime_status": {
343                                 "error": errorString,
344                         },
345                 },
346         })
347 }
348
349 // Cancel cancels the given container.
350 func (cq *Queue) Cancel(uuid string) error {
351         var resp arvados.Container
352         err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
353                 "container": {"state": arvados.ContainerStateCancelled},
354         })
355         if err != nil {
356                 return err
357         }
358         cq.updateWithResp(uuid, resp)
359         return nil
360 }
361
362 func (cq *Queue) apiUpdate(uuid, action string) error {
363         var resp arvados.Container
364         err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
365         if err != nil {
366                 return err
367         }
368         cq.updateWithResp(uuid, resp)
369         return nil
370 }
371
372 // Update the local queue with the response received from a
373 // state-changing API request (lock/unlock/cancel).
374 func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
375         cq.mtx.Lock()
376         defer cq.mtx.Unlock()
377         if cq.dontupdate != nil {
378                 cq.dontupdate[uuid] = struct{}{}
379         }
380         ent, ok := cq.current[uuid]
381         if !ok {
382                 // Container is not in queue (e.g., it was not added
383                 // because there is no suitable instance type, and
384                 // we're just locking/updating it in order to set an
385                 // error message). No need to add it, and we don't
386                 // necessarily have enough information to add it here
387                 // anyway because lock/unlock responses don't include
388                 // runtime_constraints.
389                 return
390         }
391         ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
392         cq.current[uuid] = ent
393         cq.notify()
394 }
395
396 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
397         cq.mtx.Lock()
398         size := len(cq.current)
399         auth := cq.auth
400         cq.mtx.Unlock()
401
402         if auth == nil {
403                 auth = &arvados.APIClientAuthorization{}
404                 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
405                 if err != nil {
406                         return nil, err
407                 }
408                 cq.mtx.Lock()
409                 cq.auth = auth
410                 cq.mtx.Unlock()
411         }
412
413         next := make(map[string]*arvados.Container, size)
414         apply := func(updates []arvados.Container) {
415                 for _, upd := range updates {
416                         if next[upd.UUID] == nil {
417                                 next[upd.UUID] = &arvados.Container{}
418                         }
419                         *next[upd.UUID] = upd
420                 }
421         }
422         selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"}
423         limitParam := 1000
424
425         mine, err := cq.fetchAll(arvados.ResourceListParams{
426                 Select:  selectParam,
427                 Order:   "uuid",
428                 Limit:   &limitParam,
429                 Count:   "none",
430                 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
431         }, 0)
432         if err != nil {
433                 return nil, err
434         }
435         apply(mine)
436
437         avail, err := cq.fetchAll(arvados.ResourceListParams{
438                 Select:  selectParam,
439                 Order:   "priority desc",
440                 Limit:   &limitParam,
441                 Count:   "none",
442                 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
443         }, queuedContainersTarget)
444         if err != nil {
445                 return nil, err
446         }
447         apply(avail)
448
449         // Check for containers that we already know about but weren't
450         // returned by any of the above queries, and fetch them
451         // explicitly by UUID. If they're in a final state we can drop
452         // them, but otherwise we need to apply updates, e.g.,
453         //
454         // - Queued container priority has been reduced
455         // - Locked container has been requeued with lower priority
456         missing := map[string]bool{}
457         cq.mtx.Lock()
458         for uuid, ent := range cq.current {
459                 if next[uuid] == nil &&
460                         ent.Container.State != arvados.ContainerStateCancelled &&
461                         ent.Container.State != arvados.ContainerStateComplete {
462                         missing[uuid] = true
463                 }
464         }
465         cq.mtx.Unlock()
466
467         for len(missing) > 0 {
468                 var batch []string
469                 for uuid := range missing {
470                         batch = append(batch, uuid)
471                         if len(batch) == 20 {
472                                 break
473                         }
474                 }
475                 filters := []arvados.Filter{{"uuid", "in", batch}}
476                 ended, err := cq.fetchAll(arvados.ResourceListParams{
477                         Select:  selectParam,
478                         Order:   "uuid",
479                         Count:   "none",
480                         Filters: filters,
481                 }, 0)
482                 if err != nil {
483                         return nil, err
484                 }
485                 apply(ended)
486                 if len(ended) == 0 {
487                         // This is the only case where we can conclude
488                         // a container has been deleted from the
489                         // database. A short (but non-zero) page, on
490                         // the other hand, can be caused by a response
491                         // size limit.
492                         for _, uuid := range batch {
493                                 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
494                                 delete(missing, uuid)
495                                 cq.mtx.Lock()
496                                 cq.delEnt(uuid, cq.current[uuid].Container.State)
497                                 cq.mtx.Unlock()
498                         }
499                         continue
500                 }
501                 for _, ctr := range ended {
502                         if _, ok := missing[ctr.UUID]; !ok {
503                                 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
504                                 cq.logger.WithFields(logrus.Fields{
505                                         "ContainerUUID": ctr.UUID,
506                                         "Filters":       filters,
507                                 }).Error(msg)
508                                 return nil, errors.New(msg)
509                         }
510                         delete(missing, ctr.UUID)
511                 }
512         }
513         return next, nil
514 }
515
516 // Fetch all pages of containers.
517 //
518 // Except: if maxNonSuper>0, stop fetching more pages after receving
519 // that many non-supervisor containers. Along with {Order: "priority
520 // desc"}, this enables fetching enough high priority scheduling-ready
521 // containers to make progress, without necessarily fetching the
522 // entire queue.
523 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) {
524         var results []arvados.Container
525         params := initialParams
526         params.Offset = 0
527         nonSuper := 0
528         for {
529                 // This list variable must be a new one declared
530                 // inside the loop: otherwise, items in the API
531                 // response would get deep-merged into the items
532                 // loaded in previous iterations.
533                 var list arvados.ContainerList
534
535                 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
536                 if err != nil {
537                         return nil, err
538                 }
539                 if len(list.Items) == 0 {
540                         break
541                 }
542
543                 // Conserve memory by deleting mounts that aren't
544                 // relevant to choosing the instance type.
545                 for _, c := range list.Items {
546                         for path, mnt := range c.Mounts {
547                                 if mnt.Kind != "tmp" {
548                                         delete(c.Mounts, path)
549                                 }
550                         }
551                         if !c.SchedulingParameters.Supervisor {
552                                 nonSuper++
553                         }
554                 }
555
556                 results = append(results, list.Items...)
557                 if maxNonSuper > 0 && nonSuper >= maxNonSuper {
558                         break
559                 } else if params.Order == "uuid" {
560                         params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
561                 } else {
562                         params.Offset += len(list.Items)
563                 }
564         }
565         return results, nil
566 }
567
568 func (cq *Queue) runMetrics(reg *prometheus.Registry) {
569         mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
570                 Namespace: "arvados",
571                 Subsystem: "dispatchcloud",
572                 Name:      "queue_entries",
573                 Help:      "Number of active container entries in the controller database.",
574         }, []string{"state", "instance_type"})
575         reg.MustRegister(mEntries)
576
577         type entKey struct {
578                 state arvados.ContainerState
579                 inst  string
580         }
581         count := map[entKey]int{}
582
583         ch := cq.Subscribe()
584         defer cq.Unsubscribe(ch)
585         for range ch {
586                 for k := range count {
587                         count[k] = 0
588                 }
589                 ents, _ := cq.Entries()
590                 for _, ent := range ents {
591                         count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
592                 }
593                 for k, v := range count {
594                         mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
595                 }
596         }
597 }