stop chan struct{}
stopped chan struct{}
- last503time time.Time // last time API responded 503
- maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
- maxSupervisors int // maximum number of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+ supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+ maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, supervisorFraction float64) *Scheduler {
sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
client: client,
stop: make(chan struct{}),
stopped: make(chan struct{}),
uuidOp: map[string]string{},
- maxSupervisors: maxSupervisors,
+ maxConcurrency: maxInstances, // starts out equal to maxInstances; this is the dynamic container limit and is adjusted later (see runQueue())
+ supervisorFraction: supervisorFraction, // maximum fraction of "supervisor" containers
+ maxInstances: maxInstances, // maximum number of instances the pool will bring up (0 = unlimited)
}
sch.registerMetrics(reg)
return sch
}
// Keep the queue up to date.
- poll := time.NewTicker(sch.queueUpdateInterval)
- defer poll.Stop()
go func() {
- for range poll.C {
+ for {
+ starttime := time.Now()
err := sch.queue.Update()
if err != nil {
sch.logger.Errorf("error updating queue: %s", err)
}
+ // If the previous update took a long time,
+ // that probably means the server is
+ // overloaded, so wait that long before doing
+ // another. Otherwise, wait for the configured
+ // poll interval.
+ delay := time.Since(starttime)
+ if delay < sch.queueUpdateInterval {
+ delay = sch.queueUpdateInterval
+ }
+ time.Sleep(delay)
}
}()