"github.com/sirupsen/logrus"
)
-type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+// Stop fetching queued containers after this many of the highest
+// priority non-supervisor containers. Reduces API load when queue is
+// long. This also limits how quickly a large batch of queued
+// containers can be started, which improves reliability under high
+// load at the cost of increased latency under light load.
+const queuedContainersTarget = 100
+
+type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
// An APIClient performs Arvados API requests. It is typically an
// *arvados.Client.
// A QueueEnt is an entry in the queue, consisting of a container
// record and the instance type that should be used to run it.
type QueueEnt struct {
- // The container to run. Only the UUID, State, Priority, and
- // RuntimeConstraints fields are populated.
- Container arvados.Container `json:"container"`
- InstanceType arvados.InstanceType `json:"instance_type"`
+ // The container to run. Only the UUID, State, Priority,
+ // RuntimeConstraints, ContainerImage, SchedulingParameters,
+ // and CreatedAt fields are populated.
+ Container arvados.Container `json:"container"`
+ InstanceTypes []arvados.InstanceType `json:"instance_types"`
+ FirstSeenAt time.Time `json:"first_seen_at"`
}
// String implements fmt.Stringer by returning the queued container's
func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
cq.mtx.Lock()
defer cq.mtx.Unlock()
- if ctr, ok := cq.current[uuid]; !ok {
+ ctr, ok := cq.current[uuid]
+ if !ok {
return arvados.Container{}, false
- } else {
- return ctr.Container, true
}
+ return ctr.Container, true
}
// Entries returns all cache entries, keyed by container UUID.
delete(cq.current, uuid)
}
+// Caller must have lock.
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
- it, err := cq.chooseType(&ctr)
+ logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+ // We didn't ask for the Mounts field when polling
+ // controller/RailsAPI, because it can be expensive on the
+ // Rails side, and most of the time we already have it. But
+ // this is the first time we're seeing this container, so we
+ // need to fetch mounts in order to choose an instance type.
+ err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{
+ Select: []string{"mounts"},
+ })
+ if err != nil {
+ logger.WithError(err).Warn("error getting mounts")
+ return
+ }
+ types, err := cq.chooseType(&ctr)
+
+ // Avoid wasting memory on a large Mounts attr (we don't need
+ // it after choosing type).
+ ctr.Mounts = nil
+
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
// We assume here that any chooseType error is a hard
// error: it wouldn't help to try again, or to leave
// it for a different dispatcher process to attempt.
errorString := err.Error()
- logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
logger.WithError(err).Warn("cancel container with no suitable instance type")
go func() {
if ctr.State == arvados.ContainerStateQueued {
// Can't set runtime error without
- // locking first. If Lock() is
- // successful, it will call addEnt()
- // again itself, and we'll fall
- // through to the
- // setRuntimeError/Cancel code below.
+ // locking first.
err := cq.Lock(ctr.UUID)
if err != nil {
logger.WithError(err).Warn("lock failed")
+ return
// ...and try again on the
// next Update, if the problem
// still exists.
}
- return
}
var err error
defer func() {
}()
return
}
+ typeNames := ""
+ for _, it := range types {
+ if typeNames != "" {
+ typeNames += ", "
+ }
+ typeNames += it.Name
+ }
cq.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"State": ctr.State,
"Priority": ctr.Priority,
- "InstanceType": it.Name,
+ "InstanceTypes": typeNames,
}).Info("adding container to queue")
- cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+ cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
}
// Lock acquires the dispatch lock for the given container.
if cq.dontupdate != nil {
cq.dontupdate[uuid] = struct{}{}
}
- if ent, ok := cq.current[uuid]; !ok {
- cq.addEnt(uuid, resp)
- } else {
- ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
- cq.current[uuid] = ent
+ ent, ok := cq.current[uuid]
+ if !ok {
+ // Container is not in queue (e.g., it was not added
+ // because there is no suitable instance type, and
+ // we're just locking/updating it in order to set an
+ // error message). No need to add it, and we don't
+ // necessarily have enough information to add it here
+ // anyway because lock/unlock responses don't include
+ // runtime_constraints.
+ return
}
+ ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+ cq.current[uuid] = ent
cq.notify()
}
*next[upd.UUID] = upd
}
}
- selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+ selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"}
limitParam := 1000
mine, err := cq.fetchAll(arvados.ResourceListParams{
Limit: &limitParam,
Count: "none",
Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
- })
+ }, 0)
if err != nil {
return nil, err
}
avail, err := cq.fetchAll(arvados.ResourceListParams{
Select: selectParam,
- Order: "uuid",
+ Order: "priority desc",
Limit: &limitParam,
Count: "none",
Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
- })
+ }, queuedContainersTarget)
if err != nil {
return nil, err
}
apply(avail)
+ // Check for containers that we already know about but weren't
+ // returned by any of the above queries, and fetch them
+ // explicitly by UUID. If they're in a final state we can drop
+ // them, but otherwise we need to apply updates, e.g.,
+ //
+ // - Queued container priority has been reduced
+ // - Locked container has been requeued with lower priority
missing := map[string]bool{}
cq.mtx.Lock()
for uuid, ent := range cq.current {
Order: "uuid",
Count: "none",
Filters: filters,
- })
+ }, 0)
if err != nil {
return nil, err
}
return next, nil
}
-func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+// Fetch all pages of containers.
+//
+// Except: if maxNonSuper>0, stop fetching more pages after receiving
+// that many non-supervisor containers. Along with {Order: "priority
+// desc"}, this enables fetching enough high priority scheduling-ready
+// containers to make progress, without necessarily fetching the
+// entire queue.
+func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams, maxNonSuper int) ([]arvados.Container, error) {
var results []arvados.Container
params := initialParams
params.Offset = 0
+ nonSuper := 0
for {
// This list variable must be a new one declared
// inside the loop: otherwise, items in the API
break
}
+ // Conserve memory by deleting mounts that aren't
+ // relevant to choosing the instance type.
+ for _, c := range list.Items {
+ for path, mnt := range c.Mounts {
+ if mnt.Kind != "tmp" {
+ delete(c.Mounts, path)
+ }
+ }
+ if !c.SchedulingParameters.Supervisor {
+ nonSuper++
+ }
+ }
+
results = append(results, list.Items...)
- if len(params.Order) == 1 && params.Order == "uuid" {
+ if maxNonSuper > 0 && nonSuper >= maxNonSuper {
+ break
+ } else if params.Order == "uuid" {
params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
} else {
params.Offset += len(list.Items)
}
ents, _ := cq.Entries()
for _, ent := range ents {
- count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+ count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
}
for k, v := range count {
mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))