Merge branch '21123-scheduling-status'
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
index b1f8ea222329e981334739af30e12bbb1edf7d44..bc6574a21a538134c618320f9e97511b84d9b307 100644 (file)
@@ -9,6 +9,7 @@ package scheduler
 import (
        "context"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -46,23 +47,26 @@ type Scheduler struct {
        stop    chan struct{}
        stopped chan struct{}
 
-       last503time        time.Time // last time API responded 503
-       maxConcurrency     int       // dynamic container limit (0 = unlimited), see runQueue()
-       supervisorFraction float64   // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
-       maxInstances       int       // maximum number of instances the pool will bring up (0 = unlimited)
+       last503time          time.Time // last time API responded 503
+       maxConcurrency       int       // dynamic container limit (0 = unlimited), see runQueue()
+       supervisorFraction   float64   // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
+       maxInstances         int       // maximum number of instances the pool will bring up (0 = unlimited)
+       instancesWithinQuota int       // max concurrency achieved since last quota error (0 = no quota error yet)
 
        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
        mLongestWaitTimeSinceQueue       prometheus.Gauge
        mLast503Time                     prometheus.Gauge
        mMaxContainerConcurrency         prometheus.Gauge
+
+       lastQueue atomic.Value // stores a []QueueEnt
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, supervisorFraction float64) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, minQuota, maxInstances int, supervisorFraction float64) *Scheduler {
        sch := &Scheduler{
                logger:              ctxlog.FromContext(ctx),
                client:              client,
@@ -75,10 +79,14 @@ func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool
                stop:                make(chan struct{}),
                stopped:             make(chan struct{}),
                uuidOp:              map[string]string{},
-               maxConcurrency:      maxInstances, // initial value -- will be dynamically adjusted
                supervisorFraction:  supervisorFraction,
                maxInstances:        maxInstances,
        }
+       if minQuota > 0 {
+               sch.maxConcurrency = minQuota
+       } else {
+               sch.maxConcurrency = maxInstances
+       }
        sch.registerMetrics(reg)
        return sch
 }
@@ -122,6 +130,18 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
                Help:      "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
        })
        reg.MustRegister(sch.mMaxContainerConcurrency)
+       reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "at_quota",
+               Help:      "Flag indicating the cloud driver is reporting an at-quota condition.",
+       }, func() float64 {
+               if sch.pool.AtQuota() {
+                       return 1
+               } else {
+                       return 0
+               }
+       }))
 }
 
 func (sch *Scheduler) updateMetrics() {