X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3d7b9154170eb4217119ed2279a6e8c365d9288c..7499f61a2912cfdb1a316808fafa6e6ee77ee2e0:/lib/dispatchcloud/scheduler/scheduler.go

diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 070b2de006..c3e67dd11f 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -7,9 +7,13 @@
 package scheduler
 
 import (
+	"context"
 	"sync"
 	"time"
 
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
 
@@ -29,31 +33,92 @@ type Scheduler struct {
 	logger logrus.FieldLogger
 	queue  ContainerQueue
 	pool   WorkerPool
+	reg    *prometheus.Registry
 
 	staleLockTimeout    time.Duration
 	queueUpdateInterval time.Duration
 
-	locking map[string]bool
-	mtx     sync.Mutex
+	uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+	mtx    sync.Mutex
+	wakeup *time.Timer
 
 	runOnce sync.Once
 	stop    chan struct{}
 	stopped chan struct{}
+
+	mContainersAllocatedNotStarted   prometheus.Gauge
+	mContainersNotAllocatedOverQuota prometheus.Gauge
+	mLongestWaitTimeSinceQueue       prometheus.Gauge
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
-	return &Scheduler{
-		logger:              logger,
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+	sch := &Scheduler{
+		logger:              ctxlog.FromContext(ctx),
 		queue:               queue,
 		pool:                pool,
+		reg:                 reg,
 		staleLockTimeout:    staleLockTimeout,
 		queueUpdateInterval: queueUpdateInterval,
+		wakeup:              time.NewTimer(time.Second),
 		stop:                make(chan struct{}),
 		stopped:             make(chan struct{}),
-		locking:             map[string]bool{},
+		uuidOp:              map[string]string{},
+	}
+	sch.registerMetrics(reg)
+	return sch
+}
+
+func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
+	if reg == nil {
+		reg = prometheus.NewRegistry()
+	}
+	sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "containers_allocated_not_started",
+		Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
+	})
+	reg.MustRegister(sch.mContainersAllocatedNotStarted)
+	sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "containers_not_allocated_over_quota",
+		Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
+	})
+	reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
+	sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "containers_longest_wait_time_seconds",
+		Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
+	})
+	reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+}
+
+func (sch *Scheduler) updateMetrics() {
+	earliest := time.Time{}
+	entries, _ := sch.queue.Entries()
+	running := sch.pool.Running()
+	for _, ent := range entries {
+		if ent.Container.Priority > 0 &&
+			(ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+			// Exclude containers that are preparing to run the payload (i.e.
+			// ContainerStateLocked and running on a worker, most likely loading the
+			// payload image)
+			if _, ok := running[ent.Container.UUID]; !ok {
+				if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
+					earliest = ent.Container.CreatedAt
+				}
+			}
+		}
+	}
+	if !earliest.IsZero() {
+		sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
+	} else {
+		sch.mLongestWaitTimeSinceQueue.Set(0)
 	}
 }
@@ -109,11 +174,13 @@ func (sch *Scheduler) run() {
 	for {
 		sch.runQueue()
 		sch.sync()
+		sch.updateMetrics()
 		select {
 		case <-sch.stop:
 			return
 		case <-queueNotify:
 		case <-poolNotify:
+		case <-sch.wakeup.C:
 		}
 	}
 }
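
The new registerMetrics() follows the usual client_golang pattern: each gauge is created with the arvados/dispatchcloud namespace and registered on the *prometheus.Registry handed in by the dispatcher. The standalone sketch below is not part of the commit; it only illustrates that pattern end to end with one of the gauge names from the diff. The listen address, the sample value, and the use of promhttp to serve the registry are arbitrary choices for the example.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Private registry, standing in for the one the dispatcher passes to New().
	reg := prometheus.NewRegistry()

	// Same naming scheme as the gauges registered in registerMetrics() above.
	allocatedNotStarted := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_allocated_not_started",
		Help:      "Number of containers allocated to a worker but not started yet (worker is booting).",
	})
	reg.MustRegister(allocatedNotStarted)

	// A scheduling pass would call Set() with the current count.
	allocatedNotStarted.Set(3)

	// Expose only this registry's metrics in Prometheus text format.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9090", nil))
}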
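The containers_longest_wait_time_seconds gauge set in updateMetrics() reports how long the oldest still-waiting container has been queued: a container counts as waiting while its priority is positive, its state is Queued or Locked, and it is not yet running on a worker; the gauge falls back to 0 when nothing qualifies. Below is a minimal sketch of that computation. The Container type, the longestWait helper, and the running map are simplified stand-ins for arvados.Container, the scheduler code, and the pool's Running() result, kept only to make the example self-contained.

package main

import (
	"fmt"
	"time"
)

type State string

const (
	Queued State = "Queued"
	Locked State = "Locked"
)

type Container struct {
	UUID      string
	State     State
	Priority  int
	CreatedAt time.Time
}

// longestWait returns how long the oldest eligible container has been waiting:
// priority > 0, state Queued or Locked, and not already running on a worker.
// It returns 0 when nothing is waiting, matching the gauge's behavior.
func longestWait(queue []Container, running map[string]bool, now time.Time) time.Duration {
	earliest := time.Time{}
	for _, c := range queue {
		if c.Priority > 0 && (c.State == Queued || c.State == Locked) && !running[c.UUID] {
			if earliest.IsZero() || c.CreatedAt.Before(earliest) {
				earliest = c.CreatedAt
			}
		}
	}
	if earliest.IsZero() {
		return 0
	}
	return now.Sub(earliest)
}

func main() {
	now := time.Now()
	queue := []Container{
		{UUID: "a", State: Queued, Priority: 1, CreatedAt: now.Add(-90 * time.Second)},
		{UUID: "b", State: Locked, Priority: 1, CreatedAt: now.Add(-30 * time.Second)},
		{UUID: "c", State: Locked, Priority: 1, CreatedAt: now.Add(-300 * time.Second)},
	}
	// "c" is already running on a worker, so it does not count as waiting.
	running := map[string]bool{"c": true}
	fmt.Println(longestWait(queue, running, now)) // 1m30s
}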
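The run() loop also gains a fourth wake-up source: besides the stop channel and the queue/pool notification channels, the select now waits on the new wakeup timer, so a scheduling pass can happen when the timer fires (or is reset) even if neither the queue nor the pool reports a change. The rough sketch below only shows that loop shape; the notification channels, the timing, and the pass() stub (standing in for runQueue/sync/updateMetrics) are placeholders, not the scheduler's real wiring.

package main

import (
	"fmt"
	"time"
)

func main() {
	queueNotify := make(chan struct{}, 1)
	poolNotify := make(chan struct{}, 1)
	stop := make(chan struct{})
	wakeup := time.NewTimer(time.Second)

	go func() {
		queueNotify <- struct{}{} // pretend the queue changed
		time.Sleep(50 * time.Millisecond)
		close(stop) // then ask the loop to exit
	}()

	pass := func() { fmt.Println("scheduling pass") }

	for {
		pass()
		select {
		case <-stop:
			return
		case <-queueNotify:
		case <-poolNotify:
		case <-wakeup.C:
			// The timer case allows an extra pass on a schedule, or on demand if
			// other code calls wakeup.Reset(), without a queue or pool event.
		}
	}
}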