X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3aaefcb3c76ff470b475d950398d01255e87712a..1875af9bcf4a1afe435176e952e63341a9ae9c03:/lib/dispatchcloud/container/queue.go diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index 7a2727c1e9..8d8b7ff9af 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -15,7 +15,14 @@ import ( "github.com/sirupsen/logrus" ) -type typeChooser func(*arvados.Container) (arvados.InstanceType, error) +// Stop fetching queued containers after this many of the highest +// priority non-supervisor containers. Reduces API load when queue is +// long. This also limits how quickly a large batch of queued +// containers can be started, which improves reliability under high +// load at the cost of increased under light load. +const queuedContainersTarget = 100 + +type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error) // An APIClient performs Arvados API requests. It is typically an // *arvados.Client. @@ -27,10 +34,11 @@ type APIClient interface { // record and the instance type that should be used to run it. type QueueEnt struct { // The container to run. Only the UUID, State, Priority, - // RuntimeConstraints, Mounts, and ContainerImage fields are - // populated. - Container arvados.Container `json:"container"` - InstanceType arvados.InstanceType `json:"instance_type"` + // RuntimeConstraints, ContainerImage, SchedulingParameters, + // and CreatedAt fields are populated. + Container arvados.Container `json:"container"` + InstanceTypes []arvados.InstanceType `json:"instance_types"` + FirstSeenAt time.Time `json:"first_seen_at"` } // String implements fmt.Stringer by returning the queued container's @@ -229,14 +237,32 @@ func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) { delete(cq.current, uuid) } +// Caller must have lock. func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { - it, err := cq.chooseType(&ctr) + logger := cq.logger.WithField("ContainerUUID", ctr.UUID) + // We didn't ask for the Mounts field when polling + // controller/RailsAPI, because it can be expensive on the + // Rails side, and most of the time we already have it. But + // this is the first time we're seeing this container, so we + // need to fetch mounts in order to choose an instance type. + err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{ + Select: []string{"mounts"}, + }) + if err != nil { + logger.WithError(err).Warn("error getting mounts") + return + } + types, err := cq.chooseType(&ctr) + + // Avoid wasting memory on a large Mounts attr (we don't need + // it after choosing type). + ctr.Mounts = nil + if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) { // We assume here that any chooseType error is a hard // error: it wouldn't help to try again, or to leave // it for a different dispatcher process to attempt. errorString := err.Error() - logger := cq.logger.WithField("ContainerUUID", ctr.UUID) logger.WithError(err).Warn("cancel container with no suitable instance type") go func() { if ctr.State == arvados.ContainerStateQueued { @@ -278,13 +304,20 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { }() return } + typeNames := "" + for _, it := range types { + if typeNames != "" { + typeNames += ", " + } + typeNames += it.Name + } cq.logger.WithFields(logrus.Fields{ "ContainerUUID": ctr.UUID, "State": ctr.State, "Priority": ctr.Priority, - "InstanceType": it.Name, + "InstanceTypes": typeNames, }).Info("adding container to queue") - cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it} + cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()} } // Lock acquires the dispatch lock for the given container. @@ -382,7 +415,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { *next[upd.UUID] = upd } } - selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"} + selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"} limitParam := 1000 mine, err := cq.fetchAll(arvados.ResourceListParams{ @@ -391,7 +424,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { Limit: &limitParam, Count: "none", Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}}, - }) + }, 0) if err != nil { return nil, err } @@ -399,16 +432,23 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { avail, err := cq.fetchAll(arvados.ResourceListParams{ Select: selectParam, - Order: "uuid", + Order: "priority desc", Limit: &limitParam, Count: "none", Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}}, - }) + }, queuedContainersTarget) if err != nil { return nil, err } apply(avail) + // Check for containers that we already know about but weren't + // returned by any of the above queries, and fetch them + // explicitly by UUID. If they're in a final state we can drop + // them, but otherwise we need to apply updates, e.g., + // + // - Queued container priority has been reduced + // - Locked container has been requeued with lower priority missing := map[string]bool{} cq.mtx.Lock() for uuid, ent := range cq.current { @@ -434,7 +474,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { Order: "uuid", Count: "none", Filters: filters, - }) + }, 0) if err != nil { return nil, err } @@ -469,10 +509,18 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { return next, nil } -func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) { +// Fetch all pages of containers. +// +// Except: if maxNonSuper>0, stop fetching more pages after receving +// that many non-supervisor containers. Along with {Order: "priority +// desc"}, this enables fetching enough high priority scheduling-ready +// containers to make progress, without necessarily fetching the +// entire queue. +func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) { var results []arvados.Container params := initialParams params.Offset = 0 + nonSuper := 0 for { // This list variable must be a new one declared // inside the loop: otherwise, items in the API @@ -488,8 +536,23 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C break } + // Conserve memory by deleting mounts that aren't + // relevant to choosing the instance type. + for _, c := range list.Items { + for path, mnt := range c.Mounts { + if mnt.Kind != "tmp" { + delete(c.Mounts, path) + } + } + if !c.SchedulingParameters.Supervisor { + nonSuper++ + } + } + results = append(results, list.Items...) - if len(params.Order) == 1 && params.Order == "uuid" { + if maxNonSuper > 0 && nonSuper >= maxNonSuper { + break + } else if params.Order == "uuid" { params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID}) } else { params.Offset += len(list.Items) @@ -521,7 +584,7 @@ func (cq *Queue) runMetrics(reg *prometheus.Registry) { } ents, _ := cq.Entries() for _, ent := range ents { - count[entKey{ent.Container.State, ent.InstanceType.Name}]++ + count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++ } for k, v := range count { mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))