20978: Add MaximumPriceFactor config.
[arvados.git] / lib / dispatchcloud / container / queue.go
index 7a2727c1e9532271cb5e7df52f1a383e49f2584f..8d8b7ff9af09a152ec106d020097c6b820920b62 100644 (file)
@@ -15,7 +15,14 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+// Stop fetching queued containers after this many of the highest
+// priority non-supervisor containers. Reduces API load when queue is
+// long. This also limits how quickly a large batch of queued
+// containers can be started, which improves reliability under high
+// load at the cost of increased under light load.
+const queuedContainersTarget = 100
+
+type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
 
 // An APIClient performs Arvados API requests. It is typically an
 // *arvados.Client.
@@ -27,10 +34,11 @@ type APIClient interface {
 // record and the instance type that should be used to run it.
 type QueueEnt struct {
        // The container to run. Only the UUID, State, Priority,
-       // RuntimeConstraints, Mounts, and ContainerImage fields are
-       // populated.
-       Container    arvados.Container    `json:"container"`
-       InstanceType arvados.InstanceType `json:"instance_type"`
+       // RuntimeConstraints, ContainerImage, SchedulingParameters,
+       // and CreatedAt fields are populated.
+       Container     arvados.Container      `json:"container"`
+       InstanceTypes []arvados.InstanceType `json:"instance_types"`
+       FirstSeenAt   time.Time              `json:"first_seen_at"`
 }
 
 // String implements fmt.Stringer by returning the queued container's
@@ -229,14 +237,32 @@ func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
        delete(cq.current, uuid)
 }
 
+// Caller must have lock.
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
-       it, err := cq.chooseType(&ctr)
+       logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+       // We didn't ask for the Mounts field when polling
+       // controller/RailsAPI, because it can be expensive on the
+       // Rails side, and most of the time we already have it.  But
+       // this is the first time we're seeing this container, so we
+       // need to fetch mounts in order to choose an instance type.
+       err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{
+               Select: []string{"mounts"},
+       })
+       if err != nil {
+               logger.WithError(err).Warn("error getting mounts")
+               return
+       }
+       types, err := cq.chooseType(&ctr)
+
+       // Avoid wasting memory on a large Mounts attr (we don't need
+       // it after choosing type).
+       ctr.Mounts = nil
+
        if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
                // We assume here that any chooseType error is a hard
                // error: it wouldn't help to try again, or to leave
                // it for a different dispatcher process to attempt.
                errorString := err.Error()
-               logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
                logger.WithError(err).Warn("cancel container with no suitable instance type")
                go func() {
                        if ctr.State == arvados.ContainerStateQueued {
@@ -278,13 +304,20 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                }()
                return
        }
+       typeNames := ""
+       for _, it := range types {
+               if typeNames != "" {
+                       typeNames += ", "
+               }
+               typeNames += it.Name
+       }
        cq.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "State":         ctr.State,
                "Priority":      ctr.Priority,
-               "InstanceType":  it.Name,
+               "InstanceTypes": typeNames,
        }).Info("adding container to queue")
-       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+       cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
 }
 
 // Lock acquires the dispatch lock for the given container.
@@ -382,7 +415,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
                        *next[upd.UUID] = upd
                }
        }
-       selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
+       selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"}
        limitParam := 1000
 
        mine, err := cq.fetchAll(arvados.ResourceListParams{
@@ -391,7 +424,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
                Limit:   &limitParam,
                Count:   "none",
                Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
-       })
+       }, 0)
        if err != nil {
                return nil, err
        }
@@ -399,16 +432,23 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
 
        avail, err := cq.fetchAll(arvados.ResourceListParams{
                Select:  selectParam,
-               Order:   "uuid",
+               Order:   "priority desc",
                Limit:   &limitParam,
                Count:   "none",
                Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
-       })
+       }, queuedContainersTarget)
        if err != nil {
                return nil, err
        }
        apply(avail)
 
+       // Check for containers that we already know about but weren't
+       // returned by any of the above queries, and fetch them
+       // explicitly by UUID. If they're in a final state we can drop
+       // them, but otherwise we need to apply updates, e.g.,
+       //
+       // - Queued container priority has been reduced
+       // - Locked container has been requeued with lower priority
        missing := map[string]bool{}
        cq.mtx.Lock()
        for uuid, ent := range cq.current {
@@ -434,7 +474,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
                        Order:   "uuid",
                        Count:   "none",
                        Filters: filters,
-               })
+               }, 0)
                if err != nil {
                        return nil, err
                }
@@ -469,10 +509,18 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
        return next, nil
 }
 
-func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+// Fetch all pages of containers.
+//
+// Except: if maxNonSuper>0, stop fetching more pages after receving
+// that many non-supervisor containers. Along with {Order: "priority
+// desc"}, this enables fetching enough high priority scheduling-ready
+// containers to make progress, without necessarily fetching the
+// entire queue.
+func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) {
        var results []arvados.Container
        params := initialParams
        params.Offset = 0
+       nonSuper := 0
        for {
                // This list variable must be a new one declared
                // inside the loop: otherwise, items in the API
@@ -488,8 +536,23 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
                        break
                }
 
+               // Conserve memory by deleting mounts that aren't
+               // relevant to choosing the instance type.
+               for _, c := range list.Items {
+                       for path, mnt := range c.Mounts {
+                               if mnt.Kind != "tmp" {
+                                       delete(c.Mounts, path)
+                               }
+                       }
+                       if !c.SchedulingParameters.Supervisor {
+                               nonSuper++
+                       }
+               }
+
                results = append(results, list.Items...)
-               if len(params.Order) == 1 && params.Order == "uuid" {
+               if maxNonSuper > 0 && nonSuper >= maxNonSuper {
+                       break
+               } else if params.Order == "uuid" {
                        params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
                } else {
                        params.Offset += len(list.Items)
@@ -521,7 +584,7 @@ func (cq *Queue) runMetrics(reg *prometheus.Registry) {
                }
                ents, _ := cq.Entries()
                for _, ent := range ents {
-                       count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+                       count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
                }
                for k, v := range count {
                        mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))