// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
logger logrus.FieldLogger
+ client *arvados.Client // supplied by New's client argument
queue ContainerQueue
pool WorkerPool
reg *prometheus.Registry
stop chan struct{}
stopped chan struct{}
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+ maxSupervisors int // maximum number of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners); supplied by New's maxSupervisors argument
+
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
mLongestWaitTimeSinceQueue prometheus.Gauge
+ mLast503Time prometheus.Gauge // registered as arvados_dispatchcloud_last_503_time
+ mMaxContainerConcurrency prometheus.Gauge // registered as arvados_dispatchcloud_max_concurrent_containers
}
// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
+ client: client,
queue: queue,
pool: pool,
reg: reg,
stop: make(chan struct{}),
stopped: make(chan struct{}),
uuidOp: map[string]string{},
+ maxSupervisors: maxSupervisors,
}
sch.registerMetrics(reg)
return sch
Help: "Current longest wait time of any container since queuing, and before the start of crunch-run.",
})
reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+ sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "last_503_time",
+ Help: "Time of most recent 503 error received from API.",
+ })
+ reg.MustRegister(sch.mLast503Time)
+ sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "max_concurrent_containers",
+ Help: "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
+ })
+ reg.MustRegister(sch.mMaxContainerConcurrency)
}
func (sch *Scheduler) updateMetrics() {
- earliest := time.Now()
+ earliest := time.Time{}
entries, _ := sch.queue.Entries()
running := sch.pool.Running()
for _, ent := range entries {
// ContainerStateLocked and running on a worker, most likely loading the
// payload image
if _, ok := running[ent.Container.UUID]; !ok {
- if ent.Container.CreatedAt.Before(earliest) {
+ if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
earliest = ent.Container.CreatedAt
}
}