"sync"
"time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
logger logrus.FieldLogger
+ client *arvados.Client
queue ContainerQueue
pool WorkerPool
+ reg *prometheus.Registry
staleLockTimeout time.Duration
queueUpdateInterval time.Duration
runOnce sync.Once
stop chan struct{}
stopped chan struct{}
+
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+ supervisorFraction float64 // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+ maxInstances int // maximum number of instances the pool will bring up (0 = unlimited)
+
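+ // Prometheus gauges exported by this scheduler; see registerMetrics() and updateMetrics().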
+ mContainersAllocatedNotStarted prometheus.Gauge
+ mContainersNotAllocatedOverQuota prometheus.Gauge
+ mLongestWaitTimeSinceQueue prometheus.Gauge
+ mLast503Time prometheus.Gauge
+ mMaxContainerConcurrency prometheus.Gauge
}
// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
- return &Scheduler{
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration, maxInstances int, supervisorFraction float64) *Scheduler {
+ sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
+ client: client,
queue: queue,
pool: pool,
+ reg: reg,
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
wakeup: time.NewTimer(time.Second),
stop: make(chan struct{}),
stopped: make(chan struct{}),
uuidOp: map[string]string{},
+ maxConcurrency: maxInstances, // initial value -- will be dynamically adjusted
+ supervisorFraction: supervisorFraction,
+ maxInstances: maxInstances,
+ }
+ sch.registerMetrics(reg)
+ return sch
+}
+
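+// registerMetrics creates the scheduler's Prometheus gauges and
+// registers them with reg. If reg is nil, a throwaway registry is
+// substituted so the gauges can still be created and updated safely.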
+func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
+ if reg == nil {
+ reg = prometheus.NewRegistry()
+ }
+ sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "containers_allocated_not_started",
+ Help: "Number of containers allocated to a worker but not started yet (worker is booting).",
+ })
+ reg.MustRegister(sch.mContainersAllocatedNotStarted)
+ sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "containers_not_allocated_over_quota",
+ Help: "Number of containers not allocated to a worker because the system has hit a quota.",
+ })
+ reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
+ sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "containers_longest_wait_time_seconds",
+ Help: "Current longest wait time of any container since queuing, and before the start of crunch-run.",
+ })
+ reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+ sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "last_503_time",
+ Help: "Time of most recent 503 error received from API.",
+ })
+ reg.MustRegister(sch.mLast503Time)
+ sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "max_concurrent_containers",
+ Help: "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
+ })
+ reg.MustRegister(sch.mMaxContainerConcurrency)
+}
+
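+// updateMetrics refreshes gauges derived from the current queue and
+// worker pool state -- currently the longest wait time of any
+// queued/locked container that has not yet started on a worker.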
+func (sch *Scheduler) updateMetrics() {
+ earliest := time.Time{}
+ entries, _ := sch.queue.Entries()
+ running := sch.pool.Running()
+ for _, ent := range entries {
+ if ent.Container.Priority > 0 &&
+ (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+ // Exclude containers that are preparing to run the payload (i.e.
+ // ContainerStateLocked and running on a worker, most likely loading the
+ // payload image).
+ if _, ok := running[ent.Container.UUID]; !ok {
+ if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
+ earliest = ent.Container.CreatedAt
+ }
+ }
+ }
+ }
+ if !earliest.IsZero() {
+ sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
+ } else {
+ sch.mLongestWaitTimeSinceQueue.Set(0)
+ }
+}
}
// Keep the queue up to date.
- poll := time.NewTicker(sch.queueUpdateInterval)
- defer poll.Stop()
go func() {
- for range poll.C {
+ for {
+ starttime := time.Now()
err := sch.queue.Update()
if err != nil {
sch.logger.Errorf("error updating queue: %s", err)
}
+ // If the previous update took a long time,
+ // that probably means the server is
+ // overloaded, so wait that long before doing
+ // another. Otherwise, wait for the configured
+ // poll interval.
+ delay := time.Since(starttime)
+ if delay < sch.queueUpdateInterval {
+ delay = sch.queueUpdateInterval
+ }
+ time.Sleep(delay)
}
}()
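+ // Main loop: dispatch the queue, sync container state, and refresh
+ // metrics on every pass, until a stop signal is received on sch.stop.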
for {
sch.runQueue()
sch.sync()
+ sch.updateMetrics()
select {
case <-sch.stop:
return