20182: Add option to limit the number of supervisor containers
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
index b1d369ed2483ec15e4b36e8e01e18dbac5f11cc3..21510ee091110768c116e2d77f39be18850eefee 100644 (file)
@@ -31,6 +31,7 @@ import (
 // shuts down idle workers, in case they are consuming quota.
 type Scheduler struct {
        logger              logrus.FieldLogger
+       client              *arvados.Client
        queue               ContainerQueue
        pool                WorkerPool
        reg                 *prometheus.Registry
@@ -45,18 +46,25 @@ type Scheduler struct {
        stop    chan struct{}
        stopped chan struct{}
 
+       last503time    time.Time // last time API responded 503
+       maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
+       maxSupervisors int       // maximum number of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+
        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
        mLongestWaitTimeSinceQueue       prometheus.Gauge
+       mLast503Time                     prometheus.Gauge
+       mMaxContainerConcurrency         prometheus.Gauge
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
        sch := &Scheduler{
                logger:              ctxlog.FromContext(ctx),
+               client:              client,
                queue:               queue,
                pool:                pool,
                reg:                 reg,
@@ -66,6 +74,7 @@ func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *promet
                stop:                make(chan struct{}),
                stopped:             make(chan struct{}),
                uuidOp:              map[string]string{},
+               maxSupervisors:      maxSupervisors,
        }
        sch.registerMetrics(reg)
        return sch
@@ -96,10 +105,24 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
                Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
        })
        reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+       sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "last_503_time",
+               Help:      "Time of most recent 503 error received from API.",
+       })
+       reg.MustRegister(sch.mLast503Time)
+       sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "max_concurrent_containers",
+               Help:      "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
+       })
+       reg.MustRegister(sch.mMaxContainerConcurrency)
 }
 
 func (sch *Scheduler) updateMetrics() {
-       earliest := time.Now()
+       earliest := time.Time{}
        entries, _ := sch.queue.Entries()
        running := sch.pool.Running()
        for _, ent := range entries {
@@ -109,7 +132,7 @@ func (sch *Scheduler) updateMetrics() {
                        // ContainerStateLocked and running on a worker, most likely loading the
                        // payload image
                        if _, ok := running[ent.Container.UUID]; !ok {
-                               if ent.Container.CreatedAt.Before(earliest) {
+                               if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
                                        earliest = ent.Container.CreatedAt
                                }
                        }