20649: Fix panic on race, worker shutdown vs. container startup.
[arvados.git] / lib / dispatchcloud / container / queue.go
index bbe47625a893d6874d2c3c415952948f290de74f..77752013e8e6444431ffdb49020e9a6781b031ca 100644 (file)
@@ -5,15 +5,23 @@
 package container
 
 import (
+       "errors"
        "io"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
+// Stop fetching queued containers after this many of the highest
+// priority non-supervisor containers. Reduces API load when queue is
+// long. This also limits how quickly a large batch of queued
+// containers can be started, which improves reliability under high
+// load at the cost of increased under light load.
+const queuedContainersTarget = 100
+
 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
 
 // An APIClient performs Arvados API requests. It is typically an
@@ -25,10 +33,12 @@ type APIClient interface {
 // A QueueEnt is an entry in the queue, consisting of a container
 // record and the instance type that should be used to run it.
 type QueueEnt struct {
-       // The container to run. Only the UUID, State, Priority, and
-       // RuntimeConstraints fields are populated.
+       // The container to run. Only the UUID, State, Priority,
+       // RuntimeConstraints, ContainerImage, SchedulingParameters,
+       // and CreatedAt fields are populated.
        Container    arvados.Container    `json:"container"`
        InstanceType arvados.InstanceType `json:"instance_type"`
+       FirstSeenAt  time.Time            `json:"first_seen_at"`
 }
 
 // String implements fmt.Stringer by returning the queued container's
@@ -52,7 +62,6 @@ func (c *QueueEnt) String() string {
 // cache up to date.
 type Queue struct {
        logger     logrus.FieldLogger
-       reg        *prometheus.Registry
        chooseType typeChooser
        client     APIClient
 
@@ -78,14 +87,17 @@ type Queue struct {
 // Arvados cluster's queue during Update, chooseType will be called to
 // assign an appropriate arvados.InstanceType for the queue entry.
 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
-       return &Queue{
+       cq := &Queue{
                logger:      logger,
-               reg:         reg,
                chooseType:  chooseType,
                client:      client,
                current:     map[string]QueueEnt{},
                subscribers: map[<-chan struct{}]chan struct{}{},
        }
+       if reg != nil {
+               go cq.runMetrics(reg)
+       }
+       return cq
 }
 
 // Subscribe returns a channel that becomes ready to receive when an
@@ -130,7 +142,7 @@ func (cq *Queue) Forget(uuid string) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        ctr := cq.current[uuid].Container
-       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
                cq.delEnt(uuid, ctr.State)
        }
 }
@@ -141,11 +153,11 @@ func (cq *Queue) Forget(uuid string) {
 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
-       if ctr, ok := cq.current[uuid]; !ok {
+       ctr, ok := cq.current[uuid]
+       if !ok {
                return arvados.Container{}, false
-       } else {
-               return ctr.Container, true
        }
+       return ctr.Container, true
 }
 
 // Entries returns all cache entries, keyed by container UUID.
@@ -225,31 +237,45 @@ func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
        delete(cq.current, uuid)
 }
 
+// Caller must have lock.
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
+       logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+       // We didn't ask for the Mounts field when polling
+       // controller/RailsAPI, because it can be expensive on the
+       // Rails side, and most of the time we already have it.  But
+       // this is the first time we're seeing this container, so we
+       // need to fetch mounts in order to choose an instance type.
+       err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{
+               Select: []string{"mounts"},
+       })
+       if err != nil {
+               logger.WithError(err).Warn("error getting mounts")
+               return
+       }
        it, err := cq.chooseType(&ctr)
+
+       // Avoid wasting memory on a large Mounts attr (we don't need
+       // it after choosing type).
+       ctr.Mounts = nil
+
        if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
                // We assume here that any chooseType error is a hard
                // error: it wouldn't help to try again, or to leave
                // it for a different dispatcher process to attempt.
                errorString := err.Error()
-               logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
                logger.WithError(err).Warn("cancel container with no suitable instance type")
                go func() {
                        if ctr.State == arvados.ContainerStateQueued {
                                // Can't set runtime error without
-                               // locking first. If Lock() is
-                               // successful, it will call addEnt()
-                               // again itself, and we'll fall
-                               // through to the
-                               // setRuntimeError/Cancel code below.
+                               // locking first.
                                err := cq.Lock(ctr.UUID)
                                if err != nil {
                                        logger.WithError(err).Warn("lock failed")
+                                       return
                                        // ...and try again on the
                                        // next Update, if the problem
                                        // still exists.
                                }
-                               return
                        }
                        var err error
                        defer func() {
@@ -284,7 +310,7 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                "Priority":      ctr.Priority,
                "InstanceType":  it.Name,
        }).Info("adding container to queue")
-       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
 }
 
 // Lock acquires the dispatch lock for the given container.
@@ -311,15 +337,14 @@ func (cq *Queue) setRuntimeError(uuid, errorString string) error {
 
 // Cancel cancels the given container.
 func (cq *Queue) Cancel(uuid string) error {
-       err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+       var resp arvados.Container
+       err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
                "container": {"state": arvados.ContainerStateCancelled},
        })
        if err != nil {
                return err
        }
-       cq.mtx.Lock()
-       defer cq.mtx.Unlock()
-       cq.notify()
+       cq.updateWithResp(uuid, resp)
        return nil
 }
 
@@ -329,20 +354,32 @@ func (cq *Queue) apiUpdate(uuid, action string) error {
        if err != nil {
                return err
        }
+       cq.updateWithResp(uuid, resp)
+       return nil
+}
 
+// Update the local queue with the response received from a
+// state-changing API request (lock/unlock/cancel).
+func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        if cq.dontupdate != nil {
                cq.dontupdate[uuid] = struct{}{}
        }
