From f3ba65ddf8642b941cb04ddbb69fdb1e8515a6d2 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 6 Sep 2021 21:51:40 -0400 Subject: [PATCH] 18102: Fix flapping lock/unlock. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/dispatchcloud/container/queue.go | 4 +- lib/dispatchcloud/scheduler/run_queue.go | 11 +++- lib/dispatchcloud/scheduler/run_queue_test.go | 59 +++++++++++++++++++ lib/dispatchcloud/test/queue.go | 1 + 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index 7a2727c1e9..938ef915f2 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -31,6 +31,7 @@ type QueueEnt struct { // populated. Container arvados.Container `json:"container"` InstanceType arvados.InstanceType `json:"instance_type"` + FirstSeenAt time.Time `json:"first_seen_at"` } // String implements fmt.Stringer by returning the queued container's @@ -229,6 +230,7 @@ func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) { delete(cq.current, uuid) } +// Caller must have lock. func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { it, err := cq.chooseType(&ctr) if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) { @@ -284,7 +286,7 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { "Priority": ctr.Priority, "InstanceType": it.Name, }).Info("adding container to queue") - cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it} + cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()} } // Lock acquires the dispatch lock for the given container. diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go index d2f6d1c2cb..5bd4477ca5 100644 --- a/lib/dispatchcloud/scheduler/run_queue.go +++ b/lib/dispatchcloud/scheduler/run_queue.go @@ -20,7 +20,16 @@ func (sch *Scheduler) runQueue() { sorted = append(sorted, ent) } sort.Slice(sorted, func(i, j int) bool { - return sorted[i].Container.Priority > sorted[j].Container.Priority + if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj { + return pi > pj + } else { + // When containers have identical priority, + // start them in the order we first noticed + // them. This avoids extra lock/unlock cycles + // when we unlock the containers that don't + // fit in the available pool. + return sorted[i].FirstSeenAt.Before(sorted[j].FirstSeenAt) + } }) running := sch.pool.Running() diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go index c8d45cbd19..5b5fa960a1 100644 --- a/lib/dispatchcloud/scheduler/run_queue_test.go +++ b/lib/dispatchcloud/scheduler/run_queue_test.go @@ -263,6 +263,65 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) { } } +// Don't flap lock/unlock when equal-priority containers compete for +// limited workers. +// +// (Unless we use FirstSeenAt as a secondary sort key, each runQueue() +// tends to choose a different one of the equal-priority containers as +// the "first" one that should be locked, and unlock the one it chose +// last time. This generates logging noise, and fails containers by +// reaching MaxDispatchAttempts quickly.) +func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) { + logger := ctxlog.TestLogger(c) + ctx := ctxlog.Context(context.Background(), logger) + queue := test.Queue{ + ChooseType: chooseType, + Logger: logger, + } + for i := 0; i < 8; i++ { + queue.Containers = append(queue.Containers, arvados.Container{ + UUID: test.ContainerUUID(i), + Priority: 333, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 3, + RAM: 3 << 30, + }, + }) + } + queue.Update() + pool := stubPool{ + quota: 2, + unalloc: map[arvados.InstanceType]int{ + test.InstanceType(3): 1, + }, + idle: map[arvados.InstanceType]int{ + test.InstanceType(3): 1, + }, + running: map[string]time.Time{}, + creates: []arvados.InstanceType{}, + starts: []string{}, + canCreate: 1, + } + sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond) + for i := 0; i < 30; i++ { + sch.runQueue() + sch.sync() + time.Sleep(time.Millisecond) + } + c.Check(pool.shutdowns, check.Equals, 0) + c.Check(pool.starts, check.HasLen, 1) + unlocked := map[string]int{} + for _, chg := range queue.StateChanges() { + if chg.To == arvados.ContainerStateQueued { + unlocked[chg.UUID]++ + } + } + for uuid, count := range unlocked { + c.Check(count, check.Equals, 1, check.Commentf("%s", uuid)) + } +} + // Start lower-priority containers while waiting for new/existing // workers to come up for higher-priority containers. func (*SchedulerSuite) TestStartWhileCreating(c *check.C) { diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go index 5973d16390..fcb2cfb33b 100644 --- a/lib/dispatchcloud/test/queue.go +++ b/lib/dispatchcloud/test/queue.go @@ -160,6 +160,7 @@ func (q *Queue) Update() error { upd[ctr.UUID] = container.QueueEnt{ Container: ctr, InstanceType: it, + FirstSeenAt: time.Now(), } } } -- 2.30.2