-// load at the cost of increased under light load.
+// load at the cost of increased latency under light load.
const queuedContainersTarget = 100
-type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+// typeChooser returns the instance types suitable for running the
+// given container. addEnt treats a non-nil error as "no suitable
+// instance type" and does not retry it. On success the slice is
+// stored in QueueEnt.InstanceTypes, and the queue's metrics use
+// its first element as a label, so it should be non-empty
+// (NOTE(review): confirm every implementation guarantees a
+// non-empty result whenever err == nil).
+type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
// An APIClient performs Arvados API requests. It is typically an
// *arvados.Client.
-// record and the instance type that should be used to run it.
+// record and the list of instance types suitable for running it.
type QueueEnt struct {
// The container to run. Only the UUID, State, Priority,
- // RuntimeConstraints, Mounts, and ContainerImage fields are
- // populated.
- Container arvados.Container `json:"container"`
- InstanceType arvados.InstanceType `json:"instance_type"`
- FirstSeenAt time.Time `json:"first_seen_at"`
+ // RuntimeConstraints, ContainerImage, SchedulingParameters,
+ // and CreatedAt fields are populated.
+ Container arvados.Container `json:"container"`
+ // InstanceTypes is the list returned by the queue's
+ // typeChooser for Container. Queue metrics label each entry
+ // by InstanceTypes[0].Name, so this is expected to be
+ // non-empty (NOTE(review): confirm; presumably ordered by
+ // preference).
+ InstanceTypes []arvados.InstanceType `json:"instance_types"`
+ // FirstSeenAt is set by addEnt to time.Now() when the
+ // container is first added to the queue.
+ FirstSeenAt time.Time `json:"first_seen_at"`
}
// String implements fmt.Stringer by returning the queued container's
// Caller must have lock.
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
- it, err := cq.chooseType(&ctr)
+ logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+ // We didn't ask for the Mounts field when polling
+ // controller/RailsAPI, because it can be expensive on the
+ // Rails side, and most of the time we already have it. But
+ // this is the first time we're seeing this container, so we
+ // need to fetch mounts in order to choose an instance type.
+ //
+ // NOTE(review): the caller holds the queue lock (presumably
+ // cq.mtx), so this API round trip blocks every other Queue
+ // method that needs the lock until it returns, once per new
+ // container. Consider fetching mounts before taking the lock.
+ err := cq.client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+ctr.UUID, nil, arvados.GetOptions{
+ Select: []string{"mounts"},
+ })
+ if err != nil {
+ // The container is not added to cq.current here;
+ // presumably a later poll calls addEnt again (TODO confirm).
+ logger.WithError(err).Warn("error getting mounts")
+ return
+ }
+ types, err := cq.chooseType(&ctr)
// Avoid wasting memory on a large Mounts attr (we don't need
// it after choosing type).
// error: it wouldn't help to try again, or to leave
// it for a different dispatcher process to attempt.
errorString := err.Error()
- logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
logger.WithError(err).Warn("cancel container with no suitable instance type")
go func() {
if ctr.State == arvados.ContainerStateQueued {
}()
return
}
+ // Comma-separated candidate type names, used only for the
+ // "adding container to queue" log entry below.
+ typeNames := ""
+ for _, it := range types {
+ if typeNames != "" {
+ typeNames += ", "
+ }
+ typeNames += it.Name
+ }
cq.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"State": ctr.State,
"Priority": ctr.Priority,
- "InstanceType": it.Name,
+ "InstanceTypes": typeNames,
}).Info("adding container to queue")
- cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
+ cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
}
// Lock acquires the dispatch lock for the given container.
*next[upd.UUID] = upd
}
}
- selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
+ selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "scheduling_parameters", "created_at"}
limitParam := 1000
mine, err := cq.fetchAll(arvados.ResourceListParams{
}
apply(avail)
+ // Check for containers that we already know about but weren't
+ // returned by any of the above queries, and fetch them
+ // explicitly by UUID. If they're in a final state we can drop
+ // them, but otherwise we need to apply updates, e.g.,
+ //
+ // - Queued container priority has been reduced
+ // - Locked container has been requeued with lower priority
missing := map[string]bool{}
cq.mtx.Lock()
for uuid, ent := range cq.current {
}
ents, _ := cq.Entries()
for _, ent := range ents {
- count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+ count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
}
for k, v := range count {
mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))