From ff8c3cfde1e73c1c0adccb670040e98d82120e99 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Thu, 8 Nov 2018 16:48:17 -0500
Subject: [PATCH 1/1] 14360: Don't block scheduling while locking containers.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 lib/dispatchcloud/scheduler/run_queue.go      | 123 +++++++++++-------
 lib/dispatchcloud/scheduler/run_queue_test.go |  33 ++---
 lib/dispatchcloud/scheduler/scheduler.go      |   4 +
 lib/dispatchcloud/test/queue.go               |   2 +-
 4 files changed, 97 insertions(+), 65 deletions(-)

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 9fc1a16580..7b432adc62 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -34,6 +34,7 @@ func (sch *Scheduler) runQueue() {
 	dontstart := map[arvados.InstanceType]bool{}
 	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
 
+tryrun:
 	for i, ctr := range sorted {
 		ctr, it := ctr.Container, ctr.InstanceType
 		logger := sch.logger.WithFields(logrus.Fields{
@@ -43,60 +44,53 @@
 		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
 			continue
 		}
-		if ctr.State == arvados.ContainerStateQueued {
+		switch ctr.State {
+		case arvados.ContainerStateQueued:
 			if unalloc[it] < 1 && sch.pool.AtQuota() {
-				logger.Debugf("not locking: AtQuota and no unalloc workers")
+				logger.Debug("not locking: AtQuota and no unalloc workers")
 				overquota = sorted[i:]
-				break
+				break tryrun
 			}
-			logger.Debugf("locking")
-			err := sch.queue.Lock(ctr.UUID)
-			if err != nil {
-				logger.WithError(err).Warnf("lock error")
+			sch.bgLock(logger, ctr.UUID)
+			unalloc[it]--
+		case arvados.ContainerStateLocked:
+			if unalloc[it] < 1 {
+				if sch.pool.AtQuota() {
+					logger.Debug("not starting: AtQuota and no unalloc workers")
+					overquota = sorted[i:]
+					break tryrun
+				}
+				logger.Info("creating new instance")
+				err := sch.pool.Create(it)
+				if err != nil {
+					if _, ok := err.(cloud.QuotaError); !ok {
+						logger.WithError(err).Warn("error creating worker")
+					}
+					sch.queue.Unlock(ctr.UUID)
+					// Don't let lower-priority
+					// containers starve this one
+					// by keeping idle
+					// workers alive on different
+					// instance types. TODO:
+					// avoid getting starved here
+					// if instances of a specific
+					// type always fail.
+					overquota = sorted[i:]
+					break tryrun
+				}
 				unalloc[it]++
-				continue
 			}
-			var ok bool
-			ctr, ok = sch.queue.Get(ctr.UUID)
-			if !ok {
-				logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
-				continue
-			}
-			if ctr.State != arvados.ContainerStateLocked {
-				logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
-			}
-		}
-		if ctr.State != arvados.ContainerStateLocked {
-			continue
-		}
-		if unalloc[it] < 1 {
-			logger.Info("creating new instance")
-			err := sch.pool.Create(it)
-			if err != nil {
-				if _, ok := err.(cloud.QuotaError); !ok {
-					logger.WithError(err).Warn("error creating worker")
-				}
-				sch.queue.Unlock(ctr.UUID)
-				// Don't let lower-priority containers
-				// starve this one by using keeping
-				// idle workers alive on different
-				// instance types. TODO: avoid
-				// getting starved here if instances
-				// of a specific type always fail.
-				overquota = sorted[i:]
-				break
+
+			if dontstart[it] {
+				// We already tried & failed to start
+				// a higher-priority container on the
+				// same instance type. Don't let this
+				// one sneak in ahead of it.
+			} else if sch.pool.StartContainer(it, ctr) {
+				unalloc[it]--
+			} else {
+				dontstart[it] = true
 			}
-			unalloc[it]++
-		}
-		if dontstart[it] {
-			// We already tried & failed to start a
-			// higher-priority container on the same
-			// instance type. Don't let this one sneak in
-			// ahead of it.
-		} else if sch.pool.StartContainer(it, ctr) {
-			unalloc[it]--
-		} else {
-			dontstart[it] = true
 		}
 	}
 
@@ -124,3 +118,36 @@
 		}
 	}
 }
+
+// Start an API call to lock the given container, and return
+// immediately while waiting for the response in a new goroutine. Do
+// nothing if a lock request is already in progress for this
+// container.
+func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
+	logger.Debug("locking")
+	sch.mtx.Lock()
+	defer sch.mtx.Unlock()
+	if sch.locking[uuid] {
+		logger.Debug("locking in progress, doing nothing")
+		return
+	}
+	sch.locking[uuid] = true
+	go func() {
+		defer func() {
+			sch.mtx.Lock()
+			defer sch.mtx.Unlock()
+			delete(sch.locking, uuid)
+		}()
+		err := sch.queue.Lock(uuid)
+		if err != nil {
+			logger.WithError(err).Warn("error locking container")
+			return
+		}
+		ctr, ok := sch.queue.Get(uuid)
+		if !ok {
+			logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+		} else if ctr.State != arvados.ContainerStateLocked {
+			logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+		}
+	}()
+}
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index b8a03275f8..35db705221 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -108,7 +108,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 			{
 				UUID:     test.ContainerUUID(1),
 				Priority: 1,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 1,
 					RAM:   1 << 30,
@@ -117,7 +117,7 @@
 			{
 				UUID:     test.ContainerUUID(2),
 				Priority: 2,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 1,
 					RAM:   1 << 30,
@@ -126,7 +126,7 @@
 			{
 				UUID:     test.ContainerUUID(3),
 				Priority: 3,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 1,
 					RAM:   1 << 30,
@@ -135,7 +135,7 @@
 			{
 				UUID:     test.ContainerUUID(4),
 				Priority: 4,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 1,
 					RAM:   1 << 30,
@@ -154,11 +154,11 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 			test.InstanceType(2): 2,
 		},
 		running:   map[string]time.Time{},
-		canCreate: 1,
+		canCreate: 0,
 	}
 	New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
-	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3)})
+	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
 	c.Check(pool.running, check.HasLen, 1)
 	for uuid := range pool.running {
 		c.Check(uuid, check.Equals, uuids[4])
@@ -169,9 +169,10 @@
 // Create(), if AtQuota() is true.
 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 	for quota := 0; quota < 2; quota++ {
+		c.Logf("quota=%d", quota)
 		shouldCreate := []arvados.InstanceType{}
-		for i := 1; i < 1+quota; i++ {
-			shouldCreate = append(shouldCreate, test.InstanceType(i))
+		for i := 0; i < quota; i++ {
+			shouldCreate = append(shouldCreate, test.InstanceType(1))
 		}
 		queue := test.Queue{
 			ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -181,7 +182,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 				{
 					UUID:     test.ContainerUUID(1),
 					Priority: 1,
-					State:    arvados.ContainerStateQueued,
+					State:    arvados.ContainerStateLocked,
 					RuntimeConstraints: arvados.RuntimeConstraints{
 						VCPUs: 1,
 						RAM:   1 << 30,
@@ -223,7 +224,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 			test.InstanceType(2): 1,
 		},
 		running:   map[string]time.Time{},
-		canCreate: 2,
+		canCreate: 4,
 	}
 	queue := test.Queue{
 		ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -234,7 +235,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 				// create a new worker
 				UUID:     test.ContainerUUID(1),
 				Priority: 1,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 1,
 					RAM:   1 << 30,
@@ -244,7 +245,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 				// tentatively map to unalloc worker
 				UUID:     test.ContainerUUID(2),
 				Priority: 2,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 1,
 					RAM:   1 << 30,
@@ -254,7 +255,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 				// start now on idle worker
 				UUID:     test.ContainerUUID(3),
 				Priority: 3,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 1,
 					RAM:   1 << 30,
@@ -264,7 +265,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 				// create a new worker
 				UUID:     test.ContainerUUID(4),
 				Priority: 4,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 2,
 					RAM:   2 << 30,
@@ -274,7 +275,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 				// tentatively map to unalloc worker
 				UUID:     test.ContainerUUID(5),
 				Priority: 5,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 2,
 					RAM:   2 << 30,
@@ -284,7 +285,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 				// start now on idle worker
 				UUID:     test.ContainerUUID(6),
 				Priority: 6,
-				State:    arvados.ContainerStateQueued,
+				State:    arvados.ContainerStateLocked,
 				RuntimeConstraints: arvados.RuntimeConstraints{
 					VCPUs: 2,
 					RAM:   2 << 30,
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 0be8edb7b6..9a5fb10d51 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -32,6 +32,9 @@ type Scheduler struct {
 	staleLockTimeout    time.Duration
 	queueUpdateInterval time.Duration
 
+	locking map[string]bool
+	mtx     sync.Mutex
+
 	runOnce sync.Once
 	stop    chan struct{}
 }
@@ -48,6 +51,7 @@ func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, stale
 		staleLockTimeout:    staleLockTimeout,
 		queueUpdateInterval: queueUpdateInterval,
 		stop:                make(chan struct{}),
+		locking:             map[string]bool{},
 	}
 }
 
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index 152094f52e..fda04d52b3 100644
--- a/lib/dispatchcloud/test/queue.go
+++ b/lib/dispatchcloud/test/queue.go
@@ -108,7 +108,7 @@ func (q *Queue) notify() {
 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
 	ent := q.entries[uuid]
 	if ent.Container.State != from {
-		return fmt.Errorf("lock failed: state=%q", ent.Container.State)
+		return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
 	}
 	ent.Container.State = to
 	q.entries[uuid] = ent
-- 
2.30.2
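
P.S. The heart of this change is the bgLock pattern: runQueue used to call
sch.queue.Lock() inline, so one slow Lock API round-trip stalled the whole
scheduling pass. bgLock instead fires the call in a goroutine and uses the
mutex-guarded `locking` map to skip UUIDs that already have a request in
flight. Below is a minimal, self-contained sketch of that pattern; the
queueLocker interface, slowQueue stub, and container ID are stand-ins
invented for the sketch, not Arvados APIs.

package main

import (
	"fmt"
	"sync"
	"time"
)

// queueLocker stands in for the scheduler's container queue; only the
// Lock call matters here.
type queueLocker interface {
	Lock(uuid string) error
}

// scheduler mirrors the bookkeeping the patch adds to Scheduler: a
// mutex-guarded set of UUIDs whose lock requests are in flight.
type scheduler struct {
	queue   queueLocker
	mtx     sync.Mutex
	locking map[string]bool
}

// bgLock starts a lock request and returns immediately. A second call
// for the same UUID while the first is still in flight is a no-op.
func (sch *scheduler) bgLock(uuid string) {
	sch.mtx.Lock()
	defer sch.mtx.Unlock()
	if sch.locking[uuid] {
		return // a lock request is already in progress
	}
	sch.locking[uuid] = true
	go func() {
		defer func() {
			sch.mtx.Lock()
			defer sch.mtx.Unlock()
			delete(sch.locking, uuid)
		}()
		if err := sch.queue.Lock(uuid); err != nil {
			fmt.Printf("error locking %s: %v\n", uuid, err)
		}
	}()
}

// slowQueue simulates an API whose Lock call takes a while.
type slowQueue struct{}

func (slowQueue) Lock(uuid string) error {
	time.Sleep(100 * time.Millisecond)
	return nil
}

func main() {
	sch := &scheduler{queue: slowQueue{}, locking: map[string]bool{}}
	// Two scheduling passes in quick succession: neither blocks, and
	// the second call is deduplicated against the in-flight request.
	sch.bgLock("ctr-1")
	sch.bgLock("ctr-1")
	time.Sleep(200 * time.Millisecond) // let the goroutine finish
}

Run as-is, this performs a single simulated Lock call even though bgLock is
invoked twice, which is what lets runQueue be re-entered on every scheduling
tick without piling up duplicate API calls.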
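
One more note, on the new `tryrun` label: once the `if ctr.State == ...`
chain becomes a `switch`, a bare `break` no longer exits the `for` loop; in
Go it exits only the innermost switch, so the loop would silently continue
with the next container. That is why the over-quota exits become
`break tryrun`. A standalone illustration of the language rule (hypothetical
data, not Arvados code):

package main

import "fmt"

func main() {
	states := []string{"Queued", "Locked", "Queued"}
tryrun:
	for i, state := range states {
		switch state {
		case "Locked":
			// A bare `break` here would exit only the switch,
			// and the loop would carry on with the next entry.
			break tryrun
		default:
			fmt.Println("processing", i, state)
		}
	}
	fmt.Println("stopped at the first Locked entry")
}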