Merge branch '14977-kill-if-not-in-queue'
[arvados.git] / lib / dispatchcloud / container / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package container
6
7 import (
8         "errors"
9         "io"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
18 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
19
20 // An APIClient performs Arvados API requests. It is typically an
21 // *arvados.Client.
22 type APIClient interface {
23         RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
24 }
25
26 // A QueueEnt is an entry in the queue, consisting of a container
27 // record and the instance type that should be used to run it.
28 type QueueEnt struct {
29         // The container to run. Only the UUID, State, Priority, and
30         // RuntimeConstraints fields are populated.
31         Container    arvados.Container    `json:"container"`
32         InstanceType arvados.InstanceType `json:"instance_type"`
33 }
34
35 // String implements fmt.Stringer by returning the queued container's
36 // UUID.
37 func (c *QueueEnt) String() string {
38         return c.Container.UUID
39 }
40
41 // A Queue is an interface to an Arvados cluster's container
42 // database. It presents only the containers that are eligible to be
43 // run by, are already being run by, or have recently been run by the
44 // present dispatcher.
45 //
46 // The Entries, Get, and Forget methods do not block: they return
47 // immediately, using cached data.
48 //
49 // The updating methods (Cancel, Lock, Unlock, Update) do block: they
50 // return only after the operation has completed.
51 //
52 // A Queue's Update method should be called periodically to keep the
53 // cache up to date.
54 type Queue struct {
55         logger     logrus.FieldLogger
56         reg        *prometheus.Registry
57         chooseType typeChooser
58         client     APIClient
59
60         auth    *arvados.APIClientAuthorization
61         current map[string]QueueEnt
62         updated time.Time
63         mtx     sync.Mutex
64
65         // Methods that modify the Queue (like Lock) add the affected
66         // container UUIDs to dontupdate. When applying a batch of
67         // updates received from the network, anything appearing in
68         // dontupdate is skipped, in case the received update has
69         // already been superseded by the locally initiated change.
70         // When no network update is in progress, this protection is
71         // not needed, and dontupdate is nil.
72         dontupdate map[string]struct{}
73
74         // active notification subscribers (see Subscribe)
75         subscribers map[<-chan struct{}]chan struct{}
76 }
77
78 // NewQueue returns a new Queue. When a new container appears in the
79 // Arvados cluster's queue during Update, chooseType will be called to
80 // assign an appropriate arvados.InstanceType for the queue entry.
81 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
82         return &Queue{
83                 logger:      logger,
84                 reg:         reg,
85                 chooseType:  chooseType,
86                 client:      client,
87                 current:     map[string]QueueEnt{},
88                 subscribers: map[<-chan struct{}]chan struct{}{},
89         }
90 }
91
92 // Subscribe returns a channel that becomes ready to receive when an
93 // entry in the Queue is updated.
94 //
95 //      ch := q.Subscribe()
96 //      defer q.Unsubscribe(ch)
97 //      for range ch {
98 //              // ...
99 //      }
100 func (cq *Queue) Subscribe() <-chan struct{} {
101         cq.mtx.Lock()
102         defer cq.mtx.Unlock()
103         ch := make(chan struct{}, 1)
104         cq.subscribers[ch] = ch
105         return ch
106 }
107
108 // Unsubscribe stops sending updates to the given channel. See
109 // Subscribe.
110 func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
111         cq.mtx.Lock()
112         defer cq.mtx.Unlock()
113         delete(cq.subscribers, ch)
114 }
115
116 // Caller must have lock.
117 func (cq *Queue) notify() {
118         for _, ch := range cq.subscribers {
119                 select {
120                 case ch <- struct{}{}:
121                 default:
122                 }
123         }
124 }
125
126 // Forget drops the specified container from the cache. It should be
127 // called on finalized containers to avoid leaking memory over
128 // time. It is a no-op if the indicated container is not in a
129 // finalized state.
130 func (cq *Queue) Forget(uuid string) {
131         cq.mtx.Lock()
132         defer cq.mtx.Unlock()
133         ctr := cq.current[uuid].Container
134         if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
135                 cq.delEnt(uuid, ctr.State)
136         }
137 }
138
139 // Get returns the (partial) Container record for the specified
140 // container. Like a map lookup, its second return value is false if
141 // the specified container is not in the Queue.
142 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
143         cq.mtx.Lock()
144         defer cq.mtx.Unlock()
145         if ctr, ok := cq.current[uuid]; !ok {
146                 return arvados.Container{}, false
147         } else {
148                 return ctr.Container, true
149         }
150 }
151
152 // Entries returns all cache entries, keyed by container UUID.
153 //
154 // The returned threshold indicates the maximum age of any cached data
155 // returned in the map. This makes it possible for a scheduler to
156 // determine correctly the outcome of a remote process that updates
157 // container state. It must first wait for the remote process to exit,
158 // then wait for the Queue to start and finish its next Update --
159 // i.e., it must wait until threshold > timeProcessExited.
160 func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
161         cq.mtx.Lock()
162         defer cq.mtx.Unlock()
163         entries = make(map[string]QueueEnt, len(cq.current))
164         for uuid, ctr := range cq.current {
165                 entries[uuid] = ctr
166         }
167         threshold = cq.updated
168         return
169 }
170
171 // Update refreshes the cache from the Arvados API. It adds newly
172 // queued containers, and updates the state of previously queued
173 // containers.
174 func (cq *Queue) Update() error {
175         cq.mtx.Lock()
176         cq.dontupdate = map[string]struct{}{}
177         updateStarted := time.Now()
178         cq.mtx.Unlock()
179
180         next, err := cq.poll()
181         if err != nil {
182                 return err
183         }
184
185         cq.mtx.Lock()
186         defer cq.mtx.Unlock()
187         for uuid, ctr := range next {
188                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
189                         // Don't clobber a local update that happened
190                         // after we started polling.
191                         continue
192                 }
193                 if cur, ok := cq.current[uuid]; !ok {
194                         cq.addEnt(uuid, *ctr)
195                 } else {
196                         cur.Container = *ctr
197                         cq.current[uuid] = cur
198                 }
199         }
200         for uuid, ent := range cq.current {
201                 if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
202                         // Don't expunge an entry that was
203                         // added/updated locally after we started
204                         // polling.
205                         continue
206                 } else if _, stillpresent := next[uuid]; !stillpresent {
207                         // Expunge an entry that no longer appears in
208                         // the poll response (evidently it's
209                         // cancelled, completed, deleted, or taken by
210                         // a different dispatcher).
211                         cq.delEnt(uuid, ent.Container.State)
212                 }
213         }
214         cq.dontupdate = nil
215         cq.updated = updateStarted
216         cq.notify()
217         return nil
218 }
219
220 // Caller must have lock.
221 func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
222         cq.logger.WithFields(logrus.Fields{
223                 "ContainerUUID": uuid,
224                 "State":         state,
225         }).Info("dropping container from queue")
226         delete(cq.current, uuid)
227 }
228
229 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
230         it, err := cq.chooseType(&ctr)
231         if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
232                 // We assume here that any chooseType error is a hard
233                 // error: it wouldn't help to try again, or to leave
234                 // it for a different dispatcher process to attempt.
235                 errorString := err.Error()
236                 logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
237                 logger.WithError(err).Warn("cancel container with no suitable instance type")
238                 go func() {
239                         if ctr.State == arvados.ContainerStateQueued {
240                                 // Can't set runtime error without
241                                 // locking first. If Lock() is
242                                 // successful, it will call addEnt()
243                                 // again itself, and we'll fall
244                                 // through to the
245                                 // setRuntimeError/Cancel code below.
246                                 err := cq.Lock(ctr.UUID)
247                                 if err != nil {
248                                         logger.WithError(err).Warn("lock failed")
249                                         // ...and try again on the
250                                         // next Update, if the problem
251                                         // still exists.
252                                 }
253                                 return
254                         }
255                         var err error
256                         defer func() {
257                                 if err == nil {
258                                         return
259                                 }
260                                 // On failure, check current container
261                                 // state, and don't log the error if
262                                 // the failure came from losing a
263                                 // race.
264                                 var latest arvados.Container
265                                 cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
266                                 if latest.State == arvados.ContainerStateCancelled {
267                                         return
268                                 }
269                                 logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
270                         }()
271                         err = cq.setRuntimeError(ctr.UUID, errorString)
272                         if err != nil {
273                                 return
274                         }
275                         err = cq.Cancel(ctr.UUID)
276                         if err != nil {
277                                 return
278                         }
279                 }()
280                 return
281         }
282         cq.logger.WithFields(logrus.Fields{
283                 "ContainerUUID": ctr.UUID,
284                 "State":         ctr.State,
285                 "Priority":      ctr.Priority,
286                 "InstanceType":  it.Name,
287         }).Info("adding container to queue")
288         cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
289 }
290
291 // Lock acquires the dispatch lock for the given container.
292 func (cq *Queue) Lock(uuid string) error {
293         return cq.apiUpdate(uuid, "lock")
294 }
295
296 // Unlock releases the dispatch lock for the given container.
297 func (cq *Queue) Unlock(uuid string) error {
298         return cq.apiUpdate(uuid, "unlock")
299 }
300
301 // setRuntimeError sets runtime_status["error"] to the given value.
302 // Container should already have state==Locked or Running.
303 func (cq *Queue) setRuntimeError(uuid, errorString string) error {
304         return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
305                 "container": {
306                         "runtime_status": {
307                                 "error": errorString,
308                         },
309                 },
310         })
311 }
312
313 // Cancel cancels the given container.
314 func (cq *Queue) Cancel(uuid string) error {
315         err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
316                 "container": {"state": arvados.ContainerStateCancelled},
317         })
318         if err != nil {
319                 return err
320         }
321         cq.mtx.Lock()
322         defer cq.mtx.Unlock()
323         cq.notify()
324         return nil
325 }
326
327 func (cq *Queue) apiUpdate(uuid, action string) error {
328         var resp arvados.Container
329         err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
330         if err != nil {
331                 return err
332         }
333
334         cq.mtx.Lock()
335         defer cq.mtx.Unlock()
336         if cq.dontupdate != nil {
337                 cq.dontupdate[uuid] = struct{}{}
338         }
339         if ent, ok := cq.current[uuid]; !ok {
340                 cq.addEnt(uuid, resp)
341         } else {
342                 ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
343                 cq.current[uuid] = ent
344         }
345         cq.notify()
346         return nil
347 }
348
349 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
350         cq.mtx.Lock()
351         size := len(cq.current)
352         auth := cq.auth
353         cq.mtx.Unlock()
354
355         if auth == nil {
356                 auth = &arvados.APIClientAuthorization{}
357                 err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
358                 if err != nil {
359                         return nil, err
360                 }
361                 cq.mtx.Lock()
362                 cq.auth = auth
363                 cq.mtx.Unlock()
364         }
365
366         next := make(map[string]*arvados.Container, size)
367         apply := func(updates []arvados.Container) {
368                 for _, upd := range updates {
369                         if next[upd.UUID] == nil {
370                                 next[upd.UUID] = &arvados.Container{}
371                         }
372                         *next[upd.UUID] = upd
373                 }
374         }
375         selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
376         limitParam := 1000
377
378         mine, err := cq.fetchAll(arvados.ResourceListParams{
379                 Select:  selectParam,
380                 Order:   "uuid",
381                 Limit:   &limitParam,
382                 Count:   "none",
383                 Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
384         })
385         if err != nil {
386                 return nil, err
387         }
388         apply(mine)
389
390         avail, err := cq.fetchAll(arvados.ResourceListParams{
391                 Select:  selectParam,
392                 Order:   "uuid",
393                 Limit:   &limitParam,
394                 Count:   "none",
395                 Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
396         })
397         if err != nil {
398                 return nil, err
399         }
400         apply(avail)
401
402         missing := map[string]bool{}
403         cq.mtx.Lock()
404         for uuid, ent := range cq.current {
405                 if next[uuid] == nil &&
406                         ent.Container.State != arvados.ContainerStateCancelled &&
407                         ent.Container.State != arvados.ContainerStateComplete {
408                         missing[uuid] = true
409                 }
410         }
411         cq.mtx.Unlock()
412
413         for len(missing) > 0 {
414                 var batch []string
415                 for uuid := range missing {
416                         batch = append(batch, uuid)
417                         if len(batch) == 20 {
418                                 break
419                         }
420                 }
421                 filters := []arvados.Filter{{"uuid", "in", batch}}
422                 ended, err := cq.fetchAll(arvados.ResourceListParams{
423                         Select:  selectParam,
424                         Order:   "uuid",
425                         Count:   "none",
426                         Filters: filters,
427                 })
428                 if err != nil {
429                         return nil, err
430                 }
431                 apply(ended)
432                 if len(ended) == 0 {
433                         // This is the only case where we can conclude
434                         // a container has been deleted from the
435                         // database. A short (but non-zero) page, on
436                         // the other hand, can be caused by a response
437                         // size limit.
438                         for _, uuid := range batch {
439                                 cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
440                                 delete(missing, uuid)
441                                 cq.mtx.Lock()
442                                 cq.delEnt(uuid, cq.current[uuid].Container.State)
443                                 cq.mtx.Unlock()
444                         }
445                         continue
446                 }
447                 for _, ctr := range ended {
448                         if _, ok := missing[ctr.UUID]; !ok {
449                                 msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
450                                 cq.logger.WithFields(logrus.Fields{
451                                         "ContainerUUID": ctr.UUID,
452                                         "Filters":       filters,
453                                 }).Error(msg)
454                                 return nil, errors.New(msg)
455                         }
456                         delete(missing, ctr.UUID)
457                 }
458         }
459         return next, nil
460 }
461
462 func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
463         var results []arvados.Container
464         params := initialParams
465         params.Offset = 0
466         for {
467                 // This list variable must be a new one declared
468                 // inside the loop: otherwise, items in the API
469                 // response would get deep-merged into the items
470                 // loaded in previous iterations.
471                 var list arvados.ContainerList
472
473                 err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
474                 if err != nil {
475                         return nil, err
476                 }
477                 if len(list.Items) == 0 {
478                         break
479                 }
480
481                 results = append(results, list.Items...)
482                 if len(params.Order) == 1 && params.Order == "uuid" {
483                         params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
484                 } else {
485                         params.Offset += len(list.Items)
486                 }
487         }
488         return results, nil
489 }