19973: Add metrics for automatic container concurrency limit.
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
index c3e67dd11f70a4e00c8a74f59826efb13bf0e35c..4644dc4ea4db00782b38589f546f5cb22d577e88 100644 (file)
@@ -31,6 +31,7 @@ import (
 // shuts down idle workers, in case they are consuming quota.
 type Scheduler struct {
        logger              logrus.FieldLogger
+       client              *arvados.Client
        queue               ContainerQueue
        pool                WorkerPool
        reg                 *prometheus.Registry
@@ -45,18 +46,24 @@ type Scheduler struct {
        stop    chan struct{}
        stopped chan struct{}
 
+       last503time    time.Time // last time API responded 503
+       maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
+
        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
        mLongestWaitTimeSinceQueue       prometheus.Gauge
+       mLast503Time                     prometheus.Gauge
+       mMaxContainerConcurrency         prometheus.Gauge
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
        sch := &Scheduler{
                logger:              ctxlog.FromContext(ctx),
+               client:              client,
                queue:               queue,
                pool:                pool,
                reg:                 reg,
@@ -96,6 +103,20 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
                Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
        })
        reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+       sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "last_503_time",
+               Help:      "Time of most recent 503 error received from API.",
+       })
+       reg.MustRegister(sch.mLast503Time)
+       sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "max_concurrent_containers",
+               Help:      "Dynamically assigned limit on number of containers scheduled concurrently, set after receiving 503 errors from API.",
+       })
+       reg.MustRegister(sch.mMaxContainerConcurrency)
 }
 
 func (sch *Scheduler) updateMetrics() {