1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/sirupsen/logrus"
18 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
20 // An APIClient performs Arvados API requests. It is typically an
22 type APIClient interface {
23 RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
26 // A QueueEnt is an entry in the queue, consisting of a container
27 // record and the instance type that should be used to run it.
28 type QueueEnt struct {
29 // The container to run. Only the UUID, State, Priority,
30 // RuntimeConstraints, Mounts, and ContainerImage fields are
32 Container arvados.Container `json:"container"`
33 InstanceType arvados.InstanceType `json:"instance_type"`
34 FirstSeenAt time.Time `json:"first_seen_at"`
37 // String implements fmt.Stringer by returning the queued container's
39 func (c *QueueEnt) String() string {
40 return c.Container.UUID
43 // A Queue is an interface to an Arvados cluster's container
44 // database. It presents only the containers that are eligible to be
45 // run by, are already being run by, or have recently been run by the
46 // present dispatcher.
48 // The Entries, Get, and Forget methods do not block: they return
49 // immediately, using cached data.
51 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
52 // return only after the operation has completed.
54 // A Queue's Update method should be called periodically to keep the
57 logger logrus.FieldLogger
58 chooseType typeChooser
61 auth *arvados.APIClientAuthorization
62 current map[string]QueueEnt
66 // Methods that modify the Queue (like Lock) add the affected
67 // container UUIDs to dontupdate. When applying a batch of
68 // updates received from the network, anything appearing in
69 // dontupdate is skipped, in case the received update has
70 // already been superseded by the locally initiated change.
71 // When no network update is in progress, this protection is
72 // not needed, and dontupdate is nil.
73 dontupdate map[string]struct{}
75 // active notification subscribers (see Subscribe)
76 subscribers map[<-chan struct{}]chan struct{}
79 // NewQueue returns a new Queue. When a new container appears in the
80 // Arvados cluster's queue during Update, chooseType will be called to
81 // assign an appropriate arvados.InstanceType for the queue entry.
82 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
85 chooseType: chooseType,
87 current: map[string]QueueEnt{},
88 subscribers: map[<-chan struct{}]chan struct{}{},
96 // Subscribe returns a channel that becomes ready to receive when an
97 // entry in the Queue is updated.
99 // ch := q.Subscribe()
100 // defer q.Unsubscribe(ch)
104 func (cq *Queue) Subscribe() <-chan struct{} {
106 defer cq.mtx.Unlock()
107 ch := make(chan struct{}, 1)
108 cq.subscribers[ch] = ch
112 // Unsubscribe stops sending updates to the given channel. See
114 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
116 defer cq.mtx.Unlock()
117 delete(cq.subscribers, ch)
120 // Caller must have lock.
121 func (cq *Queue) notify() {
122 for _, ch := range cq.subscribers {
124 case ch <- struct{}{}:
130 // Forget drops the specified container from the cache. It should be
131 // called on finalized containers to avoid leaking memory over
132 // time. It is a no-op if the indicated container is not in a
134 func (cq *Queue) Forget(uuid string) {
136 defer cq.mtx.Unlock()
137 ctr := cq.current[uuid].Container
138 if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
139 cq.delEnt(uuid, ctr.State)
143 // Get returns the (partial) Container record for the specified
144 // container. Like a map lookup, its second return value is false if
145 // the specified container is not in the Queue.
146 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
148 defer cq.mtx.Unlock()
149 ctr, ok := cq.current[uuid]
151 return arvados.Container{}, false
153 return ctr.Container, true
// Entries returns a snapshot of all cache entries, keyed by container
// UUID. The map itself is newly allocated; the entries are shallow
// copies of the cached values.
//
// The returned threshold is the start time of the most recent Update
// that applied its results, i.e. the maximum age of any cached data in
// the map. This lets a scheduler correctly determine the outcome of a
// remote process that updates container state: it must first wait for
// the remote process to exit, then wait for the Queue to start and
// finish its next Update -- i.e., until threshold > timeProcessExited.
func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
	cq.mtx.Lock()
	defer cq.mtx.Unlock()
	entries = make(map[string]QueueEnt, len(cq.current))
	for uuid, ent := range cq.current {
		entries[uuid] = ent
	}
	return entries, cq.updated
}
175 // Update refreshes the cache from the Arvados API. It adds newly
176 // queued containers, and updates the state of previously queued
178 func (cq *Queue) Update() error {
180 cq.dontupdate = map[string]struct{}{}
181 updateStarted := time.Now()
184 next, err := cq.poll()
190 defer cq.mtx.Unlock()
191 for uuid, ctr := range next {
192 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
193 // Don't clobber a local update that happened
194 // after we started polling.
197 if cur, ok := cq.current[uuid]; !ok {
198 cq.addEnt(uuid, *ctr)
201 cq.current[uuid] = cur
204 for uuid, ent := range cq.current {
205 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
206 // Don't expunge an entry that was
207 // added/updated locally after we started
210 } else if _, stillpresent := next[uuid]; !stillpresent {
211 // Expunge an entry that no longer appears in
212 // the poll response (evidently it's
213 // cancelled, completed, deleted, or taken by
214 // a different dispatcher).
215 cq.delEnt(uuid, ent.Container.State)
219 cq.updated = updateStarted
224 // Caller must have lock.
225 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
226 cq.logger.WithFields(logrus.Fields{
227 "ContainerUUID": uuid,
229 }).Info("dropping container from queue")
230 delete(cq.current, uuid)
233 // Caller must have lock.
234 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
235 it, err := cq.chooseType(&ctr)
236 if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
237 // We assume here that any chooseType error is a hard
238 // error: it wouldn't help to try again, or to leave
239 // it for a different dispatcher process to attempt.
240 errorString := err.Error()
241 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
242 logger.WithError(err).Warn("cancel container with no suitable instance type")
244 if ctr.State == arvados.ContainerStateQueued {
245 // Can't set runtime error without
247 err := cq.Lock(ctr.UUID)
249 logger.WithError(err).Warn("lock failed")
251 // ...and try again on the
252 // next Update, if the problem
261 // On failure, check current container
262 // state, and don't log the error if
263 // the failure came from losing a
265 var latest arvados.Container
266 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
267 if latest.State == arvados.ContainerStateCancelled {
270 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
272 err = cq.setRuntimeError(ctr.UUID, errorString)
276 err = cq.Cancel(ctr.UUID)
283 cq.logger.WithFields(logrus.Fields{
284 "ContainerUUID": ctr.UUID,
286 "Priority": ctr.Priority,
287 "InstanceType": it.Name,
288 }).Info("adding container to queue")
289 cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
292 // Lock acquires the dispatch lock for the given container.
293 func (cq *Queue) Lock(uuid string) error {
294 return cq.apiUpdate(uuid, "lock")
297 // Unlock releases the dispatch lock for the given container.
298 func (cq *Queue) Unlock(uuid string) error {
299 return cq.apiUpdate(uuid, "unlock")
302 // setRuntimeError sets runtime_status["error"] to the given value.
303 // Container should already have state==Locked or Running.
304 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
305 return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
308 "error": errorString,
314 // Cancel cancels the given container.
315 func (cq *Queue) Cancel(uuid string) error {
316 var resp arvados.Container
317 err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
318 "container": {"state": arvados.ContainerStateCancelled},
323 cq.updateWithResp(uuid, resp)
327 func (cq *Queue) apiUpdate(uuid, action string) error {
328 var resp arvados.Container
329 err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
333 cq.updateWithResp(uuid, resp)
337 // Update the local queue with the response received from a
338 // state-changing API request (lock/unlock/cancel).
339 func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
341 defer cq.mtx.Unlock()
342 if cq.dontupdate != nil {
343 cq.dontupdate[uuid] = struct{}{}
345 ent, ok := cq.current[uuid]
347 // Container is not in queue (e.g., it was not added
348 // because there is no suitable instance type, and
349 // we're just locking/updating it in order to set an
350 // error message). No need to add it, and we don't
351 // necessarily have enough information to add it here
352 // anyway because lock/unlock responses don't include
353 // runtime_constraints.
356 ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
357 cq.current[uuid] = ent
361 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
363 size := len(cq.current)
368 auth = &arvados.APIClientAuthorization{}
369 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
378 next := make(map[string]*arvados.Container, size)
379 apply := func(updates []arvados.Container) {
380 for _, upd := range updates {
381 if next[upd.UUID] == nil {
382 next[upd.UUID] = &arvados.Container{}
384 *next[upd.UUID] = upd
387 selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
390 mine, err := cq.fetchAll(arvados.ResourceListParams{
395 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
402 avail, err := cq.fetchAll(arvados.ResourceListParams{
407 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
414 missing := map[string]bool{}
416 for uuid, ent := range cq.current {
417 if next[uuid] == nil &&
418 ent.Container.State != arvados.ContainerStateCancelled &&
419 ent.Container.State != arvados.ContainerStateComplete {
425 for len(missing) > 0 {
427 for uuid := range missing {
428 batch = append(batch, uuid)
429 if len(batch) == 20 {
433 filters := []arvados.Filter{{"uuid", "in", batch}}
434 ended, err := cq.fetchAll(arvados.ResourceListParams{
445 // This is the only case where we can conclude
446 // a container has been deleted from the
447 // database. A short (but non-zero) page, on
448 // the other hand, can be caused by a response
450 for _, uuid := range batch {
451 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
452 delete(missing, uuid)
454 cq.delEnt(uuid, cq.current[uuid].Container.State)
459 for _, ctr := range ended {
460 if _, ok := missing[ctr.UUID]; !ok {
461 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
462 cq.logger.WithFields(logrus.Fields{
463 "ContainerUUID": ctr.UUID,
466 return nil, errors.New(msg)
468 delete(missing, ctr.UUID)
474 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
475 var results []arvados.Container
476 params := initialParams
479 // This list variable must be a new one declared
480 // inside the loop: otherwise, items in the API
481 // response would get deep-merged into the items
482 // loaded in previous iterations.
483 var list arvados.ContainerList
485 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
489 if len(list.Items) == 0 {
493 results = append(results, list.Items...)
494 if len(params.Order) == 1 && params.Order == "uuid" {
495 params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
497 params.Offset += len(list.Items)
503 func (cq *Queue) runMetrics(reg *prometheus.Registry) {
504 mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
505 Namespace: "arvados",
506 Subsystem: "dispatchcloud",
507 Name: "queue_entries",
508 Help: "Number of active container entries in the controller database.",
509 }, []string{"state", "instance_type"})
510 reg.MustRegister(mEntries)
513 state arvados.ContainerState
516 count := map[entKey]int{}
519 defer cq.Unsubscribe(ch)
521 for k := range count {
524 ents, _ := cq.Entries()
525 for _, ent := range ents {
526 count[entKey{ent.Container.State, ent.InstanceType.Name}]++
528 for k, v := range count {
529 mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))