16636: implement review feedback.
author     Ward Vandewege <ward@curii.com>
           Tue, 15 Sep 2020 17:59:33 +0000 (13:59 -0400)
committer  Ward Vandewege <ward@curii.com>
           Tue, 15 Sep 2020 17:59:33 +0000 (13:59 -0400)
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

lib/dispatchcloud/container/queue.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/scheduler.go
lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index 45b346383fab8641b27d16063a46bc4468fc96ce..a1ff414b73ecbd3be22276b716a64af6eaa3f225 100644
@@ -382,7 +382,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
                        *next[upd.UUID] = upd
                }
        }
-       selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters"}
+       selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
        limitParam := 1000
 
        mine, err := cq.fetchAll(arvados.ResourceListParams{
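
The new "created_at" entry in selectParam makes each container's creation timestamp available to the dispatcher; the updateMetrics pass added later in this commit uses it to report the longest queue wait. A minimal sketch of the idea, assuming a decoded arvados.Container value named ctr:

    // how long a still-queued container has been waiting; ctr.CreatedAt is
    // only populated because "created_at" is now in the select list
    waitSeconds := time.Since(ctr.CreatedAt).Seconds()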
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 9bbb064fee6a06bcc769b49fe8b7ae2b417b1af3..b9d653a821e4b6650d2666e368414df43843e4b8 100644
@@ -34,7 +34,6 @@ func (sch *Scheduler) runQueue() {
        dontstart := map[arvados.InstanceType]bool{}
        var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
        var containerAllocatedWorkerBootingCount int
-       var longestWaitTimeCandidate, previousLongestWaitTimeCandidate float64
 
 tryrun:
        for i, ctr := range sorted {
@@ -46,11 +45,6 @@ tryrun:
                if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
                        continue
                }
-               previousLongestWaitTimeCandidate = longestWaitTimeCandidate
-               since := time.Since(ctr.CreatedAt).Seconds()
-               if since > longestWaitTimeCandidate {
-                       longestWaitTimeCandidate = since
-               }
                switch ctr.State {
                case arvados.ContainerStateQueued:
                        if unalloc[it] < 1 && sch.pool.AtQuota() {
@@ -98,7 +92,6 @@ tryrun:
                                logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
                        } else if sch.pool.StartContainer(it, ctr) {
                                // Success.
-                               longestWaitTimeCandidate = previousLongestWaitTimeCandidate
                        } else {
                                containerAllocatedWorkerBootingCount += 1
                                dontstart[it] = true
@@ -108,7 +101,6 @@ tryrun:
 
        sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
        sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
-       sch.mLongestWaitTimeSinceQueue.Set(longestWaitTimeCandidate)
 
        if len(overquota) > 0 {
                // Unlock any containers that are unmappable while
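
With the gauge no longer set inside runQueue, the longestWaitTimeCandidate bookkeeping above is dropped; the wait-time metric is instead computed in one place, from a full queue snapshot, by updateMetrics (see the scheduler.go hunk below). A sketch of the resulting per-iteration ordering, mirroring that hunk:

    // each scheduler pass: schedule, reconcile, then publish metrics
    sch.runQueue()
    sch.sync()
    sch.updateMetrics()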
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index e7963ca7d4bd99f839fb17a7731a5fe5fefc2c89..fd1d0a870b7ac9f34f9d1dd39f250fed62b4a099 100644
@@ -418,6 +418,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
        }
        sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
        sch.runQueue()
+       sch.updateMetrics()
 
        c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
        c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
@@ -429,6 +430,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
        pool = stubPool{}
        sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
        sch.runQueue()
+       sch.updateMetrics()
 
        c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
        c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
@@ -461,6 +463,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
        }
        sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
        sch.runQueue()
+       sch.updateMetrics()
 
        c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
 }
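
Because the gauge is now populated by updateMetrics rather than runQueue, the test calls sch.updateMetrics() after each sch.runQueue() before reading the metric. A hypothetical extra assertion, assuming the queue holds a container created a few seconds earlier that is still Queued and not running:

    // the longest-wait gauge should be positive for a waiting container
    c.Check(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue) > 0, check.Equals, true)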
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index c169245735fb65b8ed7de8c73d504447905fcbae..b1d369ed2483ec15e4b36e8e01e18dbac5f11cc3 100644
@@ -11,6 +11,7 @@ import (
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
@@ -97,6 +98,30 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
        reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
 }
 
+func (sch *Scheduler) updateMetrics() {
+       earliest := time.Now()
+       entries, _ := sch.queue.Entries()
+       running := sch.pool.Running()
+       for _, ent := range entries {
+               if ent.Container.Priority > 0 &&
+                       (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+                       // Exclude containers that are preparing to run the payload
+                       // (i.e. ContainerStateLocked and already running on a worker,
+                       // most likely loading the payload image).
+                       if _, ok := running[ent.Container.UUID]; !ok {
+                               if ent.Container.CreatedAt.Before(earliest) {
+                                       earliest = ent.Container.CreatedAt
+                               }
+                       }
+               }
+       }
+       if !earliest.IsZero() {
+               sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
+       } else {
+               sch.mLongestWaitTimeSinceQueue.Set(0)
+       }
+}
+
 // Start starts the scheduler.
 func (sch *Scheduler) Start() {
        go sch.runOnce.Do(sch.run)
@@ -149,6 +174,7 @@ func (sch *Scheduler) run() {
        for {
                sch.runQueue()
                sch.sync()
+               sch.updateMetrics()
                select {
                case <-sch.stop:
                        return
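
updateMetrics scans the current queue snapshot, finds the oldest CreatedAt among containers with priority > 0 that are Queued or Locked and not yet running on a worker, and publishes that container's age as mLongestWaitTimeSinceQueue; since earliest starts at time.Now(), the gauge reads (close to) zero when nothing is waiting. A standalone sketch of the scan, assuming a hypothetical createdTimes slice holding the CreatedAt values of eligible containers:

    // find the oldest creation time among waiting containers
    earliest := time.Now()
    for _, t := range createdTimes {
        if t.Before(earliest) {
            earliest = t
        }
    }
    // with no eligible containers, earliest stays at "now" and the reported
    // wait is effectively zero
    waitSeconds := time.Since(earliest).Seconds()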
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 67962c9d65a034f6cd225a26afdc9c0bece6877e..4c90c4e6f42199606ea7df9ceda09e87e1360c11 100644
@@ -948,7 +948,8 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
-               if wp.mTimeFromShutdownToGone != nil {
+               // wkr.destroyed can be the zero time if the instance disappeared but we were not trying to shut it down
+               if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
                        wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
                }
                delete(wp.workers, id)
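
The added !wkr.destroyed.IsZero() guard keeps the shutdown-to-gone histogram meaningful: wkr.destroyed is only set when a shutdown was requested, so for an instance that simply vanished the field is still the zero time and observing it would record a nonsense duration. A small illustration of the value the guard avoids, using a hypothetical destroyed variable:

    // with no shutdown requested, destroyed holds Go's zero time, so the
    // elapsed duration is enormous and meaningless as a latency sample
    var destroyed time.Time
    bogus := time.Now().Sub(destroyed).Seconds()
    _ = bogus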