1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "github.com/prometheus/client_golang/prometheus"
16 "github.com/sirupsen/logrus"
19 // Stop fetching queued containers after this many of the highest
20 // priority non-supervisor containers. Reduces API load when queue is
21 // long. This also limits how quickly a large batch of queued
22 // containers can be started, which improves reliability under high
23 // load at the cost of increased latency under light load.
24 const queuedContainersTarget = 100
26 type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
28 // An APIClient performs Arvados API requests. It is typically an
30 type APIClient interface {
31 RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
34 // A QueueEnt is an entry in the queue, consisting of a container
35 // record and the instance type that should be used to run it.
36 type QueueEnt struct {
37 // The container to run. Only the UUID, State, Priority,
38 // RuntimeConstraints, ContainerImage, SchedulingParameters,
39 // and CreatedAt fields are populated.
40 Container arvados.Container `json:"container"`
41 InstanceTypes []arvados.InstanceType `json:"instance_types"`
42 FirstSeenAt time.Time `json:"first_seen_at"`
45 // String implements fmt.Stringer by returning the queued container's
47 func (c *QueueEnt) String() string {
48 return c.Container.UUID
51 // A Queue is an interface to an Arvados cluster's container
52 // database. It presents only the containers that are eligible to be
53 // run by, are already being run by, or have recently been run by the
54 // present dispatcher.
56 // The Entries, Get, and Forget methods do not block: they return
57 // immediately, using cached data.
59 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
60 // return only after the operation has completed.
62 // A Queue's Update method should be called periodically to keep the
65 logger logrus.FieldLogger
66 chooseType typeChooser
69 auth *arvados.APIClientAuthorization
70 current map[string]QueueEnt
74 // Methods that modify the Queue (like Lock) add the affected
75 // container UUIDs to dontupdate. When applying a batch of
76 // updates received from the network, anything appearing in
77 // dontupdate is skipped, in case the received update has
78 // already been superseded by the locally initiated change.
79 // When no network update is in progress, this protection is
80 // not needed, and dontupdate is nil.
81 dontupdate map[string]struct{}
83 // active notification subscribers (see Subscribe)
84 subscribers map[<-chan struct{}]chan struct{}
87 // NewQueue returns a new Queue. When a new container appears in the
88 // Arvados cluster's queue during Update, chooseType will be called to
89 // assign an appropriate arvados.InstanceType for the queue entry.
90 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
93 chooseType: chooseType,
95 current: map[string]QueueEnt{},
96 subscribers: map[<-chan struct{}]chan struct{}{},
104 // Subscribe returns a channel that becomes ready to receive when an
105 // entry in the Queue is updated.
107 // ch := q.Subscribe()
108 // defer q.Unsubscribe(ch)
112 func (cq *Queue) Subscribe() <-chan struct{} {
114 defer cq.mtx.Unlock()
115 ch := make(chan struct{}, 1)
116 cq.subscribers[ch] = ch
120 // Unsubscribe stops sending updates to the given channel. See
122 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
124 defer cq.mtx.Unlock()
125 delete(cq.subscribers, ch)
128 // Caller must have lock.
129 func (cq *Queue) notify() {
130 for _, ch := range cq.subscribers {
132 case ch <- struct{}{}:
138 // Forget drops the specified container from the cache. It should be
139 // called on finalized containers to avoid leaking memory over
140 // time. It is a no-op if the indicated container is not in a
142 func (cq *Queue) Forget(uuid string) {
144 defer cq.mtx.Unlock()
145 ctr := cq.current[uuid].Container
146 if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
147 cq.delEnt(uuid, ctr.State)
151 // Get returns the (partial) Container record for the specified
152 // container. Like a map lookup, its second return value is false if
153 // the specified container is not in the Queue.
154 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
156 defer cq.mtx.Unlock()
157 ctr, ok := cq.current[uuid]
159 return arvados.Container{}, false
161 return ctr.Container, true
164 // Entries returns all cache entries, keyed by container UUID.
166 // The returned threshold indicates the maximum age of any cached data
167 // returned in the map. This makes it possible for a scheduler to
168 // determine correctly the outcome of a remote process that updates
169 // container state. It must first wait for the remote process to exit,
170 // then wait for the Queue to start and finish its next Update --
171 // i.e., it must wait until threshold > timeProcessExited.
172 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
174 defer cq.mtx.Unlock()
175 entries = make(map[string]QueueEnt, len(cq.current))
176 for uuid, ctr := range cq.current {
179 threshold = cq.updated
183 // Update refreshes the cache from the Arvados API. It adds newly
184 // queued containers, and updates the state of previously queued
186 func (cq *Queue) Update() error {
188 cq.dontupdate = map[string]struct{}{}
189 updateStarted := time.Now()
192 next, err := cq.poll()
198 defer cq.mtx.Unlock()
199 for uuid, ctr := range next {
200 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
201 // Don't clobber a local update that happened
202 // after we started polling.
205 if cur, ok := cq.current[uuid]; !ok {
206 cq.addEnt(uuid, *ctr)
209 cq.current[uuid] = cur
212 for uuid, ent := range cq.current {
213 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
214 // Don't expunge an entry that was
215 // added/updated locally after we started
218 } else if _, stillpresent := next[uuid]; !stillpresent {
219 // Expunge an entry that no longer appears in
220 // the poll response (evidently it's
221 // cancelled, completed, deleted, or taken by
222 // a different dispatcher).
223 cq.delEnt(uuid, ent.Container.State)
227 cq.updated = updateStarted
232 // Caller must have lock.
233 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
234 cq.logger.WithFields(logrus.Fields{
235 "ContainerUUID": uuid,
237 }).Info("dropping container from queue")
238 delete(cq.current, uuid)
241 // Caller must have lock.
242 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
243 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
244 // We didn't ask for the Mounts field when polling
245 // controller/RailsAPI, because it can be expensive on the
246 // Rails side, and most of the time we already have it. But
247 // this is the first time we're seeing this container, so we
248 // need to fetch mounts in order to choose an instance type.
249 err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{
250 Select: []string{"mounts"},
253 logger.WithError(err).Warn("error getting mounts")
254 if strings.Contains(err.Error(), "json: cannot unmarshal") {
255 // see https://dev.arvados.org/issues/21314
256 go cq.cancelUnsatisfiableContainer(ctr, "error getting mounts from container record: "+err.Error())
260 types, err := cq.chooseType(&ctr)
262 // Avoid wasting memory on a large Mounts attr (we don't need
263 // it after choosing type).
266 if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
267 // We assume here that any chooseType error is a hard
268 // error: it wouldn't help to try again, or to leave
269 // it for a different dispatcher process to attempt.
270 logger.WithError(err).Warn("cancel container with no suitable instance type")
271 go cq.cancelUnsatisfiableContainer(ctr, err.Error())
275 for _, it := range types {
281 cq.logger.WithFields(logrus.Fields{
282 "ContainerUUID": ctr.UUID,
284 "Priority": ctr.Priority,
285 "InstanceTypes": typeNames,
286 }).Info("adding container to queue")
287 cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
290 func (cq *Queue) cancelUnsatisfiableContainer(ctr arvados.Container, errorString string) {
291 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
292 if ctr.State == arvados.ContainerStateQueued {
293 // Can't set runtime error without locking first.
294 err := cq.Lock(ctr.UUID)
296 logger.WithError(err).Warn("lock failed")
298 // ...and try again on the next Update, if the
299 // problem still exists.
307 // On failure, check current container state, and
308 // don't log the error if the failure came from losing
310 var latest arvados.Container
311 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
312 if latest.State == arvados.ContainerStateCancelled {
315 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
317 err = cq.setRuntimeError(ctr.UUID, errorString)
321 err = cq.Cancel(ctr.UUID)
327 // Lock acquires the dispatch lock for the given container.
328 func (cq *Queue) Lock(uuid string) error {
329 return cq.apiUpdate(uuid, "lock")
332 // Unlock releases the dispatch lock for the given container.
333 func (cq *Queue) Unlock(uuid string) error {
334 return cq.apiUpdate(uuid, "unlock")
337 // setRuntimeError sets runtime_status["error"] to the given value.
338 // Container should already have state==Locked or Running.
339 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
340 return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
343 "error": errorString,
349 // Cancel cancels the given container.
350 func (cq *Queue) Cancel(uuid string) error {
351 var resp arvados.Container
352 err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
353 "container": {"state": arvados.ContainerStateCancelled},
358 cq.updateWithResp(uuid, resp)
362 func (cq *Queue) apiUpdate(uuid, action string) error {
363 var resp arvados.Container
364 err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
368 cq.updateWithResp(uuid, resp)
372 // Update the local queue with the response received from a
373 // state-changing API request (lock/unlock/cancel).
374 func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
376 defer cq.mtx.Unlock()
377 if cq.dontupdate != nil {
378 cq.dontupdate[uuid] = struct{}{}
380 ent, ok := cq.current[uuid]
382 // Container is not in queue (e.g., it was not added
383 // because there is no suitable instance type, and
384 // we're just locking/updating it in order to set an
385 // error message). No need to add it, and we don't
386 // necessarily have enough information to add it here
387 // anyway because lock/unlock responses don't include
388 // runtime_constraints.
391 ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
392 cq.current[uuid] = ent
396 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
398 size := len(cq.current)
403 auth = &arvados.APIClientAuthorization{}
404 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
413 next := make(map[string]*arvados.Container, size)
414 apply := func(updates []arvados.Container) {
415 for _, upd := range updates {
416 if next[upd.UUID] == nil {
417 next[upd.UUID] = &arvados.Container{}
419 *next[upd.UUID] = upd
422 selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"}
425 mine, err := cq.fetchAll(arvados.ResourceListParams{
430 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
437 avail, err := cq.fetchAll(arvados.ResourceListParams{
439 Order: "priority desc",
442 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
443 }, queuedContainersTarget)
449 // Check for containers that we already know about but weren't
450 // returned by any of the above queries, and fetch them
451 // explicitly by UUID. If they're in a final state we can drop
452 // them, but otherwise we need to apply updates, e.g.,
454 // - Queued container priority has been reduced
455 // - Locked container has been requeued with lower priority
456 missing := map[string]bool{}
458 for uuid, ent := range cq.current {
459 if next[uuid] == nil &&
460 ent.Container.State != arvados.ContainerStateCancelled &&
461 ent.Container.State != arvados.ContainerStateComplete {
467 for len(missing) > 0 {
469 for uuid := range missing {
470 batch = append(batch, uuid)
471 if len(batch) == 20 {
475 filters := []arvados.Filter{{"uuid", "in", batch}}
476 ended, err := cq.fetchAll(arvados.ResourceListParams{
487 // This is the only case where we can conclude
488 // a container has been deleted from the
489 // database. A short (but non-zero) page, on
490 // the other hand, can be caused by a response
492 for _, uuid := range batch {
493 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
494 delete(missing, uuid)
496 cq.delEnt(uuid, cq.current[uuid].Container.State)
501 for _, ctr := range ended {
502 if _, ok := missing[ctr.UUID]; !ok {
503 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
504 cq.logger.WithFields(logrus.Fields{
505 "ContainerUUID": ctr.UUID,
508 return nil, errors.New(msg)
510 delete(missing, ctr.UUID)
516 // Fetch all pages of containers.
518 // Except: if maxNonSuper>0, stop fetching more pages after receiving
519 // that many non-supervisor containers. Along with {Order: "priority
520 // desc"}, this enables fetching enough high priority scheduling-ready
521 // containers to make progress, without necessarily fetching the
523 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) {
524 var results []arvados.Container
525 params := initialParams
529 // This list variable must be a new one declared
530 // inside the loop: otherwise, items in the API
531 // response would get deep-merged into the items
532 // loaded in previous iterations.
533 var list arvados.ContainerList
535 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
539 if len(list.Items) == 0 {
543 // Conserve memory by deleting mounts that aren't
544 // relevant to choosing the instance type.
545 for _, c := range list.Items {
546 for path, mnt := range c.Mounts {
547 if mnt.Kind != "tmp" {
548 delete(c.Mounts, path)
551 if !c.SchedulingParameters.Supervisor {
556 results = append(results, list.Items...)
557 if maxNonSuper > 0 && nonSuper >= maxNonSuper {
559 } else if params.Order == "uuid" {
560 params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
562 params.Offset += len(list.Items)
568 func (cq *Queue) runMetrics(reg *prometheus.Registry) {
569 mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
570 Namespace: "arvados",
571 Subsystem: "dispatchcloud",
572 Name: "queue_entries",
573 Help: "Number of active container entries in the controller database.",
574 }, []string{"state", "instance_type"})
575 reg.MustRegister(mEntries)
578 state arvados.ContainerState
581 count := map[entKey]int{}
584 defer cq.Unsubscribe(ch)
586 for k := range count {
589 ents, _ := cq.Entries()
590 for _, ent := range ents {
591 count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
593 for k, v := range count {
594 mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))