14807: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / container / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "io"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "github.com/prometheus/client_golang/prometheus"
14         "github.com/sirupsen/logrus"
15 )
16
17 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
18
19 // An APIClient performs Arvados API requests. It is typically an
20 // *arvados.Client.
21 type APIClient interface {
22         RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
23 }
24
25 // A QueueEnt is an entry in the queue, consisting of a container
26 // record and the instance type that should be used to run it.
27 type QueueEnt struct {
28         // The container to run. Only the UUID, State, Priority, and
29         // RuntimeConstraints fields are populated.
30         Container    arvados.Container    `json:"container"`
31         InstanceType arvados.InstanceType `json:"instance_type"`
32 }
33
34 // String implements fmt.Stringer by returning the queued container's
35 // UUID.
36 func (c *QueueEnt) String() string {
37         return c.Container.UUID
38 }
39
40 // A Queue is an interface to an Arvados cluster's container
41 // database. It presents only the containers that are eligible to be
42 // run by, are already being run by, or have recently been run by the
43 // present dispatcher.
44 //
45 // The Entries, Get, and Forget methods do not block: they return
46 // immediately, using cached data.
47 //
48 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
49 // return only after the operation has completed.
50 //
51 // A Queue's Update method should be called periodically to keep the
52 // cache up to date.
53 type Queue struct {
54         logger     logrus.FieldLogger
55         reg        *prometheus.Registry
56         chooseType typeChooser
57         client     APIClient
58
59         auth    *arvados.APIClientAuthorization
60         current map[string]QueueEnt
61         updated time.Time
62         mtx     sync.Mutex
63
64         // Methods that modify the Queue (like Lock) add the affected
65         // container UUIDs to dontupdate. When applying a batch of
66         // updates received from the network, anything appearing in
67         // dontupdate is skipped, in case the received update has
68         // already been superseded by the locally initiated change.
69         // When no network update is in progress, this protection is
70         // not needed, and dontupdate is nil.
71         dontupdate map[string]struct{}
72
73         // active notification subscribers (see Subscribe)
74         subscribers map[<-chan struct{}]chan struct{}
75 }
76
77 // NewQueue returns a new Queue. When a new container appears in the
78 // Arvados cluster's queue during Update, chooseType will be called to
79 // assign an appropriate arvados.InstanceType for the queue entry.
80 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
81         return &Queue{
82                 logger:      logger,
83                 reg:         reg,
84                 chooseType:  chooseType,
85                 client:      client,
86                 current:     map[string]QueueEnt{},
87                 subscribers: map[<-chan struct{}]chan struct{}{},
88         }
89 }
90
91 // Subscribe returns a channel that becomes ready to receive when an
92 // entry in the Queue is updated.
93 //
94 //      ch := q.Subscribe()
95 //      defer q.Unsubscribe(ch)
96 //      for range ch {
97 //              // ...
98 //      }
99 func (cq *Queue) Subscribe() <-chan struct{} {
100         cq.mtx.Lock()
101         defer cq.mtx.Unlock()
102         ch := make(chan struct{}, 1)
103         cq.subscribers[ch] = ch
104         return ch
105 }
106
107 // Unsubscribe stops sending updates to the given channel. See
108 // Subscribe.
109 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
110         cq.mtx.Lock()
111         defer cq.mtx.Unlock()
112         delete(cq.subscribers, ch)
113 }
114
115 // Caller must have lock.
116 func (cq *Queue) notify() {
117         for _, ch := range cq.subscribers {
118                 select {
119                 case ch <- struct{}{}:
120                 default:
121                 }
122         }
123 }
124
125 // Forget drops the specified container from the cache. It should be
126 // called on finalized containers to avoid leaking memory over
127 // time. It is a no-op if the indicated container is not in a
128 // finalized state.
129 func (cq *Queue) Forget(uuid string) {
130         cq.mtx.Lock()
131         defer cq.mtx.Unlock()
132         ctr := cq.current[uuid].Container
133         if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
134                 cq.delEnt(uuid, ctr.State)
135         }
136 }
137
138 // Get returns the (partial) Container record for the specified
139 // container. Like a map lookup, its second return value is false if
140 // the specified container is not in the Queue.
141 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
142         cq.mtx.Lock()
143         defer cq.mtx.Unlock()
144         if ctr, ok := cq.current[uuid]; !ok {
145                 return arvados.Container{}, false
146         } else {
147                 return ctr.Container, true
148         }
149 }
150
151 // Entries returns all cache entries, keyed by container UUID.
152 //
153 // The returned threshold indicates the maximum age of any cached data
154 // returned in the map. This makes it possible for a scheduler to
155 // determine correctly the outcome of a remote process that updates
156 // container state. It must first wait for the remote process to exit,
157 // then wait for the Queue to start and finish its next Update --
158 // i.e., it must wait until threshold > timeProcessExited.
159 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
160         cq.mtx.Lock()
161         defer cq.mtx.Unlock()
162         entries = make(map[string]QueueEnt, len(cq.current))
163         for uuid, ctr := range cq.current {
164                 entries[uuid] = ctr
165         }
166         threshold = cq.updated
167         return
168 }
169
170 // Update refreshes the cache from the Arvados API. It adds newly
171 // queued containers, and updates the state of previously queued
172 // containers.
173 func (cq *Queue) Update() error {
174         cq.mtx.Lock()
175         cq.dontupdate = map[string]struct{}{}
176         updateStarted := time.Now()
177         cq.mtx.Unlock()
178
179         next, err := cq.poll()
180         if err != nil {
181                 return err
182         }
183
184         cq.mtx.Lock()
185         defer cq.mtx.Unlock()
186         for uuid, ctr := range next {
187                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
188                         // Don't clobber a local update that happened
189                         // after we started polling.
190                         continue
191                 }
192                 if cur, ok := cq.current[uuid]; !ok {
193                         cq.addEnt(uuid, *ctr)
194                 } else {
195                         cur.Container = *ctr
196                         cq.current[uuid] = cur
197                 }
198         }
199         for uuid, ent := range cq.current {
200                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
201                         // Don't expunge an entry that was
202                         // added/updated locally after we started
203                         // polling.
204                         continue
205                 } else if _, stillpresent := next[uuid]; !stillpresent {
206                         // Expunge an entry that no longer appears in
207                         // the poll response (evidently it's
208                         // cancelled, completed, deleted, or taken by
209                         // a different dispatcher).
210                         cq.delEnt(uuid, ent.Container.State)
211                 }
212         }
213         cq.dontupdate = nil
214         cq.updated = updateStarted
215         cq.notify()
216         return nil
217 }
218
219 // Caller must have lock.
220 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
221         cq.logger.WithFields(logrus.Fields{
222                 "ContainerUUID": uuid,
223                 "State":         state,
224         }).Info("dropping container from queue")
225         delete(cq.current, uuid)
226 }
227
228 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
229         it, err := cq.chooseType(&ctr)
230         if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
231                 // We assume here that any chooseType error is a hard
232                 // error: it wouldn't help to try again, or to leave
233                 // it for a different dispatcher process to attempt.
234                 errorString := err.Error()
235                 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
236                 logger.WithError(err).Warn("cancel container with no suitable instance type")
237                 go func() {
238                         if ctr.State == arvados.ContainerStateQueued {
239                                 // Can't set runtime error without
240                                 // locking first. If Lock() is
241                                 // successful, it will call addEnt()
242                                 // again itself, and we'll fall
243                                 // through to the
244                                 // setRuntimeError/Cancel code below.
245                                 err := cq.Lock(ctr.UUID)
246                                 if err != nil {
247                                         logger.WithError(err).Warn("lock failed")
248                                         // ...and try again on the
249                                         // next Update, if the problem
250                                         // still exists.
251                                 }
252                                 return
253                         }
254                         var err error
255                         defer func() {
256                                 if err == nil {
257                                         return
258                                 }
259                                 // On failure, check current container
260                                 // state, and don't log the error if
261                                 // the failure came from losing a
262                                 // race.
263                                 var latest arvados.Container
264                                 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
265                                 if latest.State == arvados.ContainerStateCancelled {
266                                         return
267                                 }
268                                 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
269                         }()
270                         err = cq.setRuntimeError(ctr.UUID, errorString)
271                         if err != nil {
272                                 return
273                         }
274                         err = cq.Cancel(ctr.UUID)
275                         if err != nil {
276                                 return
277                         }
278                 }()
279                 return
280         }
281         cq.logger.WithFields(logrus.Fields{
282                 "ContainerUUID": ctr.UUID,
283                 "State":         ctr.State,
284                 "Priority":      ctr.Priority,
285                 "InstanceType":  it.Name,
286         }).Info("adding container to queue")
287         cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
288 }
289
290 // Lock acquires the dispatch lock for the given container.
291 func (cq *Queue) Lock(uuid string) error {
292         return cq.apiUpdate(uuid, "lock")
293 }
294
295 // Unlock releases the dispatch lock for the given container.
296 func (cq *Queue) Unlock(uuid string) error {
297         return cq.apiUpdate(uuid, "unlock")
298 }
299
300 // setRuntimeError sets runtime_status["error"] to the given value.
301 // Container should already have state==Locked or Running.
302 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
303         return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
304                 "container": {
305                         "runtime_status": {
306                                 "error": errorString,
307                         },
308                 },
309         })
310 }
311
312 // Cancel cancels the given container.
313 func (cq *Queue) Cancel(uuid string) error {
314         err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
315                 "container": {"state": arvados.ContainerStateCancelled},
316         })
317         if err != nil {
318                 return err
319         }
320         cq.mtx.Lock()
321         defer cq.mtx.Unlock()
322         cq.notify()
323         return nil
324 }
325
326 func (cq *Queue) apiUpdate(uuid, action string) error {
327         var resp arvados.Container
328         err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
329         if err != nil {
330                 return err
331         }
332
333         cq.mtx.Lock()
334         defer cq.mtx.Unlock()
335         if cq.dontupdate != nil {
336                 cq.dontupdate[uuid] = struct{}{}
337         }
338         if ent, ok := cq.current[uuid]; !ok {
339                 cq.addEnt(uuid, resp)
340         } else {
341                 ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
342                 cq.current[uuid] = ent
343         }
344         cq.notify()
345         return nil
346 }
347
348 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
349         cq.mtx.Lock()
350         size := len(cq.current)
351         auth := cq.auth
352         cq.mtx.Unlock()
353
354         if auth == nil {
355                 auth = &arvados.APIClientAuthorization{}
356                 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
357                 if err != nil {
358                         return nil, err
359                 }
360                 cq.mtx.Lock()
361                 cq.auth = auth
362                 cq.mtx.Unlock()
363         }
364
365         next := make(map[string]*arvados.Container, size)
366         apply := func(updates []arvados.Container) {
367                 for _, upd := range updates {
368                         if next[upd.UUID] == nil {
369                                 next[upd.UUID] = &arvados.Container{}
370                         }
371                         *next[upd.UUID] = upd
372                 }
373         }
374         selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
375         limitParam := 1000
376
377         mine, err := cq.fetchAll(arvados.ResourceListParams{
378                 Select:  selectParam,
379                 Order:   "uuid",
380                 Limit:   &limitParam,
381                 Count:   "none",
382                 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
383         })
384         if err != nil {
385                 return nil, err
386         }
387         apply(mine)
388
389         avail, err := cq.fetchAll(arvados.ResourceListParams{
390                 Select:  selectParam,
391                 Order:   "uuid",
392                 Limit:   &limitParam,
393                 Count:   "none",
394                 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
395         })
396         if err != nil {
397                 return nil, err
398         }
399         apply(avail)
400
401         var missing []string
402         cq.mtx.Lock()
403         for uuid, ent := range cq.current {
404                 if next[uuid] == nil &&
405                         ent.Container.State != arvados.ContainerStateCancelled &&
406                         ent.Container.State != arvados.ContainerStateComplete {
407                         missing = append(missing, uuid)
408                 }
409         }
410         cq.mtx.Unlock()
411
412         for i, page := 0, 20; i < len(missing); i += page {
413                 batch := missing[i:]
414                 if len(batch) > page {
415                         batch = batch[:page]
416                 }
417                 ended, err := cq.fetchAll(arvados.ResourceListParams{
418                         Select:  selectParam,
419                         Order:   "uuid",
420                         Count:   "none",
421                         Filters: []arvados.Filter{{"uuid", "in", batch}},
422                 })
423                 if err != nil {
424                         return nil, err
425                 }
426                 apply(ended)
427         }
428         return next, nil
429 }
430
431 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
432         var results []arvados.Container
433         params := initialParams
434         params.Offset = 0
435         for {
436                 // This list variable must be a new one declared
437                 // inside the loop: otherwise, items in the API
438                 // response would get deep-merged into the items
439                 // loaded in previous iterations.
440                 var list arvados.ContainerList
441
442                 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
443                 if err != nil {
444                         return nil, err
445                 }
446                 if len(list.Items) == 0 {
447                         break
448                 }
449
450                 results = append(results, list.Items...)
451                 if len(params.Order) == 1 && params.Order == "uuid" {
452                         params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
453                 } else {
454                         params.Offset += len(list.Items)
455                 }
456         }
457         return results, nil
458 }