X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aed47afc7647f24af6541ff03a0cc0649a5358ee..0020a0bc96ee13203fbdc2af28ffa077799213d0:/lib/dispatchcloud/container/queue.go diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index 4e807a12ab..50e73189ef 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -53,7 +53,6 @@ func (c *QueueEnt) String() string { // cache up to date. type Queue struct { logger logrus.FieldLogger - reg *prometheus.Registry chooseType typeChooser client APIClient @@ -79,14 +78,17 @@ type Queue struct { // Arvados cluster's queue during Update, chooseType will be called to // assign an appropriate arvados.InstanceType for the queue entry. func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue { - return &Queue{ + cq := &Queue{ logger: logger, - reg: reg, chooseType: chooseType, client: client, current: map[string]QueueEnt{}, subscribers: map[<-chan struct{}]chan struct{}{}, } + if reg != nil { + go cq.runMetrics(reg) + } + return cq } // Subscribe returns a channel that becomes ready to receive when an @@ -312,15 +314,14 @@ func (cq *Queue) setRuntimeError(uuid, errorString string) error { // Cancel cancels the given container. func (cq *Queue) Cancel(uuid string) error { - err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ + var resp arvados.Container + err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ "container": {"state": arvados.ContainerStateCancelled}, }) if err != nil { return err } - cq.mtx.Lock() - defer cq.mtx.Unlock() - cq.notify() + cq.updateWithResp(uuid, resp) return nil } @@ -330,7 +331,13 @@ func (cq *Queue) apiUpdate(uuid, action string) error { if err != nil { return err } + cq.updateWithResp(uuid, resp) + return nil +} +// Update the local queue with the response received from a +// state-changing API request (lock/unlock/cancel). +func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) { cq.mtx.Lock() defer cq.mtx.Unlock() if cq.dontupdate != nil { @@ -343,7 +350,6 @@ func (cq *Queue) apiUpdate(uuid, action string) error { cq.current[uuid] = ent } cq.notify() - return nil } func (cq *Queue) poll() (map[string]*arvados.Container, error) { @@ -487,3 +493,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C } return results, nil } + +func (cq *Queue) runMetrics(reg *prometheus.Registry) { + mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "queue_entries", + Help: "Number of active container entries in the controller database.", + }, []string{"state", "instance_type"}) + reg.MustRegister(mEntries) + + type entKey struct { + state arvados.ContainerState + inst string + } + count := map[entKey]int{} + + ch := cq.Subscribe() + defer cq.Unsubscribe(ch) + for range ch { + for k := range count { + count[k] = 0 + } + ents, _ := cq.Entries() + for _, ent := range ents { + count[entKey{ent.Container.State, ent.InstanceType.Name}]++ + } + for k, v := range count { + mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v)) + } + } +}