X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/407fc461d20ece8b11b7b56f29a3caff3083ff8d..4447b5a61f79edf2411ba94f4ad5d90e1ca7e220:/lib/dispatchcloud/scheduler/scheduler.go

diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index b1d369ed24..21510ee091 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -31,6 +31,7 @@ import (
 // shuts down idle workers, in case they are consuming quota.
 type Scheduler struct {
 	logger logrus.FieldLogger
+	client *arvados.Client
 	queue  ContainerQueue
 	pool   WorkerPool
 	reg    *prometheus.Registry
@@ -45,18 +46,25 @@ type Scheduler struct {
 	stop    chan struct{}
 	stopped chan struct{}
 
+	last503time    time.Time // last time API responded 503
+	maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
+	maxSupervisors int       // maximum number of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+
 	mContainersAllocatedNotStarted   prometheus.Gauge
 	mContainersNotAllocatedOverQuota prometheus.Gauge
 	mLongestWaitTimeSinceQueue       prometheus.Gauge
+	mLast503Time                     prometheus.Gauge
+	mMaxContainerConcurrency         prometheus.Gauge
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxSupervisors int) *Scheduler {
 	sch := &Scheduler{
 		logger: ctxlog.FromContext(ctx),
+		client: client,
 		queue:  queue,
 		pool:   pool,
 		reg:    reg,
@@ -66,6 +74,7 @@ func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *promet
 		stop:    make(chan struct{}),
 		stopped: make(chan struct{}),
 		uuidOp:  map[string]string{},
+		maxSupervisors: maxSupervisors,
 	}
 	sch.registerMetrics(reg)
 	return sch
@@ -96,10 +105,24 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
 		Help: "Current longest wait time of any container since queuing, and before the start of crunch-run.",
 	})
 	reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+	sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "last_503_time",
+		Help:      "Time of most recent 503 error received from API.",
+	})
+	reg.MustRegister(sch.mLast503Time)
+	sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "max_concurrent_containers",
+		Help:      "Dynamically assigned limit on number of containers scheduled concurrently, set after receiving 503 errors from API.",
+	})
+	reg.MustRegister(sch.mMaxContainerConcurrency)
 }
 
 func (sch *Scheduler) updateMetrics() {
-	earliest := time.Now()
+	earliest := time.Time{}
 	entries, _ := sch.queue.Entries()
 	running := sch.pool.Running()
 	for _, ent := range entries {
@@ -109,7 +132,7 @@ func (sch *Scheduler) updateMetrics() {
 		// ContainerStateLocked and running on a worker, most likely loading the
 		// payload image
 		if _, ok := running[ent.Container.UUID]; !ok {
-			if ent.Container.CreatedAt.Before(earliest) {
+			if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
 				earliest = ent.Container.CreatedAt
 			}
 		}
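
The New() signature change above means every call site now has to pass an *arvados.Client and a maxSupervisors cap. A minimal sketch of the updated wiring, assuming this tree's import paths; the helper name and the duration/cap values are illustrative, not taken from this diff:

package main

import (
	"context"
	"time"

	"git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
)

// startScheduler is a hypothetical wiring helper; constructing the
// queue and pool is the dispatcher's job and is omitted here.
func startScheduler(ctx context.Context, client *arvados.Client,
	queue scheduler.ContainerQueue, pool scheduler.WorkerPool) *scheduler.Scheduler {
	reg := prometheus.NewRegistry()
	return scheduler.New(ctx, client, queue, pool, reg,
		time.Minute,    // staleLockTimeout (illustrative value)
		10*time.Second, // queueUpdateInterval (illustrative value)
		4)              // maxSupervisors: cap on workflow-runner containers (illustrative)
}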
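
The last503time and maxConcurrency fields only get their declarations in this diff; the comment points at runQueue() for the actual back-off logic, which is not included here. A self-contained sketch of the general technique (the names and the halve-then-step-up policy are assumptions, not the Arvados implementation):

package scheduler

import "time"

// limiter is a hypothetical sketch of 503-driven throttling; the real
// logic lives in runQueue(), which this diff does not show.
type limiter struct {
	last503time    time.Time // last time the API responded 503
	maxConcurrency int       // dynamic container limit (0 = unlimited)
}

// note503 records an API 503 and tightens the limit (assumed policy:
// start at the current container count, then halve on repeats).
func (l *limiter) note503(running int) {
	l.last503time = time.Now()
	if l.maxConcurrency == 0 || l.maxConcurrency > running {
		l.maxConcurrency = running
	} else {
		l.maxConcurrency /= 2
	}
	if l.maxConcurrency < 1 {
		l.maxConcurrency = 1 // never stall completely
	}
}

// maybeRelax raises the limit again once the API has been quiet a while.
func (l *limiter) maybeRelax(quiet time.Duration) {
	if l.maxConcurrency > 0 && time.Since(l.last503time) > quiet {
		l.maxConcurrency++
	}
}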
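
The two new gauges are registered above but not yet fed in these hunks, and the updateMetrics() change swaps the time.Now() seed for the zero value so that "no waiting containers" becomes detectable via IsZero(). A plausible way the values could be exported (the helper name and its call site are assumptions):

package scheduler

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// exportSchedulerGauges is a hypothetical helper showing how the new
// fields could be mapped onto the gauges registered above.
func exportSchedulerGauges(last503 time.Time, maxConcurrency int, earliest time.Time,
	mLast503Time, mMaxContainerConcurrency, mLongestWait prometheus.Gauge) {
	if !last503.IsZero() {
		mLast503Time.Set(float64(last503.Unix())) // exported as Unix seconds
	}
	mMaxContainerConcurrency.Set(float64(maxConcurrency)) // 0 = unlimited
	if earliest.IsZero() {
		// With the zero-value seed above, an empty queue can be reported
		// as exactly zero wait rather than time.Since(the seed).
		mLongestWait.Set(0)
	} else {
		mLongestWait.Set(time.Since(earliest).Seconds())
	}
}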