Bump loofah from 2.2.3 to 2.3.1 in /apps/workbench
[arvados.git] / lib / dispatchcloud / container / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "errors"
9         "io"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
18 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
19
20 // An APIClient performs Arvados API requests. It is typically an
21 // *arvados.Client.
22 type APIClient interface {
23         RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
24 }
25
26 // A QueueEnt is an entry in the queue, consisting of a container
27 // record and the instance type that should be used to run it.
28 type QueueEnt struct {
29         // The container to run. Only the UUID, State, Priority, and
30         // RuntimeConstraints fields are populated.
31         Container    arvados.Container    `json:"container"`
32         InstanceType arvados.InstanceType `json:"instance_type"`
33 }
34
35 // String implements fmt.Stringer by returning the queued container's
36 // UUID.
37 func (c *QueueEnt) String() string {
38         return c.Container.UUID
39 }
40
41 // A Queue is an interface to an Arvados cluster's container
42 // database. It presents only the containers that are eligible to be
43 // run by, are already being run by, or have recently been run by the
44 // present dispatcher.
45 //
46 // The Entries, Get, and Forget methods do not block: they return
47 // immediately, using cached data.
48 //
49 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
50 // return only after the operation has completed.
51 //
52 // A Queue's Update method should be called periodically to keep the
53 // cache up to date.
54 type Queue struct {
55         logger     logrus.FieldLogger
56         chooseType typeChooser
57         client     APIClient
58
59         auth    *arvados.APIClientAuthorization
60         current map[string]QueueEnt
61         updated time.Time
62         mtx     sync.Mutex
63
64         // Methods that modify the Queue (like Lock) add the affected
65         // container UUIDs to dontupdate. When applying a batch of
66         // updates received from the network, anything appearing in
67         // dontupdate is skipped, in case the received update has
68         // already been superseded by the locally initiated change.
69         // When no network update is in progress, this protection is
70         // not needed, and dontupdate is nil.
71         dontupdate map[string]struct{}
72
73         // active notification subscribers (see Subscribe)
74         subscribers map[<-chan struct{}]chan struct{}
75 }
76
77 // NewQueue returns a new Queue. When a new container appears in the
78 // Arvados cluster's queue during Update, chooseType will be called to
79 // assign an appropriate arvados.InstanceType for the queue entry.
80 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
81         cq := &Queue{
82                 logger:      logger,
83                 chooseType:  chooseType,
84                 client:      client,
85                 current:     map[string]QueueEnt{},
86                 subscribers: map[<-chan struct{}]chan struct{}{},
87         }
88         if reg != nil {
89                 go cq.runMetrics(reg)
90         }
91         return cq
92 }
93
94 // Subscribe returns a channel that becomes ready to receive when an
95 // entry in the Queue is updated.
96 //
97 //      ch := q.Subscribe()
98 //      defer q.Unsubscribe(ch)
99 //      for range ch {
100 //              // ...
101 //      }
102 func (cq *Queue) Subscribe() <-chan struct{} {
103         cq.mtx.Lock()
104         defer cq.mtx.Unlock()
105         ch := make(chan struct{}, 1)
106         cq.subscribers[ch] = ch
107         return ch
108 }
109
110 // Unsubscribe stops sending updates to the given channel. See
111 // Subscribe.
112 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
113         cq.mtx.Lock()
114         defer cq.mtx.Unlock()
115         delete(cq.subscribers, ch)
116 }
117
118 // Caller must have lock.
119 func (cq *Queue) notify() {
120         for _, ch := range cq.subscribers {
121                 select {
122                 case ch <- struct{}{}:
123                 default:
124                 }
125         }
126 }
127
128 // Forget drops the specified container from the cache. It should be
129 // called on finalized containers to avoid leaking memory over
130 // time. It is a no-op if the indicated container is not in a
131 // finalized state.
132 func (cq *Queue) Forget(uuid string) {
133         cq.mtx.Lock()
134         defer cq.mtx.Unlock()
135         ctr := cq.current[uuid].Container
136         if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
137                 cq.delEnt(uuid, ctr.State)
138         }
139 }
140
141 // Get returns the (partial) Container record for the specified
142 // container. Like a map lookup, its second return value is false if
143 // the specified container is not in the Queue.
144 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
145         cq.mtx.Lock()
146         defer cq.mtx.Unlock()
147         if ctr, ok := cq.current[uuid]; !ok {
148                 return arvados.Container{}, false
149         } else {
150                 return ctr.Container, true
151         }
152 }
153
154 // Entries returns all cache entries, keyed by container UUID.
155 //
156 // The returned threshold indicates the maximum age of any cached data
157 // returned in the map. This makes it possible for a scheduler to
158 // determine correctly the outcome of a remote process that updates
159 // container state. It must first wait for the remote process to exit,
160 // then wait for the Queue to start and finish its next Update --
161 // i.e., it must wait until threshold > timeProcessExited.
162 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
163         cq.mtx.Lock()
164         defer cq.mtx.Unlock()
165         entries = make(map[string]QueueEnt, len(cq.current))
166         for uuid, ctr := range cq.current {
167                 entries[uuid] = ctr
168         }
169         threshold = cq.updated
170         return
171 }
172
173 // Update refreshes the cache from the Arvados API. It adds newly
174 // queued containers, and updates the state of previously queued
175 // containers.
176 func (cq *Queue) Update() error {
177         cq.mtx.Lock()
178         cq.dontupdate = map[string]struct{}{}
179         updateStarted := time.Now()
180         cq.mtx.Unlock()
181
182         next, err := cq.poll()
183         if err != nil {
184                 return err
185         }
186
187         cq.mtx.Lock()
188         defer cq.mtx.Unlock()
189         for uuid, ctr := range next {
190                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
191                         // Don't clobber a local update that happened
192                         // after we started polling.
193                         continue
194                 }
195                 if cur, ok := cq.current[uuid]; !ok {
196                         cq.addEnt(uuid, *ctr)
197                 } else {
198                         cur.Container = *ctr
199                         cq.current[uuid] = cur
200                 }
201         }
202         for uuid, ent := range cq.current {
203                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
204                         // Don't expunge an entry that was
205                         // added/updated locally after we started
206                         // polling.
207                         continue
208                 } else if _, stillpresent := next[uuid]; !stillpresent {
209                         // Expunge an entry that no longer appears in
210                         // the poll response (evidently it's
211                         // cancelled, completed, deleted, or taken by
212                         // a different dispatcher).
213                         cq.delEnt(uuid, ent.Container.State)
214                 }
215         }
216         cq.dontupdate = nil
217         cq.updated = updateStarted
218         cq.notify()
219         return nil
220 }
221
222 // Caller must have lock.
223 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
224         cq.logger.WithFields(logrus.Fields{
225                 "ContainerUUID": uuid,
226                 "State":         state,
227         }).Info("dropping container from queue")
228         delete(cq.current, uuid)
229 }
230
231 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
232         it, err := cq.chooseType(&ctr)
233         if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
234                 // We assume here that any chooseType error is a hard
235                 // error: it wouldn't help to try again, or to leave
236                 // it for a different dispatcher process to attempt.
237                 errorString := err.Error()
238                 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
239                 logger.WithError(err).Warn("cancel container with no suitable instance type")
240                 go func() {
241                         if ctr.State == arvados.ContainerStateQueued {
242                                 // Can't set runtime error without
243                                 // locking first. If Lock() is
244                                 // successful, it will call addEnt()
245                                 // again itself, and we'll fall
246                                 // through to the
247                                 // setRuntimeError/Cancel code below.
248                                 err := cq.Lock(ctr.UUID)
249                                 if err != nil {
250                                         logger.WithError(err).Warn("lock failed")
251                                         // ...and try again on the
252                                         // next Update, if the problem
253                                         // still exists.
254                                 }
255                                 return
256                         }
257                         var err error
258                         defer func() {
259                                 if err == nil {
260                                         return
261                                 }
262                                 // On failure, check current container
263                                 // state, and don't log the error if
264                                 // the failure came from losing a
265                                 // race.
266                                 var latest arvados.Container
267                                 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
268                                 if latest.State == arvados.ContainerStateCancelled {
269                                         return
270                                 }
271                                 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
272                         }()
273                         err = cq.setRuntimeError(ctr.UUID, errorString)
274                         if err != nil {
275                                 return
276                         }
277                         err = cq.Cancel(ctr.UUID)
278                         if err != nil {
279                                 return
280                         }
281                 }()
282                 return
283         }
284         cq.logger.WithFields(logrus.Fields{
285                 "ContainerUUID": ctr.UUID,
286                 "State":         ctr.State,
287                 "Priority":      ctr.Priority,
288                 "InstanceType":  it.Name,
289         }).Info("adding container to queue")
290         cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
291 }
292
293 // Lock acquires the dispatch lock for the given container.
294 func (cq *Queue) Lock(uuid string) error {
295         return cq.apiUpdate(uuid, "lock")
296 }
297
298 // Unlock releases the dispatch lock for the given container.
299 func (cq *Queue) Unlock(uuid string) error {
300         return cq.apiUpdate(uuid, "unlock")
301 }
302
303 // setRuntimeError sets runtime_status["error"] to the given value.
304 // Container should already have state==Locked or Running.
305 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
306         return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
307                 "container": {
308                         "runtime_status": {
309                                 "error": errorString,
310                         },
311                 },
312         })
313 }
314
315 // Cancel cancels the given container.
316 func (cq *Queue) Cancel(uuid string) error {
317         var resp arvados.Container
318         err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
319                 "container": {"state": arvados.ContainerStateCancelled},
320         })
321         if err != nil {
322                 return err
323         }
324         cq.updateWithResp(uuid, resp)
325         return nil
326 }
327
328 func (cq *Queue) apiUpdate(uuid, action string) error {
329         var resp arvados.Container
330         err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
331         if err != nil {
332                 return err
333         }
334         cq.updateWithResp(uuid, resp)
335         return nil
336 }
337
338 // Update the local queue with the response received from a
339 // state-changing API request (lock/unlock/cancel).
340 func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
341         cq.mtx.Lock()
342         defer cq.mtx.Unlock()
343         if cq.dontupdate != nil {
344                 cq.dontupdate[uuid] = struct{}{}
345         }
346         if ent, ok := cq.current[uuid]; !ok {
347                 cq.addEnt(uuid, resp)
348         } else {
349                 ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
350                 cq.current[uuid] = ent
351         }
352         cq.notify()
353 }
354
355 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
356         cq.mtx.Lock()
357         size := len(cq.current)
358         auth := cq.auth
359         cq.mtx.Unlock()
360
361         if auth == nil {
362                 auth = &arvados.APIClientAuthorization{}
363                 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
364                 if err != nil {
365                         return nil, err
366                 }
367                 cq.mtx.Lock()
368                 cq.auth = auth
369                 cq.mtx.Unlock()
370         }
371
372         next := make(map[string]*arvados.Container, size)
373         apply := func(updates []arvados.Container) {
374                 for _, upd := range updates {
375                         if next[upd.UUID] == nil {
376                                 next[upd.UUID] = &arvados.Container{}
377                         }
378                         *next[upd.UUID] = upd
379                 }
380         }
381         selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
382         limitParam := 1000
383
384         mine, err := cq.fetchAll(arvados.ResourceListParams{
385                 Select:  selectParam,
386                 Order:   "uuid",
387                 Limit:   &limitParam,
388                 Count:   "none",
389                 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
390         })
391         if err != nil {
392                 return nil, err
393         }
394         apply(mine)
395
396         avail, err := cq.fetchAll(arvados.ResourceListParams{
397                 Select:  selectParam,
398                 Order:   "uuid",
399                 Limit:   &limitParam,
400                 Count:   "none",
401                 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
402         })
403         if err != nil {
404                 return nil, err
405         }
406         apply(avail)
407
408         missing := map[string]bool{}
409         cq.mtx.Lock()
410         for uuid, ent := range cq.current {
411                 if next[uuid] == nil &&
412                         ent.Container.State != arvados.ContainerStateCancelled &&
413                         ent.Container.State != arvados.ContainerStateComplete {
414                         missing[uuid] = true
415                 }
416         }
417         cq.mtx.Unlock()
418
419         for len(missing) > 0 {
420                 var batch []string
421                 for uuid := range missing {
422                         batch = append(batch, uuid)
423                         if len(batch) == 20 {
424                                 break
425                         }
426                 }
427                 filters := []arvados.Filter{{"uuid", "in", batch}}
428                 ended, err := cq.fetchAll(arvados.ResourceListParams{
429                         Select:  selectParam,
430                         Order:   "uuid",
431                         Count:   "none",
432                         Filters: filters,
433                 })
434                 if err != nil {
435                         return nil, err
436                 }
437                 apply(ended)
438                 if len(ended) == 0 {
439                         // This is the only case where we can conclude
440                         // a container has been deleted from the
441                         // database. A short (but non-zero) page, on
442                         // the other hand, can be caused by a response
443                         // size limit.
444                         for _, uuid := range batch {
445                                 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
446                                 delete(missing, uuid)
447                                 cq.mtx.Lock()
448                                 cq.delEnt(uuid, cq.current[uuid].Container.State)
449                                 cq.mtx.Unlock()
450                         }
451                         continue
452                 }
453                 for _, ctr := range ended {
454                         if _, ok := missing[ctr.UUID]; !ok {
455                                 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
456                                 cq.logger.WithFields(logrus.Fields{
457                                         "ContainerUUID": ctr.UUID,
458                                         "Filters":       filters,
459                                 }).Error(msg)
460                                 return nil, errors.New(msg)
461                         }
462                         delete(missing, ctr.UUID)
463                 }
464         }
465         return next, nil
466 }
467
468 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
469         var results []arvados.Container
470         params := initialParams
471         params.Offset = 0
472         for {
473                 // This list variable must be a new one declared
474                 // inside the loop: otherwise, items in the API
475                 // response would get deep-merged into the items
476                 // loaded in previous iterations.
477                 var list arvados.ContainerList
478
479                 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
480                 if err != nil {
481                         return nil, err
482                 }
483                 if len(list.Items) == 0 {
484                         break
485                 }
486
487                 results = append(results, list.Items...)
488                 if len(params.Order) == 1 && params.Order == "uuid" {
489                         params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
490                 } else {
491                         params.Offset += len(list.Items)
492                 }
493         }
494         return results, nil
495 }
496
497 func (cq *Queue) runMetrics(reg *prometheus.Registry) {
498         mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
499                 Namespace: "arvados",
500                 Subsystem: "dispatchcloud",
501                 Name:      "queue_entries",
502                 Help:      "Number of active container entries in the controller database.",
503         }, []string{"state", "instance_type"})
504         reg.MustRegister(mEntries)
505
506         type entKey struct {
507                 state arvados.ContainerState
508                 inst  string
509         }
510         count := map[entKey]int{}
511
512         ch := cq.Subscribe()
513         defer cq.Unsubscribe(ch)
514         for range ch {
515                 for k := range count {
516                         count[k] = 0
517                 }
518                 ents, _ := cq.Entries()
519                 for _, ent := range ents {
520                         count[entKey{ent.Container.State, ent.InstanceType.Name}]++
521                 }
522                 for k, v := range count {
523                         mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
524                 }
525         }
526 }