X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/800139c8dee7d9a563a8a2dca9e45e283c55c22c..ac39afed6cd3de1704d75aecdbc46544b02f02b2:/lib/dispatchcloud/container/queue.go diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index 847fe9e27c..8d8b7ff9af 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -5,16 +5,24 @@ package container import ( + "errors" "io" "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) -type typeChooser func(*arvados.Container) (arvados.InstanceType, error) +// Stop fetching queued containers after this many of the highest +// priority non-supervisor containers. Reduces API load when queue is +// long. This also limits how quickly a large batch of queued +// containers can be started, which improves reliability under high +// load at the cost of increased under light load. +const queuedContainersTarget = 100 + +type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error) // An APIClient performs Arvados API requests. It is typically an // *arvados.Client. @@ -25,10 +33,12 @@ type APIClient interface { // A QueueEnt is an entry in the queue, consisting of a container // record and the instance type that should be used to run it. type QueueEnt struct { - // The container to run. Only the UUID, State, Priority, and - // RuntimeConstraints fields are populated. - Container arvados.Container `json:"container"` - InstanceType arvados.InstanceType `json:"instance_type"` + // The container to run. Only the UUID, State, Priority, + // RuntimeConstraints, ContainerImage, SchedulingParameters, + // and CreatedAt fields are populated. + Container arvados.Container `json:"container"` + InstanceTypes []arvados.InstanceType `json:"instance_types"` + FirstSeenAt time.Time `json:"first_seen_at"` } // String implements fmt.Stringer by returning the queued container's @@ -52,7 +62,6 @@ func (c *QueueEnt) String() string { // cache up to date. type Queue struct { logger logrus.FieldLogger - reg *prometheus.Registry chooseType typeChooser client APIClient @@ -78,14 +87,17 @@ type Queue struct { // Arvados cluster's queue during Update, chooseType will be called to // assign an appropriate arvados.InstanceType for the queue entry. func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue { - return &Queue{ + cq := &Queue{ logger: logger, - reg: reg, chooseType: chooseType, client: client, current: map[string]QueueEnt{}, subscribers: map[<-chan struct{}]chan struct{}{}, } + if reg != nil { + go cq.runMetrics(reg) + } + return cq } // Subscribe returns a channel that becomes ready to receive when an @@ -130,8 +142,8 @@ func (cq *Queue) Forget(uuid string) { cq.mtx.Lock() defer cq.mtx.Unlock() ctr := cq.current[uuid].Container - if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled { - delete(cq.current, uuid) + if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) { + cq.delEnt(uuid, ctr.State) } } @@ -141,11 +153,11 @@ func (cq *Queue) Forget(uuid string) { func (cq *Queue) Get(uuid string) (arvados.Container, bool) { cq.mtx.Lock() defer cq.mtx.Unlock() - if ctr, ok := cq.current[uuid]; !ok { + ctr, ok := cq.current[uuid] + if !ok { return arvados.Container{}, false - } else { - return ctr.Container, true } + return ctr.Container, true } // Entries returns all cache entries, keyed by container UUID. @@ -196,7 +208,7 @@ func (cq *Queue) Update() error { cq.current[uuid] = cur } } - for uuid := range cq.current { + for uuid, ent := range cq.current { if _, dontupdate := cq.dontupdate[uuid]; dontupdate { // Don't expunge an entry that was // added/updated locally after we started @@ -207,7 +219,7 @@ func (cq *Queue) Update() error { // the poll response (evidently it's // cancelled, completed, deleted, or taken by // a different dispatcher). - delete(cq.current, uuid) + cq.delEnt(uuid, ent.Container.State) } } cq.dontupdate = nil @@ -216,15 +228,55 @@ func (cq *Queue) Update() error { return nil } +// Caller must have lock. +func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) { + cq.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "State": state, + }).Info("dropping container from queue") + delete(cq.current, uuid) +} + +// Caller must have lock. func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { - it, err := cq.chooseType(&ctr) + logger := cq.logger.WithField("ContainerUUID", ctr.UUID) + // We didn't ask for the Mounts field when polling + // controller/RailsAPI, because it can be expensive on the + // Rails side, and most of the time we already have it. But + // this is the first time we're seeing this container, so we + // need to fetch mounts in order to choose an instance type. + err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{ + Select: []string{"mounts"}, + }) + if err != nil { + logger.WithError(err).Warn("error getting mounts") + return + } + types, err := cq.chooseType(&ctr) + + // Avoid wasting memory on a large Mounts attr (we don't need + // it after choosing type). + ctr.Mounts = nil + if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) { // We assume here that any chooseType error is a hard // error: it wouldn't help to try again, or to leave // it for a different dispatcher process to attempt. errorString := err.Error() - cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type") + logger.WithError(err).Warn("cancel container with no suitable instance type") go func() { + if ctr.State == arvados.ContainerStateQueued { + // Can't set runtime error without + // locking first. + err := cq.Lock(ctr.UUID) + if err != nil { + logger.WithError(err).Warn("lock failed") + return + // ...and try again on the + // next Update, if the problem + // still exists. + } + } var err error defer func() { if err == nil { @@ -239,14 +291,8 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { if latest.State == arvados.ContainerStateCancelled { return } - cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container") + logger.WithError(err).Warn("error while trying to cancel unsatisfiable container") }() - if ctr.State == arvados.ContainerStateQueued { - err = cq.Lock(ctr.UUID) - if err != nil { - return - } - } err = cq.setRuntimeError(ctr.UUID, errorString) if err != nil { return @@ -258,7 +304,20 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { }() return } - cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it} + typeNames := "" + for _, it := range types { + if typeNames != "" { + typeNames += ", " + } + typeNames += it.Name + } + cq.logger.WithFields(logrus.Fields{ + "ContainerUUID": ctr.UUID, + "State": ctr.State, + "Priority": ctr.Priority, + "InstanceTypes": typeNames, + }).Info("adding container to queue") + cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()} } // Lock acquires the dispatch lock for the given container. @@ -285,15 +344,14 @@ func (cq *Queue) setRuntimeError(uuid, errorString string) error { // Cancel cancels the given container. func (cq *Queue) Cancel(uuid string) error { - err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ + var resp arvados.Container + err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ "container": {"state": arvados.ContainerStateCancelled}, }) if err != nil { return err } - cq.mtx.Lock() - defer cq.mtx.Unlock() - cq.notify() + cq.updateWithResp(uuid, resp) return nil } @@ -303,20 +361,32 @@ func (cq *Queue) apiUpdate(uuid, action string) error { if err != nil { return err } + cq.updateWithResp(uuid, resp) + return nil +} +// Update the local queue with the response received from a +// state-changing API request (lock/unlock/cancel). +func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) { cq.mtx.Lock() defer cq.mtx.Unlock() if cq.dontupdate != nil { cq.dontupdate[uuid] = struct{}{} } - if ent, ok := cq.current[uuid]; !ok { - cq.addEnt(uuid, resp) - } else { - ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID - cq.current[uuid] = ent + ent, ok := cq.current[uuid] + if !ok { + // Container is not in queue (e.g., it was not added + // because there is no suitable instance type, and + // we're just locking/updating it in order to set an + // error message). No need to add it, and we don't + // necessarily have enough information to add it here + // anyway because lock/unlock responses don't include + // runtime_constraints. + return } + ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID + cq.current[uuid] = ent cq.notify() - return nil } func (cq *Queue) poll() (map[string]*arvados.Container, error) { @@ -345,7 +415,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { *next[upd.UUID] = upd } } - selectParam := []string{"uuid", "state", "priority", "runtime_constraints"} + selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"} limitParam := 1000 mine, err := cq.fetchAll(arvados.ResourceListParams{ @@ -354,7 +424,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { Limit: &limitParam, Count: "none", Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}}, - }) + }, 0) if err != nil { return nil, err } @@ -362,50 +432,95 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { avail, err := cq.fetchAll(arvados.ResourceListParams{ Select: selectParam, - Order: "uuid", + Order: "priority desc", Limit: &limitParam, Count: "none", Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}}, - }) + }, queuedContainersTarget) if err != nil { return nil, err } apply(avail) - var missing []string + // Check for containers that we already know about but weren't + // returned by any of the above queries, and fetch them + // explicitly by UUID. If they're in a final state we can drop + // them, but otherwise we need to apply updates, e.g., + // + // - Queued container priority has been reduced + // - Locked container has been requeued with lower priority + missing := map[string]bool{} cq.mtx.Lock() for uuid, ent := range cq.current { if next[uuid] == nil && ent.Container.State != arvados.ContainerStateCancelled && ent.Container.State != arvados.ContainerStateComplete { - missing = append(missing, uuid) + missing[uuid] = true } } cq.mtx.Unlock() - for i, page := 0, 20; i < len(missing); i += page { - batch := missing[i:] - if len(batch) > page { - batch = batch[:page] + for len(missing) > 0 { + var batch []string + for uuid := range missing { + batch = append(batch, uuid) + if len(batch) == 20 { + break + } } + filters := []arvados.Filter{{"uuid", "in", batch}} ended, err := cq.fetchAll(arvados.ResourceListParams{ Select: selectParam, Order: "uuid", Count: "none", - Filters: []arvados.Filter{{"uuid", "in", batch}}, - }) + Filters: filters, + }, 0) if err != nil { return nil, err } apply(ended) + if len(ended) == 0 { + // This is the only case where we can conclude + // a container has been deleted from the + // database. A short (but non-zero) page, on + // the other hand, can be caused by a response + // size limit. + for _, uuid := range batch { + cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)") + delete(missing, uuid) + cq.mtx.Lock() + cq.delEnt(uuid, cq.current[uuid].Container.State) + cq.mtx.Unlock() + } + continue + } + for _, ctr := range ended { + if _, ok := missing[ctr.UUID]; !ok { + msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock" + cq.logger.WithFields(logrus.Fields{ + "ContainerUUID": ctr.UUID, + "Filters": filters, + }).Error(msg) + return nil, errors.New(msg) + } + delete(missing, ctr.UUID) + } } return next, nil } -func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) { +// Fetch all pages of containers. +// +// Except: if maxNonSuper>0, stop fetching more pages after receving +// that many non-supervisor containers. Along with {Order: "priority +// desc"}, this enables fetching enough high priority scheduling-ready +// containers to make progress, without necessarily fetching the +// entire queue. +func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) { var results []arvados.Container params := initialParams params.Offset = 0 + nonSuper := 0 for { // This list variable must be a new one declared // inside the loop: otherwise, items in the API @@ -421,8 +536,23 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C break } + // Conserve memory by deleting mounts that aren't + // relevant to choosing the instance type. + for _, c := range list.Items { + for path, mnt := range c.Mounts { + if mnt.Kind != "tmp" { + delete(c.Mounts, path) + } + } + if !c.SchedulingParameters.Supervisor { + nonSuper++ + } + } + results = append(results, list.Items...) - if len(params.Order) == 1 && params.Order == "uuid" { + if maxNonSuper > 0 && nonSuper >= maxNonSuper { + break + } else if params.Order == "uuid" { params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID}) } else { params.Offset += len(list.Items) @@ -430,3 +560,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C } return results, nil } + +func (cq *Queue) runMetrics(reg *prometheus.Registry) { + mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "queue_entries", + Help: "Number of active container entries in the controller database.", + }, []string{"state", "instance_type"}) + reg.MustRegister(mEntries) + + type entKey struct { + state arvados.ContainerState + inst string + } + count := map[entKey]int{} + + ch := cq.Subscribe() + defer cq.Unsubscribe(ch) + for range ch { + for k := range count { + count[k] = 0 + } + ents, _ := cq.Entries() + for _, ent := range ents { + count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++ + } + for k, v := range count { + mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v)) + } + } +}