X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7f4da9388e3d5ec8f38f6d6408916d1d46dfb10f..d90fffe9d937d3e05c04106904b18dc4da235bc6:/lib/dispatchcloud/scheduler/scheduler.go

diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 6fd47e9194..ee7ab50883 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -11,6 +11,7 @@ import (
 	"sync"
 	"time"
 
+	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
@@ -30,6 +31,7 @@ import (
 // shuts down idle workers, in case they are consuming quota.
 type Scheduler struct {
 	logger logrus.FieldLogger
+	client *arvados.Client
 	queue  ContainerQueue
 	pool   WorkerPool
 	reg    *prometheus.Registry
@@ -44,17 +46,27 @@ type Scheduler struct {
 	stop    chan struct{}
 	stopped chan struct{}
 
+	last503time          time.Time // last time API responded 503
+	maxConcurrency       int       // dynamic container limit (0 = unlimited), see runQueue()
+	supervisorFraction   float64   // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+	maxInstances         int       // maximum number of instances the pool will bring up (0 = unlimited)
+	instancesWithinQuota int       // max concurrency achieved since last quota error (0 = no quota error yet)
+
 	mContainersAllocatedNotStarted   prometheus.Gauge
 	mContainersNotAllocatedOverQuota prometheus.Gauge
+	mLongestWaitTimeSinceQueue       prometheus.Gauge
+	mLast503Time                     prometheus.Gauge
+	mMaxContainerConcurrency         prometheus.Gauge
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, minQuota, maxInstances int, supervisorFraction float64) *Scheduler {
 	sch := &Scheduler{
 		logger: ctxlog.FromContext(ctx),
+		client: client,
 		queue:  queue,
 		pool:   pool,
 		reg:    reg,
@@ -64,6 +76,13 @@ func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *promet
 		stop:    make(chan struct{}),
 		stopped: make(chan struct{}),
 		uuidOp:  map[string]string{},
+		supervisorFraction: supervisorFraction,
+		maxInstances:       maxInstances,
+	}
+	if minQuota > 0 {
+		sch.maxConcurrency = minQuota
+	} else {
+		sch.maxConcurrency = maxInstances
 	}
 	sch.registerMetrics(reg)
 	return sch
@@ -87,6 +106,63 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
 		Help: "Number of containers not allocated to a worker because the system has hit a quota.",
 	})
 	reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
+	sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "containers_longest_wait_time_seconds",
+		Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
+	})
+	reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+	sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "last_503_time",
+		Help:      "Time of most recent 503 error received from API.",
+	})
+	reg.MustRegister(sch.mLast503Time)
+	sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "max_concurrent_containers",
+		Help:      "Dynamically assigned limit on number of containers scheduled concurrently, set after receiving 503 errors from API.",
+	})
+	reg.MustRegister(sch.mMaxContainerConcurrency)
+	reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "at_quota",
+		Help:      "Flag indicating the cloud driver is reporting an at-quota condition.",
+	}, func() float64 {
+		if sch.pool.AtQuota() {
+			return 1
+		} else {
+			return 0
+		}
+	}))
+}
+
+func (sch *Scheduler) updateMetrics() {
+	earliest := time.Time{}
+	entries, _ := sch.queue.Entries()
+	running := sch.pool.Running()
+	for _, ent := range entries {
+		if ent.Container.Priority > 0 &&
+			(ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+			// Exclude containers that are preparing to run the payload (i.e.
+			// ContainerStateLocked and running on a worker, most likely loading the
+			// payload image)
+			if _, ok := running[ent.Container.UUID]; !ok {
+				if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
+					earliest = ent.Container.CreatedAt
+				}
+			}
+		}
+	}
+	if !earliest.IsZero() {
+		sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
+	} else {
+		sch.mLongestWaitTimeSinceQueue.Set(0)
+	}
 }
 
 // Start starts the scheduler.
@@ -116,14 +192,23 @@ func (sch *Scheduler) run() {
 	}
 
 	// Keep the queue up to date.
-	poll := time.NewTicker(sch.queueUpdateInterval)
-	defer poll.Stop()
 	go func() {
-		for range poll.C {
+		for {
+			starttime := time.Now()
 			err := sch.queue.Update()
 			if err != nil {
 				sch.logger.Errorf("error updating queue: %s", err)
 			}
+			// If the previous update took a long time,
+			// that probably means the server is
+			// overloaded, so wait that long before doing
+			// another. Otherwise, wait for the configured
+			// poll interval.
+			delay := time.Since(starttime)
+			if delay < sch.queueUpdateInterval {
+				delay = sch.queueUpdateInterval
+			}
+			time.Sleep(delay)
 		}
 	}()
 
@@ -141,6 +226,7 @@ func (sch *Scheduler) run() {
 	for {
 		sch.runQueue()
 		sch.sync()
+		sch.updateMetrics()
 		select {
 		case <-sch.stop:
 			return
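
The new at_quota metric above is registered with prometheus.NewGaugeFunc rather than a plain Gauge, so its value is read from the worker pool at scrape time instead of being pushed with Set(). A minimal standalone sketch of that pattern; the atQuota variable stands in for sch.pool.AtQuota() and nothing here is taken verbatim from the dispatcher:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	atQuota := false // placeholder for sch.pool.AtQuota()

	// GaugeFunc evaluates the callback at collection time, so the exported
	// value always reflects the current at-quota state without Set() calls.
	g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "at_quota",
		Help:      "Flag indicating the cloud driver is reporting an at-quota condition.",
	}, func() float64 {
		if atQuota {
			return 1
		}
		return 0
	})
	reg := prometheus.NewRegistry()
	reg.MustRegister(g)

	fmt.Println(testutil.ToFloat64(g)) // 0
	atQuota = true
	fmt.Println(testutil.ToFloat64(g)) // 1
}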
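
updateMetrics() sets containers_longest_wait_time_seconds from the oldest container that is still waiting: priority greater than zero, state Queued or Locked, and not yet running on a worker. A self-contained sketch of that calculation, using a pared-down stand-in type rather than arvados.Container:

package main

import (
	"fmt"
	"time"
)

// container is a stand-in for arvados.Container, just enough to mirror the
// wait-time calculation in updateMetrics().
type container struct {
	UUID      string
	Priority  int
	State     string // "Queued", "Locked", ...
	CreatedAt time.Time
}

// longestWait returns how long the oldest still-waiting container has been
// queued: priority > 0, state Queued or Locked, not yet running on a worker.
func longestWait(entries []container, running map[string]bool, now time.Time) time.Duration {
	earliest := time.Time{}
	for _, c := range entries {
		if c.Priority > 0 && (c.State == "Queued" || c.State == "Locked") && !running[c.UUID] {
			if c.CreatedAt.Before(earliest) || earliest.IsZero() {
				earliest = c.CreatedAt
			}
		}
	}
	if earliest.IsZero() {
		return 0
	}
	return now.Sub(earliest)
}

func main() {
	now := time.Now()
	entries := []container{
		{UUID: "a", Priority: 1, State: "Queued", CreatedAt: now.Add(-90 * time.Second)},
		{UUID: "b", Priority: 1, State: "Locked", CreatedAt: now.Add(-300 * time.Second)},
		{UUID: "c", Priority: 0, State: "Queued", CreatedAt: now.Add(-999 * time.Second)}, // cancelled, ignored
	}
	running := map[string]bool{"b": true} // already on a worker, ignored
	fmt.Println(longestWait(entries, running, now)) // 1m30s
}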
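
The queue-update goroutine in run() drops the fixed time.Ticker in favor of sleeping for whichever is longer, the elapsed update time or queueUpdateInterval, so an API server that answers queue requests slowly is polled less aggressively. A runnable sketch of that backoff rule with placeholder names (interval, update, workload), not the dispatcher's own loop:

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 200 * time.Millisecond // stands in for sch.queueUpdateInterval
	update := func(d time.Duration) error {
		time.Sleep(d) // simulate a queue update of varying duration
		return nil
	}
	for i, workload := range []time.Duration{50 * time.Millisecond, 500 * time.Millisecond} {
		starttime := time.Now()
		if err := update(workload); err != nil {
			fmt.Println("error updating queue:", err)
		}
		// Wait at least the configured interval; if the update itself took
		// longer than that, wait that long instead.
		delay := time.Since(starttime)
		if delay < interval {
			delay = interval
		}
		fmt.Printf("update %d took %v; sleeping %v before the next poll\n", i, workload, delay.Round(10*time.Millisecond))
		time.Sleep(delay)
	}
}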