19177: Fixes tests.
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
index 6fd47e919411d69026316d24d8643e5693d06f38..c3e67dd11f70a4e00c8a74f59826efb13bf0e35c 100644 (file)
@@ -11,6 +11,7 @@ import (
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
@@ -46,6 +47,7 @@ type Scheduler struct {
 
        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
+       mLongestWaitTimeSinceQueue       prometheus.Gauge
 }
 
 // New returns a new unstarted Scheduler.
@@ -87,6 +89,37 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers not allocated to a worker because the system has hit a quota.",
        })
        reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
+       sch.mLongestWaitTimeSinceQueue = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "containers_longest_wait_time_seconds",
+               Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
+       })
+       reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+}
+
+func (sch *Scheduler) updateMetrics() {
+       earliest := time.Time{}
+       entries, _ := sch.queue.Entries()
+       running := sch.pool.Running()
+       for _, ent := range entries {
+               if ent.Container.Priority > 0 &&
+                       (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+                       // Exclude containers that are preparing to run the payload (i.e.
+                       // ContainerStateLocked and running on a worker, most likely loading the
+                       // payload image
+                       if _, ok := running[ent.Container.UUID]; !ok {
+                               if ent.Container.CreatedAt.Before(earliest) || earliest.IsZero() {
+                                       earliest = ent.Container.CreatedAt
+                               }
+                       }
+               }
+       }
+       if !earliest.IsZero() {
+               sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
+       } else {
+               sch.mLongestWaitTimeSinceQueue.Set(0)
+       }
 }
 
 // Start starts the scheduler.
@@ -141,6 +174,7 @@ func (sch *Scheduler) run() {
        for {
                sch.runQueue()
                sch.sync()
+               sch.updateMetrics()
                select {
                case <-sch.stop:
                        return