import (
"context"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
stop chan struct{}
stopped chan struct{}
- last503time time.Time // last time API responded 503
- maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
- supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
- maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+ supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+ maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
+ instancesWithinQuota int // max concurrency achieved since last quota error (0 = no quota error yet)
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
mLongestWaitTimeSinceQueue prometheus.Gauge
mLast503Time prometheus.Gauge
mMaxContainerConcurrency prometheus.Gauge
+
+ lastQueue atomic.Value // stores a []QueueEnt
}
// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, supervisorFraction float64) *Scheduler {
+//
+// If minQuota is positive, it is used as the initial value of the
+// dynamic concurrency limit; otherwise the initial limit is
+// maxInstances (0 = unlimited).
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, minQuota, maxInstances int, supervisorFraction float64) *Scheduler {
sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
client: client,
stop: make(chan struct{}),
stopped: make(chan struct{}),
uuidOp: map[string]string{},
- maxConcurrency: maxInstances, // initial value -- will be dynamically adjusted
supervisorFraction: supervisorFraction,
maxInstances: maxInstances,
}
+ // The dynamic concurrency limit starts at maxInstances unless a
+ // positive minQuota overrides it; it is adjusted later, see runQueue().
+ sch.maxConcurrency = maxInstances
+ if minQuota > 0 {
+ sch.maxConcurrency = minQuota
+ }
sch.registerMetrics(reg)
return sch
}
Help: "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
})
reg.MustRegister(sch.mMaxContainerConcurrency)
+ reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "at_quota",
+ Help: "Flag indicating the cloud driver is reporting an at-quota condition.",
+ }, func() float64 {
+ if sch.pool.AtQuota() {
+ return 1
+ } else {
+ return 0
+ }
+ }))
}
func (sch *Scheduler) updateMetrics() {