-       if ent, ok := cq.current[uuid]; !ok {
-               cq.addEnt(uuid, resp)
-       } else {
-               ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
-               cq.current[uuid] = ent
+       ent, ok := cq.current[uuid]
+       if !ok {
+               // Container is not in queue (e.g., it was not added
+               // because there is no suitable instance type, and
+               // we're just locking/updating it in order to set an
+               // error message). No need to add it, and we don't
+               // necessarily have enough information to add it here
+               // anyway because lock/unlock responses don't include
+               // runtime_constraints.
+               return
        }
+       ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+       cq.current[uuid] = ent
        cq.notify()
-       return nil
 }
 
 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
@@ -371,7 +408,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
                        *next[upd.UUID] = upd
                }
        }
-       selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+       selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"}
        limitParam := 1000
 
        mine, err := cq.fetchAll(arvados.ResourceListParams{
@@ -380,7 +417,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
                Limit:   &limitParam,
                Count:   "none",
                Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
-       })
+       }, 0)
        if err != nil {
                return nil, err
        }
@@ -388,50 +425,95 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
 
        avail, err := cq.fetchAll(arvados.ResourceListParams{
                Select:  selectParam,
-               Order:   "uuid",
+               Order:   "priority desc",
                Limit:   &limitParam,
                Count:   "none",
                Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
-       })
+       }, queuedContainersTarget)
        if err != nil {
                return nil, err
        }
        apply(avail)
 
-       var missing []string
+       // Check for containers that we already know about but weren't
+       // returned by any of the above queries, and fetch them
+       // explicitly by UUID. If they're in a final state we can drop
+       // them, but otherwise we need to apply updates, e.g.,
+       //
+       // - Queued container priority has been reduced
+       // - Locked container has been requeued with lower priority
+       missing := map[string]bool{}
        cq.mtx.Lock()
        for uuid, ent := range cq.current {
                if next[uuid] == nil &&
                        ent.Container.State != arvados.ContainerStateCancelled &&
                        ent.Container.State != arvados.ContainerStateComplete {
-                       missing = append(missing, uuid)
+                       missing[uuid] = true
                }
        }
        cq.mtx.Unlock()
 
-       for i, page := 0, 20; i < len(missing); i += page {
-               batch := missing[i:]
-               if len(batch) > page {
-                       batch = batch[:page]
+       for len(missing) > 0 {
+               var batch []string
+               for uuid := range missing {
+                       batch = append(batch, uuid)
+                       if len(batch) == 20 {
+                               break
+                       }
                }
+               filters := []arvados.Filter{{"uuid", "in", batch}}
                ended, err := cq.fetchAll(arvados.ResourceListParams{
                        Select:  selectParam,
                        Order:   "uuid",
                        Count:   "none",
-                       Filters: []arvados.Filter{{"uuid", "in", batch}},
-               })
+                       Filters: filters,
+               }, 0)
                if err != nil {
                        return nil, err
                }
                apply(ended)
+               if len(ended) == 0 {
+                       // This is the only case where we can conclude
+                       // a container has been deleted from the
+                       // database. A short (but non-zero) page, on
+                       // the other hand, can be caused by a response
+                       // size limit.
+                       for _, uuid := range batch {
+                               cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+                               delete(missing, uuid)
+                               cq.mtx.Lock()
+                               cq.delEnt(uuid, cq.current[uuid].Container.State)
+                               cq.mtx.Unlock()
+                       }
+                       continue
+               }
+               for _, ctr := range ended {
+                       if _, ok := missing[ctr.UUID]; !ok {
+                               msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+                               cq.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": ctr.UUID,
+                                       "Filters":       filters,
+                               }).Error(msg)
+                               return nil, errors.New(msg)
+                       }
+                       delete(missing, ctr.UUID)
+               }
        }
        return next, nil
 }
 
-func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+// Fetch all pages of containers.
+//
+// Except: if maxNonSuper>0, stop fetching more pages after receving
+// that many non-supervisor containers. Along with {Order: "priority
+// desc"}, this enables fetching enough high priority scheduling-ready
+// containers to make progress, without necessarily fetching the
+// entire queue.
+func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) {
        var results []arvados.Container
        params := initialParams
        params.Offset = 0
+       nonSuper := 0
        for {
                // This list variable must be a new one declared
                // inside the loop: otherwise, items in the API
@@ -447,8 +529,23 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
                        break
                }
 
+               // Conserve memory by deleting mounts that aren't
+               // relevant to choosing the instance type.
+               for _, c := range list.Items {
+                       for path, mnt := range c.Mounts {
+                               if mnt.Kind != "tmp" {
+                                       delete(c.Mounts, path)
+                               }
+                       }
+                       if !c.SchedulingParameters.Supervisor {
+                               nonSuper++
+                       }
+               }
+
                results = append(results, list.Items...)
-               if len(params.Order) == 1 && params.Order == "uuid" {
+               if maxNonSuper > 0 && nonSuper >= maxNonSuper {
+                       break
+               } else if params.Order == "uuid" {
                        params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
                } else {
                        params.Offset += len(list.Items)
@@ -456,3 +553,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
        }
        return results, nil
 }
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+       mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "queue_entries",
+               Help:      "Number of active container entries in the controller database.",
+       }, []string{"state", "instance_type"})
+       reg.MustRegister(mEntries)
+
+       type entKey struct {
+               state arvados.ContainerState
+               inst  string
+       }
+       count := map[entKey]int{}
+
+       ch := cq.Subscribe()
+       defer cq.Unsubscribe(ch)
+       for range ch {
+               for k := range count {
+                       count[k] = 0
+               }
+               ents, _ := cq.Entries()
+               for _, ent := range ents {
+                       count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+               }
+               for k, v := range count {
+                       mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+               }
+       }
+}