From: Ward Vandewege Date: Tue, 15 Sep 2020 17:59:33 +0000 (-0400) Subject: 16636: implement review feedback. X-Git-Tag: 2.1.0~61^2~2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/407fc461d20ece8b11b7b56f29a3caff3083ff8d 16636: implement review feedback. Arvados-DCO-1.1-Signed-off-by: Ward Vandewege --- diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index 45b346383f..a1ff414b73 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -382,7 +382,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { *next[upd.UUID] = upd } } - selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters"} + selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"} limitParam := 1000 mine, err := cq.fetchAll(arvados.ResourceListParams{ diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go index 9bbb064fee..b9d653a821 100644 --- a/lib/dispatchcloud/scheduler/run_queue.go +++ b/lib/dispatchcloud/scheduler/run_queue.go @@ -34,7 +34,6 @@ func (sch *Scheduler) runQueue() { dontstart := map[arvados.InstanceType]bool{} var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota var containerAllocatedWorkerBootingCount int - var longestWaitTimeCandidate, previousLongestWaitTimeCandidate float64 tryrun: for i, ctr := range sorted { @@ -46,11 +45,6 @@ tryrun: if _, running := running[ctr.UUID]; running || ctr.Priority < 1 { continue } - previousLongestWaitTimeCandidate = longestWaitTimeCandidate - since := time.Since(ctr.CreatedAt).Seconds() - if since > longestWaitTimeCandidate { - longestWaitTimeCandidate = since - } switch ctr.State { case arvados.ContainerStateQueued: if unalloc[it] < 1 && sch.pool.AtQuota() { @@ -98,7 +92,6 @@ tryrun: logger.Info("not restarting yet: crunch-run process from previous attempt has not exited") } else if sch.pool.StartContainer(it, ctr) { // Success. - longestWaitTimeCandidate = previousLongestWaitTimeCandidate } else { containerAllocatedWorkerBootingCount += 1 dontstart[it] = true @@ -108,7 +101,6 @@ tryrun: sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount)) sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota))) - sch.mLongestWaitTimeSinceQueue.Set(longestWaitTimeCandidate) if len(overquota) > 0 { // Unlock any containers that are unmappable while diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go index e7963ca7d4..fd1d0a870b 100644 --- a/lib/dispatchcloud/scheduler/run_queue_test.go +++ b/lib/dispatchcloud/scheduler/run_queue_test.go @@ -418,6 +418,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) { } sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond) sch.runQueue() + sch.updateMetrics() c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1) c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0) @@ -429,6 +430,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) { pool = stubPool{} sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond) sch.runQueue() + sch.updateMetrics() c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0) c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1) @@ -461,6 +463,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) { } sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond) sch.runQueue() + sch.updateMetrics() c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0) } diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go index c169245735..b1d369ed24 100644 --- a/lib/dispatchcloud/scheduler/scheduler.go +++ b/lib/dispatchcloud/scheduler/scheduler.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" @@ -97,6 +98,30 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) { reg.MustRegister(sch.mLongestWaitTimeSinceQueue) } +func (sch *Scheduler) updateMetrics() { + earliest := time.Now() + entries, _ := sch.queue.Entries() + running := sch.pool.Running() + for _, ent := range entries { + if ent.Container.Priority > 0 && + (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) { + // Exclude containers that are preparing to run the payload (i.e. + // ContainerStateLocked and running on a worker, most likely loading the + // payload image + if _, ok := running[ent.Container.UUID]; !ok { + if ent.Container.CreatedAt.Before(earliest) { + earliest = ent.Container.CreatedAt + } + } + } + } + if !earliest.IsZero() { + sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds()) + } else { + sch.mLongestWaitTimeSinceQueue.Set(0) + } +} + // Start starts the scheduler. func (sch *Scheduler) Start() { go sch.runOnce.Do(sch.run) @@ -149,6 +174,7 @@ func (sch *Scheduler) run() { for { sch.runQueue() sch.sync() + sch.updateMetrics() select { case <-sch.stop: return diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 67962c9d65..4c90c4e6f4 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -948,7 +948,8 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { if wp.mDisappearances != nil { wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc() } - if wp.mTimeFromShutdownToGone != nil { + // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down + if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() { wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds()) } delete(wp.workers, id)