17995: Fix merge error.
[arvados.git] / lib / dispatchcloud / container / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "errors"
9         "io"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
18 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
19
20 // An APIClient performs Arvados API requests. It is typically an
21 // *arvados.Client.
22 type APIClient interface {
23         RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
24 }
25
26 // A QueueEnt is an entry in the queue, consisting of a container
27 // record and the instance type that should be used to run it.
28 type QueueEnt struct {
29         // The container to run. Only the UUID, State, Priority,
30         // RuntimeConstraints, Mounts, and ContainerImage fields are
31         // populated.
32         Container    arvados.Container    `json:"container"`
33         InstanceType arvados.InstanceType `json:"instance_type"`
34 }
35
36 // String implements fmt.Stringer by returning the queued container's
37 // UUID.
38 func (c *QueueEnt) String() string {
39         return c.Container.UUID
40 }
41
42 // A Queue is an interface to an Arvados cluster's container
43 // database. It presents only the containers that are eligible to be
44 // run by, are already being run by, or have recently been run by the
45 // present dispatcher.
46 //
47 // The Entries, Get, and Forget methods do not block: they return
48 // immediately, using cached data.
49 //
50 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
51 // return only after the operation has completed.
52 //
53 // A Queue's Update method should be called periodically to keep the
54 // cache up to date.
55 type Queue struct {
56         logger     logrus.FieldLogger
57         chooseType typeChooser
58         client     APIClient
59
60         auth    *arvados.APIClientAuthorization
61         current map[string]QueueEnt
62         updated time.Time
63         mtx     sync.Mutex
64
65         // Methods that modify the Queue (like Lock) add the affected
66         // container UUIDs to dontupdate. When applying a batch of
67         // updates received from the network, anything appearing in
68         // dontupdate is skipped, in case the received update has
69         // already been superseded by the locally initiated change.
70         // When no network update is in progress, this protection is
71         // not needed, and dontupdate is nil.
72         dontupdate map[string]struct{}
73
74         // active notification subscribers (see Subscribe)
75         subscribers map[<-chan struct{}]chan struct{}
76 }
77
78 // NewQueue returns a new Queue. When a new container appears in the
79 // Arvados cluster's queue during Update, chooseType will be called to
80 // assign an appropriate arvados.InstanceType for the queue entry.
81 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
82         cq := &Queue{
83                 logger:      logger,
84                 chooseType:  chooseType,
85                 client:      client,
86                 current:     map[string]QueueEnt{},
87                 subscribers: map[<-chan struct{}]chan struct{}{},
88         }
89         if reg != nil {
90                 go cq.runMetrics(reg)
91         }
92         return cq
93 }
94
95 // Subscribe returns a channel that becomes ready to receive when an
96 // entry in the Queue is updated.
97 //
98 //      ch := q.Subscribe()
99 //      defer q.Unsubscribe(ch)
100 //      for range ch {
101 //              // ...
102 //      }
103 func (cq *Queue) Subscribe() <-chan struct{} {
104         cq.mtx.Lock()
105         defer cq.mtx.Unlock()
106         ch := make(chan struct{}, 1)
107         cq.subscribers[ch] = ch
108         return ch
109 }
110
111 // Unsubscribe stops sending updates to the given channel. See
112 // Subscribe.
113 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
114         cq.mtx.Lock()
115         defer cq.mtx.Unlock()
116         delete(cq.subscribers, ch)
117 }
118
119 // Caller must have lock.
120 func (cq *Queue) notify() {
121         for _, ch := range cq.subscribers {
122                 select {
123                 case ch <- struct{}{}:
124                 default:
125                 }
126         }
127 }
128
129 // Forget drops the specified container from the cache. It should be
130 // called on finalized containers to avoid leaking memory over
131 // time. It is a no-op if the indicated container is not in a
132 // finalized state.
133 func (cq *Queue) Forget(uuid string) {
134         cq.mtx.Lock()
135         defer cq.mtx.Unlock()
136         ctr := cq.current[uuid].Container
137         if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
138                 cq.delEnt(uuid, ctr.State)
139         }
140 }
141
142 // Get returns the (partial) Container record for the specified
143 // container. Like a map lookup, its second return value is false if
144 // the specified container is not in the Queue.
145 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
146         cq.mtx.Lock()
147         defer cq.mtx.Unlock()
148         ctr, ok := cq.current[uuid]
149         if !ok {
150                 return arvados.Container{}, false
151         }
152         return ctr.Container, true
153 }
154
155 // Entries returns all cache entries, keyed by container UUID.
156 //
157 // The returned threshold indicates the maximum age of any cached data
158 // returned in the map. This makes it possible for a scheduler to
159 // determine correctly the outcome of a remote process that updates
160 // container state. It must first wait for the remote process to exit,
161 // then wait for the Queue to start and finish its next Update --
162 // i.e., it must wait until threshold > timeProcessExited.
163 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
164         cq.mtx.Lock()
165         defer cq.mtx.Unlock()
166         entries = make(map[string]QueueEnt, len(cq.current))
167         for uuid, ctr := range cq.current {
168                 entries[uuid] = ctr
169         }
170         threshold = cq.updated
171         return
172 }
173
174 // Update refreshes the cache from the Arvados API. It adds newly
175 // queued containers, and updates the state of previously queued
176 // containers.
177 func (cq *Queue) Update() error {
178         cq.mtx.Lock()
179         cq.dontupdate = map[string]struct{}{}
180         updateStarted := time.Now()
181         cq.mtx.Unlock()
182
183         next, err := cq.poll()
184         if err != nil {
185                 return err
186         }
187
188         cq.mtx.Lock()
189         defer cq.mtx.Unlock()
190         for uuid, ctr := range next {
191                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
192                         // Don't clobber a local update that happened
193                         // after we started polling.
194                         continue
195                 }
196                 if cur, ok := cq.current[uuid]; !ok {
197                         cq.addEnt(uuid, *ctr)
198                 } else {
199                         cur.Container = *ctr
200                         cq.current[uuid] = cur
201                 }
202         }
203         for uuid, ent := range cq.current {
204                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
205                         // Don't expunge an entry that was
206                         // added/updated locally after we started
207                         // polling.
208                         continue
209                 } else if _, stillpresent := next[uuid]; !stillpresent {
210                         // Expunge an entry that no longer appears in
211                         // the poll response (evidently it's
212                         // cancelled, completed, deleted, or taken by
213                         // a different dispatcher).
214                         cq.delEnt(uuid, ent.Container.State)
215                 }
216         }
217         cq.dontupdate = nil
218         cq.updated = updateStarted
219         cq.notify()
220         return nil
221 }
222
223 // Caller must have lock.
224 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
225         cq.logger.WithFields(logrus.Fields{
226                 "ContainerUUID": uuid,
227                 "State":         state,
228         }).Info("dropping container from queue")
229         delete(cq.current, uuid)
230 }
231
232 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
233         it, err := cq.chooseType(&ctr)
234         if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
235                 // We assume here that any chooseType error is a hard
236                 // error: it wouldn't help to try again, or to leave
237                 // it for a different dispatcher process to attempt.
238                 errorString := err.Error()
239                 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
240                 logger.WithError(err).Warn("cancel container with no suitable instance type")
241                 go func() {
242                         if ctr.State == arvados.ContainerStateQueued {
243                                 // Can't set runtime error without
244                                 // locking first.
245                                 err := cq.Lock(ctr.UUID)
246                                 if err != nil {
247                                         logger.WithError(err).Warn("lock failed")
248                                         return
249                                         // ...and try again on the
250                                         // next Update, if the problem
251                                         // still exists.
252                                 }
253                         }
254                         var err error
255                         defer func() {
256                                 if err == nil {
257                                         return
258                                 }
259                                 // On failure, check current container
260                                 // state, and don't log the error if
261                                 // the failure came from losing a
262                                 // race.
263                                 var latest arvados.Container
264                                 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
265                                 if latest.State == arvados.ContainerStateCancelled {
266                                         return
267                                 }
268                                 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
269                         }()
270                         err = cq.setRuntimeError(ctr.UUID, errorString)
271                         if err != nil {
272                                 return
273                         }
274                         err = cq.Cancel(ctr.UUID)
275                         if err != nil {
276                                 return
277                         }
278                 }()
279                 return
280         }
281         cq.logger.WithFields(logrus.Fields{
282                 "ContainerUUID": ctr.UUID,
283                 "State":         ctr.State,
284                 "Priority":      ctr.Priority,
285                 "InstanceType":  it.Name,
286         }).Info("adding container to queue")
287         cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
288 }
289
290 // Lock acquires the dispatch lock for the given container.
291 func (cq *Queue) Lock(uuid string) error {
292         return cq.apiUpdate(uuid, "lock")
293 }
294
295 // Unlock releases the dispatch lock for the given container.
296 func (cq *Queue) Unlock(uuid string) error {
297         return cq.apiUpdate(uuid, "unlock")
298 }
299
300 // setRuntimeError sets runtime_status["error"] to the given value.
301 // Container should already have state==Locked or Running.
302 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
303         return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
304                 "container": {
305                         "runtime_status": {
306                                 "error": errorString,
307                         },
308                 },
309         })
310 }
311
312 // Cancel cancels the given container.
313 func (cq *Queue) Cancel(uuid string) error {
314         var resp arvados.Container
315         err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
316                 "container": {"state": arvados.ContainerStateCancelled},
317         })
318         if err != nil {
319                 return err
320         }
321         cq.updateWithResp(uuid, resp)
322         return nil
323 }
324
325 func (cq *Queue) apiUpdate(uuid, action string) error {
326         var resp arvados.Container
327         err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
328         if err != nil {
329                 return err
330         }
331         cq.updateWithResp(uuid, resp)
332         return nil
333 }
334
335 // Update the local queue with the response received from a
336 // state-changing API request (lock/unlock/cancel).
337 func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
338         cq.mtx.Lock()
339         defer cq.mtx.Unlock()
340         if cq.dontupdate != nil {
341                 cq.dontupdate[uuid] = struct{}{}
342         }
343         ent, ok := cq.current[uuid]
344         if !ok {
345                 // Container is not in queue (e.g., it was not added
346                 // because there is no suitable instance type, and
347                 // we're just locking/updating it in order to set an
348                 // error message). No need to add it, and we don't
349                 // necessarily have enough information to add it here
350                 // anyway because lock/unlock responses don't include
351                 // runtime_constraints.
352                 return
353         }
354         ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
355         cq.current[uuid] = ent
356         cq.notify()
357 }
358
359 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
360         cq.mtx.Lock()
361         size := len(cq.current)
362         auth := cq.auth
363         cq.mtx.Unlock()
364
365         if auth == nil {
366                 auth = &arvados.APIClientAuthorization{}
367                 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
368                 if err != nil {
369                         return nil, err
370                 }
371                 cq.mtx.Lock()
372                 cq.auth = auth
373                 cq.mtx.Unlock()
374         }
375
376         next := make(map[string]*arvados.Container, size)
377         apply := func(updates []arvados.Container) {
378                 for _, upd := range updates {
379                         if next[upd.UUID] == nil {
380                                 next[upd.UUID] = &arvados.Container{}
381                         }
382                         *next[upd.UUID] = upd
383                 }
384         }
385         selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
386         limitParam := 1000
387
388         mine, err := cq.fetchAll(arvados.ResourceListParams{
389                 Select:  selectParam,
390                 Order:   "uuid",
391                 Limit:   &limitParam,
392                 Count:   "none",
393                 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
394         })
395         if err != nil {
396                 return nil, err
397         }
398         apply(mine)
399
400         avail, err := cq.fetchAll(arvados.ResourceListParams{
401                 Select:  selectParam,
402                 Order:   "uuid",
403                 Limit:   &limitParam,
404                 Count:   "none",
405                 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
406         })
407         if err != nil {
408                 return nil, err
409         }
410         apply(avail)
411
412         missing := map[string]bool{}
413         cq.mtx.Lock()
414         for uuid, ent := range cq.current {
415                 if next[uuid] == nil &&
416                         ent.Container.State != arvados.ContainerStateCancelled &&
417                         ent.Container.State != arvados.ContainerStateComplete {
418                         missing[uuid] = true
419                 }
420         }
421         cq.mtx.Unlock()
422
423         for len(missing) > 0 {
424                 var batch []string
425                 for uuid := range missing {
426                         batch = append(batch, uuid)
427                         if len(batch) == 20 {
428                                 break
429                         }
430                 }
431                 filters := []arvados.Filter{{"uuid", "in", batch}}
432                 ended, err := cq.fetchAll(arvados.ResourceListParams{
433                         Select:  selectParam,
434                         Order:   "uuid",
435                         Count:   "none",
436                         Filters: filters,
437                 })
438                 if err != nil {
439                         return nil, err
440                 }
441                 apply(ended)
442                 if len(ended) == 0 {
443                         // This is the only case where we can conclude
444                         // a container has been deleted from the
445                         // database. A short (but non-zero) page, on
446                         // the other hand, can be caused by a response
447                         // size limit.
448                         for _, uuid := range batch {
449                                 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
450                                 delete(missing, uuid)
451                                 cq.mtx.Lock()
452                                 cq.delEnt(uuid, cq.current[uuid].Container.State)
453                                 cq.mtx.Unlock()
454                         }
455                         continue
456                 }
457                 for _, ctr := range ended {
458                         if _, ok := missing[ctr.UUID]; !ok {
459                                 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
460                                 cq.logger.WithFields(logrus.Fields{
461                                         "ContainerUUID": ctr.UUID,
462                                         "Filters":       filters,
463                                 }).Error(msg)
464                                 return nil, errors.New(msg)
465                         }
466                         delete(missing, ctr.UUID)
467                 }
468         }
469         return next, nil
470 }
471
472 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
473         var results []arvados.Container
474         params := initialParams
475         params.Offset = 0
476         for {
477                 // This list variable must be a new one declared
478                 // inside the loop: otherwise, items in the API
479                 // response would get deep-merged into the items
480                 // loaded in previous iterations.
481                 var list arvados.ContainerList
482
483                 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
484                 if err != nil {
485                         return nil, err
486                 }
487                 if len(list.Items) == 0 {
488                         break
489                 }
490
491                 results = append(results, list.Items...)
492                 if len(params.Order) == 1 && params.Order == "uuid" {
493                         params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
494                 } else {
495                         params.Offset += len(list.Items)
496                 }
497         }
498         return results, nil
499 }
500
501 func (cq *Queue) runMetrics(reg *prometheus.Registry) {
502         mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
503                 Namespace: "arvados",
504                 Subsystem: "dispatchcloud",
505                 Name:      "queue_entries",
506                 Help:      "Number of active container entries in the controller database.",
507         }, []string{"state", "instance_type"})
508         reg.MustRegister(mEntries)
509
510         type entKey struct {
511                 state arvados.ContainerState
512                 inst  string
513         }
514         count := map[entKey]int{}
515
516         ch := cq.Subscribe()
517         defer cq.Unsubscribe(ch)
518         for range ch {
519                 for k := range count {
520                         count[k] = 0
521                 }
522                 ents, _ := cq.Entries()
523                 for _, ent := range ents {
524                         count[entKey{ent.Container.State, ent.InstanceType.Name}]++
525                 }
526                 for k, v := range count {
527                         mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
528                 }
529         }
530 }