X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c232747bc0896ada224d0453de58d24a6f14cf9b..7345838cb097f11e2ba8239020762ae867591510:/lib/dispatchcloud/scheduler/scheduler.go diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go index b1f8ea2223..bc6574a21a 100644 --- a/lib/dispatchcloud/scheduler/scheduler.go +++ b/lib/dispatchcloud/scheduler/scheduler.go @@ -9,6 +9,7 @@ package scheduler import ( "context" "sync" + "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -46,23 +47,26 @@ type Scheduler struct { stop chan struct{} stopped chan struct{} - last503time time.Time // last time API responded 503 - maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue() - supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners) - maxInstances int // maximum number of instances the pool will bring up (0 = unlimited) + last503time time.Time // last time API responded 503 + maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue() + supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners) + maxInstances int // maximum number of instances the pool will bring up (0 = unlimited) + instancesWithinQuota int // max concurrency achieved since last quota error (0 = no quota error yet) mContainersAllocatedNotStarted prometheus.Gauge mContainersNotAllocatedOverQuota prometheus.Gauge mLongestWaitTimeSinceQueue prometheus.Gauge mLast503Time prometheus.Gauge mMaxContainerConcurrency prometheus.Gauge + + lastQueue atomic.Value // stores a []QueueEnt } // New returns a new unstarted Scheduler. // // Any given queue and pool should not be used by more than one // scheduler at a time. 
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, supervisorFraction float64) *Scheduler { +func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, minQuota, maxInstances int, supervisorFraction float64) *Scheduler { sch := &Scheduler{ logger: ctxlog.FromContext(ctx), client: client, @@ -75,10 +79,14 @@ func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool stop: make(chan struct{}), stopped: make(chan struct{}), uuidOp: map[string]string{}, - maxConcurrency: maxInstances, // initial value -- will be dynamically adjusted supervisorFraction: supervisorFraction, maxInstances: maxInstances, } + if minQuota > 0 { + sch.maxConcurrency = minQuota + } else { + sch.maxConcurrency = maxInstances + } sch.registerMetrics(reg) return sch } @@ -122,6 +130,18 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) { Help: "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.", }) reg.MustRegister(sch.mMaxContainerConcurrency) + reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "at_quota", + Help: "Flag indicating the cloud driver is reporting an at-quota condition.", + }, func() float64 { + if sch.pool.AtQuota() { + return 1 + } else { + return 0 + } + })) } func (sch *Scheduler) updateMetrics() {