14360: Don't block scheduling while locking containers.
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 8 Nov 2018 21:48:17 +0000 (16:48 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 8 Nov 2018 21:48:17 +0000 (16:48 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/scheduler.go
lib/dispatchcloud/test/queue.go

index 9fc1a16580524f71a910d74f58887bb7fa217062..7b432adc629d0da69f3497e1fa5fd7f7a70176f9 100644 (file)
@@ -34,6 +34,7 @@ func (sch *Scheduler) runQueue() {
        dontstart := map[arvados.InstanceType]bool{}
        var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
 
+tryrun:
        for i, ctr := range sorted {
                ctr, it := ctr.Container, ctr.InstanceType
                logger := sch.logger.WithFields(logrus.Fields{
@@ -43,60 +44,53 @@ func (sch *Scheduler) runQueue() {
                if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
                        continue
                }
-               if ctr.State == arvados.ContainerStateQueued {
+               switch ctr.State {
+               case arvados.ContainerStateQueued:
                        if unalloc[it] < 1 && sch.pool.AtQuota() {
-                               logger.Debugf("not locking: AtQuota and no unalloc workers")
+                               logger.Debug("not locking: AtQuota and no unalloc workers")
                                overquota = sorted[i:]
-                               break
+                               break tryrun
                        }
-                       logger.Debugf("locking")
-                       err := sch.queue.Lock(ctr.UUID)
-                       if err != nil {
-                               logger.WithError(err).Warnf("lock error")
+                       sch.bgLock(logger, ctr.UUID)
+                       unalloc[it]--
+               case arvados.ContainerStateLocked:
+                       if unalloc[it] < 1 {
+                               if sch.pool.AtQuota() {
+                                       logger.Debug("not starting: AtQuota and no unalloc workers")
+                                       overquota = sorted[i:]
+                                       break tryrun
+                               }
+                               logger.Info("creating new instance")
+                               err := sch.pool.Create(it)
+                               if err != nil {
+                                       if _, ok := err.(cloud.QuotaError); !ok {
+                                               logger.WithError(err).Warn("error creating worker")
+                                       }
+                                       sch.queue.Unlock(ctr.UUID)
+                                       // Don't let lower-priority
+                                       // containers starve this one
+                                       // by using keeping idle
+                                       // workers alive on different
+                                       // instance types.  TODO:
+                                       // avoid getting starved here
+                                       // if instances of a specific
+                                       // type always fail.
+                                       overquota = sorted[i:]
+                                       break tryrun
+                               }
                                unalloc[it]++
-                               continue
                        }
-                       var ok bool
-                       ctr, ok = sch.queue.Get(ctr.UUID)
-                       if !ok {
-                               logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
-                               continue
-                       }
-                       if ctr.State != arvados.ContainerStateLocked {
-                               logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
-                       }
-               }
-               if ctr.State != arvados.ContainerStateLocked {
-                       continue
-               }
-               if unalloc[it] < 1 {
-                       logger.Info("creating new instance")
-                       err := sch.pool.Create(it)
-                       if err != nil {
-                               if _, ok := err.(cloud.QuotaError); !ok {
-                                       logger.WithError(err).Warn("error creating worker")
-                               }
-                               sch.queue.Unlock(ctr.UUID)
-                               // Don't let lower-priority containers
-                               // starve this one by using keeping
-                               // idle workers alive on different
-                               // instance types.  TODO: avoid
-                               // getting starved here if instances
-                               // of a specific type always fail.
-                               overquota = sorted[i:]
-                               break
+
+                       if dontstart[it] {
+                               // We already tried & failed to start
+                               // a higher-priority container on the
+                               // same instance type. Don't let this
+                               // one sneak in ahead of it.
+                       } else if sch.pool.StartContainer(it, ctr) {
+                               unalloc[it]--
+                       } else {
+                               dontstart[it] = true
                        }
-                       unalloc[it]++
-               }
-               if dontstart[it] {
-                       // We already tried & failed to start a
-                       // higher-priority container on the same
-                       // instance type. Don't let this one sneak in
-                       // ahead of it.
-               } else if sch.pool.StartContainer(it, ctr) {
-                       unalloc[it]--
-               } else {
-                       dontstart[it] = true
                }
        }
 
@@ -124,3 +118,36 @@ func (sch *Scheduler) runQueue() {
                }
        }
 }
+
+// Start an API call to lock the given container, and return
+// immediately while waiting for the response in a new goroutine. Do
+// nothing if a lock request is already in progress for this
+// container.
+func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
+       logger.Debug("locking")
+       sch.mtx.Lock()
+       defer sch.mtx.Unlock()
+       if sch.locking[uuid] {
+               logger.Debug("locking in progress, doing nothing")
+               return
+       }
+       sch.locking[uuid] = true
+       go func() {
+               defer func() {
+                       sch.mtx.Lock()
+                       defer sch.mtx.Unlock()
+                       delete(sch.locking, uuid)
+               }()
+               err := sch.queue.Lock(uuid)
+               if err != nil {
+                       logger.WithError(err).Warn("error locking container")
+                       return
+               }
+               ctr, ok := sch.queue.Get(uuid)
+               if !ok {
+                       logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+               } else if ctr.State != arvados.ContainerStateLocked {
+                       logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+               }
+       }()
+}
index b8a03275f88bb78f01c2368ad5ffeb04b6e8508d..35db7052210ef8d1cbeb3d1a9cd86faf6920751f 100644 (file)
@@ -108,7 +108,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                        {
                                UUID:     test.ContainerUUID(1),
                                Priority: 1,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 1,
                                        RAM:   1 << 30,
@@ -117,7 +117,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                        {
                                UUID:     test.ContainerUUID(2),
                                Priority: 2,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 1,
                                        RAM:   1 << 30,
@@ -126,7 +126,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                        {
                                UUID:     test.ContainerUUID(3),
                                Priority: 3,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 1,
                                        RAM:   1 << 30,
@@ -135,7 +135,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                        {
                                UUID:     test.ContainerUUID(4),
                                Priority: 4,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 1,
                                        RAM:   1 << 30,
@@ -154,11 +154,11 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                        test.InstanceType(2): 2,
                },
                running:   map[string]time.Time{},
-               canCreate: 1,
+               canCreate: 0,
        }
        New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
-       c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3)})
+       c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
        c.Check(pool.running, check.HasLen, 1)
        for uuid := range pool.running {
                c.Check(uuid, check.Equals, uuids[4])
@@ -169,9 +169,10 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 // Create(), if AtQuota() is true.
 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
        for quota := 0; quota < 2; quota++ {
+               c.Logf("quota=%d", quota)
                shouldCreate := []arvados.InstanceType{}
-               for i := 1; i < 1+quota; i++ {
-                       shouldCreate = append(shouldCreate, test.InstanceType(i))
+               for i := 0; i < quota; i++ {
+                       shouldCreate = append(shouldCreate, test.InstanceType(1))
                }
                queue := test.Queue{
                        ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -181,7 +182,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                                {
                                        UUID:     test.ContainerUUID(1),
                                        Priority: 1,
-                                       State:    arvados.ContainerStateQueued,
+                                       State:    arvados.ContainerStateLocked,
                                        RuntimeConstraints: arvados.RuntimeConstraints{
                                                VCPUs: 1,
                                                RAM:   1 << 30,
@@ -223,7 +224,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                        test.InstanceType(2): 1,
                },
                running:   map[string]time.Time{},
-               canCreate: 2,
+               canCreate: 4,
        }
        queue := test.Queue{
                ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -234,7 +235,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                                // create a new worker
                                UUID:     test.ContainerUUID(1),
                                Priority: 1,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 1,
                                        RAM:   1 << 30,
@@ -244,7 +245,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                                // tentatively map to unalloc worker
                                UUID:     test.ContainerUUID(2),
                                Priority: 2,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 1,
                                        RAM:   1 << 30,
@@ -254,7 +255,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                                // start now on idle worker
                                UUID:     test.ContainerUUID(3),
                                Priority: 3,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 1,
                                        RAM:   1 << 30,
@@ -264,7 +265,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                                // create a new worker
                                UUID:     test.ContainerUUID(4),
                                Priority: 4,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 2,
                                        RAM:   2 << 30,
@@ -274,7 +275,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                                // tentatively map to unalloc worker
                                UUID:     test.ContainerUUID(5),
                                Priority: 5,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 2,
                                        RAM:   2 << 30,
@@ -284,7 +285,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                                // start now on idle worker
                                UUID:     test.ContainerUUID(6),
                                Priority: 6,
-                               State:    arvados.ContainerStateQueued,
+                               State:    arvados.ContainerStateLocked,
                                RuntimeConstraints: arvados.RuntimeConstraints{
                                        VCPUs: 2,
                                        RAM:   2 << 30,
index 0be8edb7b6420e00a07192ebcaed7eb5fa8e4186..9a5fb10d51c22de773ace09ad9cc1eee8bb65b3a 100644 (file)
@@ -32,6 +32,9 @@ type Scheduler struct {
        staleLockTimeout    time.Duration
        queueUpdateInterval time.Duration
 
+       locking map[string]bool
+       mtx     sync.Mutex
+
        runOnce sync.Once
        stop    chan struct{}
 }
@@ -48,6 +51,7 @@ func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, stale
                staleLockTimeout:    staleLockTimeout,
                queueUpdateInterval: queueUpdateInterval,
                stop:                make(chan struct{}),
+               locking:             map[string]bool{},
        }
 }
 
index 152094f52ea3b355673077cdbf8888500b0b7111..fda04d52b3b6498e69f98f41a2f1c73696dd3758 100644 (file)
@@ -108,7 +108,7 @@ func (q *Queue) notify() {
 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
        ent := q.entries[uuid]
        if ent.Container.State != from {
-               return fmt.Errorf("lock failed: state=%q", ent.Container.State)
+               return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
        }
        ent.Container.State = to
        q.entries[uuid] = ent