20601: Rename Scheduler.maxSupervisors -> Scheduler.supervisorFraction
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
index 4644dc4ea4db00782b38589f546f5cb22d577e88..b1f8ea222329e981334739af30e12bbb1edf7d44 100644 (file)
@@ -46,8 +46,10 @@ type Scheduler struct {
        stop    chan struct{}
        stopped chan struct{}
 
-       last503time    time.Time // last time API responded 503
-       maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
+       last503time        time.Time // last time API responded 503
+       maxConcurrency     int       // dynamic container limit (0 = unlimited), see runQueue()
+       supervisorFraction float64   // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
+       maxInstances       int       // maximum number of instances the pool will bring up (0 = unlimited)
 
        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
@@ -60,7 +62,7 @@ type Scheduler struct {
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, supervisorFraction float64) *Scheduler {
        sch := &Scheduler{
                logger:              ctxlog.FromContext(ctx),
                client:              client,
@@ -73,6 +75,9 @@ func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool
                stop:                make(chan struct{}),
                stopped:             make(chan struct{}),
                uuidOp:              map[string]string{},
+               maxConcurrency:      maxInstances, // initial value -- will be dynamically adjusted
+               supervisorFraction:  supervisorFraction,
+               maxInstances:        maxInstances,
        }
        sch.registerMetrics(reg)
        return sch
@@ -170,14 +175,23 @@ func (sch *Scheduler) run() {
        }
 
        // Keep the queue up to date.
-       poll := time.NewTicker(sch.queueUpdateInterval)
-       defer poll.Stop()
        go func() {
-               for range poll.C {
+               for {
+                       starttime := time.Now()
                        err := sch.queue.Update()
                        if err != nil {
                                sch.logger.Errorf("error updating queue: %s", err)
                        }
+                       // If the previous update took a long time,
+                       // that probably means the server is
+                       // overloaded, so wait that long before doing
+                       // another. Otherwise, wait for the configured
+                       // poll interval.
+                       delay := time.Since(starttime)
+                       if delay < sch.queueUpdateInterval {
+                               delay = sch.queueUpdateInterval
+                       }
+                       time.Sleep(delay)
                }
        }()