1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/sirupsen/logrus"
// typeChooser selects the arvados.InstanceType that should be used to
// run the given container. Queue.addEnt treats a non-nil error as a hard
// failure meaning no suitable instance type exists, and may cancel a
// queued or locked container in that case.
17 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
19 // An APIClient performs Arvados API requests. It is typically an
21 type APIClient interface {
22 RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
25 // A QueueEnt is an entry in the queue, consisting of a container
26 // record and the instance type that should be used to run it.
27 type QueueEnt struct {
28 // The container to run. Only the UUID, State, Priority, and
29 // RuntimeConstraints fields are populated.
30 Container arvados.Container `json:"container"`
31 InstanceType arvados.InstanceType `json:"instance_type"`
34 // String implements fmt.Stringer by returning the queued container's
36 func (c *QueueEnt) String() string {
37 return c.Container.UUID
40 // A Queue is an interface to an Arvados cluster's container
41 // database. It presents only the containers that are eligible to be
42 // run by, are already being run by, or have recently been run by the
43 // present dispatcher.
45 // The Entries, Get, and Forget methods do not block: they return
46 // immediately, using cached data.
48 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
49 // return only after the operation has completed.
51 // A Queue's Update method should be called periodically to keep the
54 logger logrus.FieldLogger
55 reg *prometheus.Registry
56 chooseType typeChooser
59 auth *arvados.APIClientAuthorization
60 current map[string]QueueEnt
64 // Methods that modify the Queue (like Lock) add the affected
65 // container UUIDs to dontupdate. When applying a batch of
66 // updates received from the network, anything appearing in
67 // dontupdate is skipped, in case the received update has
68 // already been superseded by the locally initiated change.
69 // When no network update is in progress, this protection is
70 // not needed, and dontupdate is nil.
71 dontupdate map[string]struct{}
73 // active notification subscribers (see Subscribe)
74 subscribers map[<-chan struct{}]chan struct{}
77 // NewQueue returns a new Queue. When a new container appears in the
78 // Arvados cluster's queue during Update, chooseType will be called to
79 // assign an appropriate arvados.InstanceType for the queue entry.
80 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
84 chooseType: chooseType,
86 current: map[string]QueueEnt{},
87 subscribers: map[<-chan struct{}]chan struct{}{},
91 // Subscribe returns a channel that becomes ready to receive when an
92 // entry in the Queue is updated.
94 // ch := q.Subscribe()
95 // defer q.Unsubscribe(ch)
99 func (cq *Queue) Subscribe() <-chan struct{} {
101 defer cq.mtx.Unlock()
102 ch := make(chan struct{}, 1)
103 cq.subscribers[ch] = ch
107 // Unsubscribe stops sending updates to the given channel. See
109 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
111 defer cq.mtx.Unlock()
112 delete(cq.subscribers, ch)
115 // Caller must have lock.
116 func (cq *Queue) notify() {
117 for _, ch := range cq.subscribers {
119 case ch <- struct{}{}:
125 // Forget drops the specified container from the cache. It should be
126 // called on finalized containers to avoid leaking memory over
127 // time. It is a no-op if the indicated container is not in a
129 func (cq *Queue) Forget(uuid string) {
131 defer cq.mtx.Unlock()
132 ctr := cq.current[uuid].Container
133 if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
134 delete(cq.current, uuid)
138 // Get returns the (partial) Container record for the specified
139 // container. Like a map lookup, its second return value is false if
140 // the specified container is not in the Queue.
141 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
143 defer cq.mtx.Unlock()
144 if ctr, ok := cq.current[uuid]; !ok {
145 return arvados.Container{}, false
147 return ctr.Container, true
151 // Entries returns all cache entries, keyed by container UUID.
153 // The returned threshold indicates the maximum age of any cached data
154 // returned in the map. This makes it possible for a scheduler to
155 // determine correctly the outcome of a remote process that updates
156 // container state. It must first wait for the remote process to exit,
157 // then wait for the Queue to start and finish its next Update --
158 // i.e., it must wait until threshold > timeProcessExited.
159 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
161 defer cq.mtx.Unlock()
162 entries = make(map[string]QueueEnt, len(cq.current))
163 for uuid, ctr := range cq.current {
166 threshold = cq.updated
170 // Update refreshes the cache from the Arvados API. It adds newly
171 // queued containers, and updates the state of previously queued
173 func (cq *Queue) Update() error {
175 cq.dontupdate = map[string]struct{}{}
176 updateStarted := time.Now()
179 next, err := cq.poll()
185 defer cq.mtx.Unlock()
186 for uuid, ctr := range next {
187 if _, keep := cq.dontupdate[uuid]; keep {
190 if cur, ok := cq.current[uuid]; !ok {
191 cq.addEnt(uuid, *ctr)
194 cq.current[uuid] = cur
197 for uuid := range cq.current {
198 if _, keep := cq.dontupdate[uuid]; keep {
200 } else if _, keep = next[uuid]; keep {
203 delete(cq.current, uuid)
207 cq.updated = updateStarted
212 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
213 it, err := cq.chooseType(&ctr)
214 if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
215 // We assume here that any chooseType error is a hard
216 // error: it wouldn't help to try again, or to leave
217 // it for a different dispatcher process to attempt.
218 errorString := err.Error()
219 cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
226 // On failure, check current container
227 // state, and don't log the error if
228 // the failure came from losing a
230 var latest arvados.Container
231 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
232 if latest.State == arvados.ContainerStateCancelled {
235 cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
237 if ctr.State == arvados.ContainerStateQueued {
238 err = cq.Lock(ctr.UUID)
243 err = cq.setRuntimeError(ctr.UUID, errorString)
247 err = cq.Cancel(ctr.UUID)
254 cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
257 // Lock acquires the dispatch lock for the given container.
258 func (cq *Queue) Lock(uuid string) error {
259 return cq.apiUpdate(uuid, "lock")
262 // Unlock releases the dispatch lock for the given container.
263 func (cq *Queue) Unlock(uuid string) error {
264 return cq.apiUpdate(uuid, "unlock")
267 // setRuntimeError sets runtime_status["error"] to the given value.
268 // Container should already have state==Locked or Running.
269 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
270 return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
273 "error": errorString,
279 // Cancel cancels the given container.
280 func (cq *Queue) Cancel(uuid string) error {
281 err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
282 "container": {"state": arvados.ContainerStateCancelled},
288 defer cq.mtx.Unlock()
293 func (cq *Queue) apiUpdate(uuid, action string) error {
294 var resp arvados.Container
295 err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
301 defer cq.mtx.Unlock()
302 if cq.dontupdate != nil {
303 cq.dontupdate[uuid] = struct{}{}
305 if ent, ok := cq.current[uuid]; !ok {
306 cq.addEnt(uuid, resp)
308 ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
309 cq.current[uuid] = ent
315 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
317 size := len(cq.current)
322 auth = &arvados.APIClientAuthorization{}
323 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
332 next := make(map[string]*arvados.Container, size)
333 apply := func(updates []arvados.Container) {
334 for _, upd := range updates {
335 if next[upd.UUID] == nil {
336 next[upd.UUID] = &arvados.Container{}
338 *next[upd.UUID] = upd
341 selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
344 mine, err := cq.fetchAll(arvados.ResourceListParams{
349 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
356 avail, err := cq.fetchAll(arvados.ResourceListParams{
361 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
370 for uuid, ent := range cq.current {
371 if next[uuid] == nil &&
372 ent.Container.State != arvados.ContainerStateCancelled &&
373 ent.Container.State != arvados.ContainerStateComplete {
374 missing = append(missing, uuid)
379 for i, page := 0, 20; i < len(missing); i += page {
381 if len(batch) > page {
384 ended, err := cq.fetchAll(arvados.ResourceListParams{
388 Filters: []arvados.Filter{{"uuid", "in", batch}},
398 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
399 var results []arvados.Container
400 params := initialParams
403 // This list variable must be a new one declared
404 // inside the loop: otherwise, items in the API
405 // response would get deep-merged into the items
406 // loaded in previous iterations.
407 var list arvados.ContainerList
409 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
413 if len(list.Items) == 0 {
417 results = append(results, list.Items...)
418 if len(params.Order) == 1 && params.Order == "uuid" {
419 params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
421 params.Offset += len(list.Items)