5f6fdefbd51052289407930bbd0aa14b0ae6039e
[arvados.git] / lib / dispatchcloud / container / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "errors"
9         "io"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
18 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
19
20 // An APIClient performs Arvados API requests. It is typically an
21 // *arvados.Client.
22 type APIClient interface {
23         RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
24 }
25
26 // A QueueEnt is an entry in the queue, consisting of a container
27 // record and the instance type that should be used to run it.
28 type QueueEnt struct {
29         // The container to run. Only the UUID, State, Priority,
30         // RuntimeConstraints, Mounts, and ContainerImage fields are
31         // populated.
32         Container    arvados.Container    `json:"container"`
33         InstanceType arvados.InstanceType `json:"instance_type"`
34         FirstSeenAt  time.Time            `json:"first_seen_at"`
35 }
36
37 // String implements fmt.Stringer by returning the queued container's
38 // UUID.
39 func (c *QueueEnt) String() string {
40         return c.Container.UUID
41 }
42
43 // A Queue is an interface to an Arvados cluster's container
44 // database. It presents only the containers that are eligible to be
45 // run by, are already being run by, or have recently been run by the
46 // present dispatcher.
47 //
48 // The Entries, Get, and Forget methods do not block: they return
49 // immediately, using cached data.
50 //
51 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
52 // return only after the operation has completed.
53 //
54 // A Queue's Update method should be called periodically to keep the
55 // cache up to date.
56 type Queue struct {
57         logger     logrus.FieldLogger
58         chooseType typeChooser
59         client     APIClient
60
61         auth    *arvados.APIClientAuthorization
62         current map[string]QueueEnt
63         updated time.Time
64         mtx     sync.Mutex
65
66         // Methods that modify the Queue (like Lock) add the affected
67         // container UUIDs to dontupdate. When applying a batch of
68         // updates received from the network, anything appearing in
69         // dontupdate is skipped, in case the received update has
70         // already been superseded by the locally initiated change.
71         // When no network update is in progress, this protection is
72         // not needed, and dontupdate is nil.
73         dontupdate map[string]struct{}
74
75         // active notification subscribers (see Subscribe)
76         subscribers map[<-chan struct{}]chan struct{}
77 }
78
79 // NewQueue returns a new Queue. When a new container appears in the
80 // Arvados cluster's queue during Update, chooseType will be called to
81 // assign an appropriate arvados.InstanceType for the queue entry.
82 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
83         cq := &Queue{
84                 logger:      logger,
85                 chooseType:  chooseType,
86                 client:      client,
87                 current:     map[string]QueueEnt{},
88                 subscribers: map[<-chan struct{}]chan struct{}{},
89         }
90         if reg != nil {
91                 go cq.runMetrics(reg)
92         }
93         return cq
94 }
95
96 // Subscribe returns a channel that becomes ready to receive when an
97 // entry in the Queue is updated.
98 //
99 //      ch := q.Subscribe()
100 //      defer q.Unsubscribe(ch)
101 //      for range ch {
102 //              // ...
103 //      }
104 func (cq *Queue) Subscribe() <-chan struct{} {
105         cq.mtx.Lock()
106         defer cq.mtx.Unlock()
107         ch := make(chan struct{}, 1)
108         cq.subscribers[ch] = ch
109         return ch
110 }
111
112 // Unsubscribe stops sending updates to the given channel. See
113 // Subscribe.
114 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
115         cq.mtx.Lock()
116         defer cq.mtx.Unlock()
117         delete(cq.subscribers, ch)
118 }
119
120 // Caller must have lock.
121 func (cq *Queue) notify() {
122         for _, ch := range cq.subscribers {
123                 select {
124                 case ch <- struct{}{}:
125                 default:
126                 }
127         }
128 }
129
130 // Forget drops the specified container from the cache. It should be
131 // called on finalized containers to avoid leaking memory over
132 // time. It is a no-op if the indicated container is not in a
133 // finalized state.
134 func (cq *Queue) Forget(uuid string) {
135         cq.mtx.Lock()
136         defer cq.mtx.Unlock()
137         ctr := cq.current[uuid].Container
138         if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
139                 cq.delEnt(uuid, ctr.State)
140         }
141 }
142
143 // Get returns the (partial) Container record for the specified
144 // container. Like a map lookup, its second return value is false if
145 // the specified container is not in the Queue.
146 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
147         cq.mtx.Lock()
148         defer cq.mtx.Unlock()
149         ctr, ok := cq.current[uuid]
150         if !ok {
151                 return arvados.Container{}, false
152         }
153         return ctr.Container, true
154 }
155
156 // Entries returns all cache entries, keyed by container UUID.
157 //
158 // The returned threshold indicates the maximum age of any cached data
159 // returned in the map. This makes it possible for a scheduler to
160 // determine correctly the outcome of a remote process that updates
161 // container state. It must first wait for the remote process to exit,
162 // then wait for the Queue to start and finish its next Update --
163 // i.e., it must wait until threshold > timeProcessExited.
164 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
165         cq.mtx.Lock()
166         defer cq.mtx.Unlock()
167         entries = make(map[string]QueueEnt, len(cq.current))
168         for uuid, ctr := range cq.current {
169                 entries[uuid] = ctr
170         }
171         threshold = cq.updated
172         return
173 }
174
175 // Update refreshes the cache from the Arvados API. It adds newly
176 // queued containers, and updates the state of previously queued
177 // containers.
178 func (cq *Queue) Update() error {
179         cq.mtx.Lock()
180         cq.dontupdate = map[string]struct{}{}
181         updateStarted := time.Now()
182         cq.mtx.Unlock()
183
184         next, err := cq.poll()
185         if err != nil {
186                 return err
187         }
188
189         cq.mtx.Lock()
190         defer cq.mtx.Unlock()
191         for uuid, ctr := range next {
192                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
193                         // Don't clobber a local update that happened
194                         // after we started polling.
195                         continue
196                 }
197                 if cur, ok := cq.current[uuid]; !ok {
198                         cq.addEnt(uuid, *ctr)
199                 } else {
200                         cur.Container = *ctr
201                         cq.current[uuid] = cur
202                 }
203         }
204         for uuid, ent := range cq.current {
205                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
206                         // Don't expunge an entry that was
207                         // added/updated locally after we started
208                         // polling.
209                         continue
210                 } else if _, stillpresent := next[uuid]; !stillpresent {
211                         // Expunge an entry that no longer appears in
212                         // the poll response (evidently it's
213                         // cancelled, completed, deleted, or taken by
214                         // a different dispatcher).
215                         cq.delEnt(uuid, ent.Container.State)
216                 }
217         }
218         cq.dontupdate = nil
219         cq.updated = updateStarted
220         cq.notify()
221         return nil
222 }
223
224 // Caller must have lock.
225 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
226         cq.logger.WithFields(logrus.Fields{
227                 "ContainerUUID": uuid,
228                 "State":         state,
229         }).Info("dropping container from queue")
230         delete(cq.current, uuid)
231 }
232
233 // Caller must have lock.
234 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
235         it, err := cq.chooseType(&ctr)
236
237         // Avoid wasting memory on a large Mounts attr (we don't need
238         // it after choosing type).
239         ctr.Mounts = nil
240
241         if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
242                 // We assume here that any chooseType error is a hard
243                 // error: it wouldn't help to try again, or to leave
244                 // it for a different dispatcher process to attempt.
245                 errorString := err.Error()
246                 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
247                 logger.WithError(err).Warn("cancel container with no suitable instance type")
248                 go func() {
249                         if ctr.State == arvados.ContainerStateQueued {
250                                 // Can't set runtime error without
251                                 // locking first.
252                                 err := cq.Lock(ctr.UUID)
253                                 if err != nil {
254                                         logger.WithError(err).Warn("lock failed")
255                                         return
256                                         // ...and try again on the
257                                         // next Update, if the problem
258                                         // still exists.
259                                 }
260                         }
261                         var err error
262                         defer func() {
263                                 if err == nil {
264                                         return
265                                 }
266                                 // On failure, check current container
267                                 // state, and don't log the error if
268                                 // the failure came from losing a
269                                 // race.
270                                 var latest arvados.Container
271                                 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
272                                 if latest.State == arvados.ContainerStateCancelled {
273                                         return
274                                 }
275                                 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
276                         }()
277                         err = cq.setRuntimeError(ctr.UUID, errorString)
278                         if err != nil {
279                                 return
280                         }
281                         err = cq.Cancel(ctr.UUID)
282                         if err != nil {
283                                 return
284                         }
285                 }()
286                 return
287         }
288         cq.logger.WithFields(logrus.Fields{
289                 "ContainerUUID": ctr.UUID,
290                 "State":         ctr.State,
291                 "Priority":      ctr.Priority,
292                 "InstanceType":  it.Name,
293         }).Info("adding container to queue")
294         cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
295 }
296
297 // Lock acquires the dispatch lock for the given container.
298 func (cq *Queue) Lock(uuid string) error {
299         return cq.apiUpdate(uuid, "lock")
300 }
301
302 // Unlock releases the dispatch lock for the given container.
303 func (cq *Queue) Unlock(uuid string) error {
304         return cq.apiUpdate(uuid, "unlock")
305 }
306
307 // setRuntimeError sets runtime_status["error"] to the given value.
308 // Container should already have state==Locked or Running.
309 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
310         return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
311                 "container": {
312                         "runtime_status": {
313                                 "error": errorString,
314                         },
315                 },
316         })
317 }
318
319 // Cancel cancels the given container.
320 func (cq *Queue) Cancel(uuid string) error {
321         var resp arvados.Container
322         err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
323                 "container": {"state": arvados.ContainerStateCancelled},
324         })
325         if err != nil {
326                 return err
327         }
328         cq.updateWithResp(uuid, resp)
329         return nil
330 }
331
332 func (cq *Queue) apiUpdate(uuid, action string) error {
333         var resp arvados.Container
334         err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
335         if err != nil {
336                 return err
337         }
338         cq.updateWithResp(uuid, resp)
339         return nil
340 }
341
342 // Update the local queue with the response received from a
343 // state-changing API request (lock/unlock/cancel).
344 func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
345         cq.mtx.Lock()
346         defer cq.mtx.Unlock()
347         if cq.dontupdate != nil {
348                 cq.dontupdate[uuid] = struct{}{}
349         }
350         ent, ok := cq.current[uuid]
351         if !ok {
352                 // Container is not in queue (e.g., it was not added
353                 // because there is no suitable instance type, and
354                 // we're just locking/updating it in order to set an
355                 // error message). No need to add it, and we don't
356                 // necessarily have enough information to add it here
357                 // anyway because lock/unlock responses don't include
358                 // runtime_constraints.
359                 return
360         }
361         ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
362         cq.current[uuid] = ent
363         cq.notify()
364 }
365
366 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
367         cq.mtx.Lock()
368         size := len(cq.current)
369         auth := cq.auth
370         cq.mtx.Unlock()
371
372         if auth == nil {
373                 auth = &arvados.APIClientAuthorization{}
374                 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
375                 if err != nil {
376                         return nil, err
377                 }
378                 cq.mtx.Lock()
379                 cq.auth = auth
380                 cq.mtx.Unlock()
381         }
382
383         next := make(map[string]*arvados.Container, size)
384         apply := func(updates []arvados.Container) {
385                 for _, upd := range updates {
386                         if next[upd.UUID] == nil {
387                                 next[upd.UUID] = &arvados.Container{}
388                         }
389                         *next[upd.UUID] = upd
390                 }
391         }
392         selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
393         limitParam := 1000
394
395         mine, err := cq.fetchAll(arvados.ResourceListParams{
396                 Select:  selectParam,
397                 Order:   "uuid",
398                 Limit:   &limitParam,
399                 Count:   "none",
400                 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
401         })
402         if err != nil {
403                 return nil, err
404         }
405         apply(mine)
406
407         avail, err := cq.fetchAll(arvados.ResourceListParams{
408                 Select:  selectParam,
409                 Order:   "uuid",
410                 Limit:   &limitParam,
411                 Count:   "none",
412                 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
413         })
414         if err != nil {
415                 return nil, err
416         }
417         apply(avail)
418
419         missing := map[string]bool{}
420         cq.mtx.Lock()
421         for uuid, ent := range cq.current {
422                 if next[uuid] == nil &&
423                         ent.Container.State != arvados.ContainerStateCancelled &&
424                         ent.Container.State != arvados.ContainerStateComplete {
425                         missing[uuid] = true
426                 }
427         }
428         cq.mtx.Unlock()
429
430         for len(missing) > 0 {
431                 var batch []string
432                 for uuid := range missing {
433                         batch = append(batch, uuid)
434                         if len(batch) == 20 {
435                                 break
436                         }
437                 }
438                 filters := []arvados.Filter{{"uuid", "in", batch}}
439                 ended, err := cq.fetchAll(arvados.ResourceListParams{
440                         Select:  selectParam,
441                         Order:   "uuid",
442                         Count:   "none",
443                         Filters: filters,
444                 })
445                 if err != nil {
446                         return nil, err
447                 }
448                 apply(ended)
449                 if len(ended) == 0 {
450                         // This is the only case where we can conclude
451                         // a container has been deleted from the
452                         // database. A short (but non-zero) page, on
453                         // the other hand, can be caused by a response
454                         // size limit.
455                         for _, uuid := range batch {
456                                 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
457                                 delete(missing, uuid)
458                                 cq.mtx.Lock()
459                                 cq.delEnt(uuid, cq.current[uuid].Container.State)
460                                 cq.mtx.Unlock()
461                         }
462                         continue
463                 }
464                 for _, ctr := range ended {
465                         if _, ok := missing[ctr.UUID]; !ok {
466                                 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
467                                 cq.logger.WithFields(logrus.Fields{
468                                         "ContainerUUID": ctr.UUID,
469                                         "Filters":       filters,
470                                 }).Error(msg)
471                                 return nil, errors.New(msg)
472                         }
473                         delete(missing, ctr.UUID)
474                 }
475         }
476         return next, nil
477 }
478
479 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
480         var results []arvados.Container
481         params := initialParams
482         params.Offset = 0
483         for {
484                 // This list variable must be a new one declared
485                 // inside the loop: otherwise, items in the API
486                 // response would get deep-merged into the items
487                 // loaded in previous iterations.
488                 var list arvados.ContainerList
489
490                 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
491                 if err != nil {
492                         return nil, err
493                 }
494                 if len(list.Items) == 0 {
495                         break
496                 }
497
498                 results = append(results, list.Items...)
499                 if len(params.Order) == 1 && params.Order == "uuid" {
500                         params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
501                 } else {
502                         params.Offset += len(list.Items)
503                 }
504         }
505         return results, nil
506 }
507
508 func (cq *Queue) runMetrics(reg *prometheus.Registry) {
509         mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
510                 Namespace: "arvados",
511                 Subsystem: "dispatchcloud",
512                 Name:      "queue_entries",
513                 Help:      "Number of active container entries in the controller database.",
514         }, []string{"state", "instance_type"})
515         reg.MustRegister(mEntries)
516
517         type entKey struct {
518                 state arvados.ContainerState
519                 inst  string
520         }
521         count := map[entKey]int{}
522
523         ch := cq.Subscribe()
524         defer cq.Unsubscribe(ch)
525         for range ch {
526                 for k := range count {
527                         count[k] = 0
528                 }
529                 ents, _ := cq.Entries()
530                 for _, ent := range ents {
531                         count[entKey{ent.Container.State, ent.InstanceType.Name}]++
532                 }
533                 for k, v := range count {
534                         mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
535                 }
536         }
537 }