"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// A QueueEnt is an entry in the queue, consisting of a container
// record and the instance type that should be used to run it.
type QueueEnt struct {
- // The container to run. Only the UUID, State, Priority, and
- // RuntimeConstraints fields are populated.
+ // The container to run. Only the UUID, State, Priority,
+ // RuntimeConstraints, Mounts, ContainerImage,
+ // SchedulingParameters, and CreatedAt fields are populated.
Container arvados.Container `json:"container"`
InstanceType arvados.InstanceType `json:"instance_type"`
+ FirstSeenAt time.Time `json:"first_seen_at"`
}
// String implements fmt.Stringer by returning the queued container's
// cache up to date.
type Queue struct {
logger logrus.FieldLogger
- reg *prometheus.Registry
chooseType typeChooser
client APIClient
// Arvados cluster's queue during Update, chooseType will be called to
// assign an appropriate arvados.InstanceType for the queue entry.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
- return &Queue{
+ cq := &Queue{
logger: logger,
- reg: reg,
chooseType: chooseType,
client: client,
current: map[string]QueueEnt{},
subscribers: map[<-chan struct{}]chan struct{}{},
}
+ if reg != nil {
+ go cq.runMetrics(reg)
+ }
+ return cq
}
// Subscribe returns a channel that becomes ready to receive when an
cq.mtx.Lock()
defer cq.mtx.Unlock()
ctr := cq.current[uuid].Container
- if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+ if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
cq.delEnt(uuid, ctr.State)
}
}
func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
cq.mtx.Lock()
defer cq.mtx.Unlock()
- if ctr, ok := cq.current[uuid]; !ok {
+ ctr, ok := cq.current[uuid]
+ if !ok {
return arvados.Container{}, false
- } else {
- return ctr.Container, true
}
+ return ctr.Container, true
}
// Entries returns all cache entries, keyed by container UUID.
delete(cq.current, uuid)
}
+// Caller must hold cq.mtx.
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
go func() {
if ctr.State == arvados.ContainerStateQueued {
// Can't set runtime error without
- // locking first. If Lock() is
- // successful, it will call addEnt()
- // again itself, and we'll fall
- // through to the
- // setRuntimeError/Cancel code below.
+ // locking first.
err := cq.Lock(ctr.UUID)
if err != nil {
logger.WithError(err).Warn("lock failed")
+ return
// ...and try again on the
// next Update, if the problem
// still exists.
}
- return
}
var err error
defer func() {
"Priority": ctr.Priority,
"InstanceType": it.Name,
}).Info("adding container to queue")
- cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+ cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
}
// Lock acquires the dispatch lock for the given container.
// Cancel cancels the given container.
func (cq *Queue) Cancel(uuid string) error {
- err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+ var resp arvados.Container
+ err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
"container": {"state": arvados.ContainerStateCancelled},
})
if err != nil {
return err
}
- cq.mtx.Lock()
- defer cq.mtx.Unlock()
- cq.notify()
+ cq.updateWithResp(uuid, resp)
return nil
}
if err != nil {
return err
}
+ cq.updateWithResp(uuid, resp)
+ return nil
+}
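+// updateWithResp updates the local queue entry for uuid with the
+// State, Priority, and LockedByUUID from the response to a
+// state-changing API request (lock/unlock/cancel). If uuid is not
+// in the local queue, no entry is added.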
+func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
cq.mtx.Lock()
defer cq.mtx.Unlock()
if cq.dontupdate != nil {
cq.dontupdate[uuid] = struct{}{}
}
- if ent, ok := cq.current[uuid]; !ok {
- cq.addEnt(uuid, resp)
- } else {
- ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
- cq.current[uuid] = ent
+ ent, ok := cq.current[uuid]
+ if !ok {
+ // Container is not in queue (e.g., it was not added
+ // because there is no suitable instance type, and
+ // we're just locking/updating it in order to set an
+ // error message). No need to add it, and we don't
+ // necessarily have enough information to add it here
+ // anyway because lock/unlock responses don't include
+ // runtime_constraints.
+ return
}
+ ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+ cq.current[uuid] = ent
cq.notify()
- return nil
}
func (cq *Queue) poll() (map[string]*arvados.Container, error) {
*next[upd.UUID] = upd
}
}
- selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+ selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
limitParam := 1000
mine, err := cq.fetchAll(arvados.ResourceListParams{
}
return results, nil
}
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+ mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "queue_entries",
+ Help: "Number of active container entries in the controller database.",
+ }, []string{"state", "instance_type"})
+ reg.MustRegister(mEntries)
+
+ type entKey struct {
+ state arvados.ContainerState
+ inst string
+ }
+ count := map[entKey]int{}
+
+ ch := cq.Subscribe()
+ defer cq.Unsubscribe(ch)
+ for range ch {
+ for k := range count {
+ count[k] = 0
+ }
+ ents, _ := cq.Entries()
+ for _, ent := range ents {
+ count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+ }
+ for k, v := range count {
+ mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+ }
+ }
+}