// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
logger logrus.FieldLogger
+ client *arvados.Client
queue ContainerQueue
pool WorkerPool
reg *prometheus.Registry
stop chan struct{}
stopped chan struct{}
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+ supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+ maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
+
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
mLongestWaitTimeSinceQueue prometheus.Gauge
+ mLast503Time prometheus.Gauge // registered as "last_503_time": time of most recent 503 error received from API
+ mMaxContainerConcurrency prometheus.Gauge // registered as "max_concurrent_containers": dynamically assigned container concurrency limit
}
// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, supervisorFraction float64) *Scheduler {
sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
+ client: client,
queue: queue,
pool: pool,
reg: reg,
stop: make(chan struct{}),
stopped: make(chan struct{}),
uuidOp: map[string]string{},
+ maxConcurrency: maxInstances, // initial value -- will be dynamically adjusted
+ supervisorFraction: supervisorFraction,
+ maxInstances: maxInstances,
}
sch.registerMetrics(reg)
return sch
Help: "Current longest wait time of any container since queuing, and before the start of crunch-run.",
})
reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+ sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "last_503_time",
+ Help: "Time of most recent 503 error received from API.",
+ })
+ reg.MustRegister(sch.mLast503Time)
+ sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "max_concurrent_containers",
+ Help: "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
+ })
+ reg.MustRegister(sch.mMaxContainerConcurrency)
}
func (sch *Scheduler) updateMetrics() {
}
// Keep the queue up to date.
- poll := time.NewTicker(sch.queueUpdateInterval)
- defer poll.Stop()
go func() {
- for range poll.C {
+ for {
+ starttime := time.Now()
err := sch.queue.Update()
if err != nil {
sch.logger.Errorf("error updating queue: %s", err)
}
+ // If the previous update took a long time,
+ // that probably means the server is
+ // overloaded, so wait that long before doing
+ // another. Otherwise, wait for the configured
+ // poll interval.
+ delay := time.Since(starttime)
+ if delay < sch.queueUpdateInterval {
+ delay = sch.queueUpdateInterval
+ }
+ time.Sleep(delay)
}
}()