16663: Merge branch 'master'
Author: Tom Clegg <tom@tomclegg.ca>
Mon, 17 Aug 2020 14:17:28 +0000 (10:17 -0400)
Committer: Tom Clegg <tom@tomclegg.ca>
Mon, 17 Aug 2020 14:17:28 +0000 (10:17 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/scheduler/sync_test.go

index 8ab1cd9ba7337a0a2c33d9628b33f257a46fa6e9..32c6b3b24d198b90adb5f2899580783beb2dd9cb 100644 (file)
@@ -36,6 +36,7 @@ type stubPool struct {
        notify    <-chan struct{}
        unalloc   map[arvados.InstanceType]int // idle+booting+unknown
        idle      map[arvados.InstanceType]int
+       unknown   map[arvados.InstanceType]int
        running   map[string]time.Time
        atQuota   bool
        canCreate int
@@ -62,7 +63,7 @@ func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
        defer p.Unlock()
        r := map[arvados.InstanceType]int{}
        for it, n := range p.unalloc {
-               r[it] = n
+               r[it] = n - p.unknown[it]
        }
        return r
 }
@@ -96,6 +97,7 @@ func (p *stubPool) CountWorkers() map[worker.State]int {
                worker.StateBooting: len(p.unalloc) - len(p.idle),
                worker.StateIdle:    len(p.idle),
                worker.StateRunning: len(p.running),
+               worker.StateUnknown: len(p.unknown),
        }
 }
 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
index de69df98227e624fc29ef8e55884e8457db29592..116ca7643117d3f4df3b6e8d4e99864a44d6dfe6 100644 (file)
@@ -8,6 +8,7 @@ import (
        "fmt"
 
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
 )
@@ -23,6 +24,7 @@ import (
 // Running containers whose crunch-run processes have exited are
 // cancelled.
 func (sch *Scheduler) sync() {
+       anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
        running := sch.pool.Running()
        qEntries, qUpdated := sch.queue.Entries()
        for uuid, ent := range qEntries {
@@ -30,7 +32,9 @@ func (sch *Scheduler) sync() {
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               go sch.cancel(uuid, "not running on any worker")
+                               if !anyUnknownWorkers {
+                                       go sch.cancel(uuid, "not running on any worker")
+                               }
                        } else if !exited.IsZero() && qUpdated.After(exited) {
                                go sch.cancel(uuid, "state=Running after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
index 305ab9e04eb379c82288853b3df9891bc639bf5b..538f5ea8cfd0b9e14edec62d629eaa104ff70514 100644 (file)
@@ -54,3 +54,65 @@ func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
        ents, _ = queue.Entries()
        c.Check(ents, check.HasLen, 0)
 }
+
+func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+               unknown: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+       }
+       queue := test.Queue{
+               ChooseType: chooseType,
+               Containers: []arvados.Container{
+                       {
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 0,
+                               State:    arvados.ContainerStateRunning,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+
+       ents, _ := queue.Entries()
+       c.Check(ents, check.HasLen, 1)
+
+       sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+
+       // Sync shouldn't cancel the container because it might be
+       // running on the VM with state=="unknown".
+       //
+       // (Cancel+forget happens asynchronously and requires multiple
+       // sync() calls, so even after 10x sync-and-sleep iterations,
+       // we aren't 100% confident that sync isn't trying to
+       // cancel. But in the test environment, the goroutines started
+       // by sync() access stubs and therefore run quickly, so it
+       // works fine in practice. We accept that if the code is
+       // broken, the test will still pass occasionally.)
+       for i := 0; i < 10; i++ {
+               sch.sync()
+               time.Sleep(time.Millisecond)
+       }
+       ents, _ = queue.Entries()
+       c.Check(ents, check.HasLen, 1)
+       c.Check(ents[test.ContainerUUID(1)].Container.State, check.Equals, arvados.ContainerStateRunning)
+
+       // Sync should cancel & forget the container when the
+       // "unknown" node goes away.
+       //
+       // (As above, cancel+forget is async and requires multiple
+       // sync() calls, but stubs are fast so in practice this takes
+       // much less than 1s to complete.)
+       pool.unknown = nil
+       for deadline := time.Now().Add(time.Second); ; time.Sleep(time.Millisecond) {
+               sch.sync()
+               ents, _ = queue.Entries()
+               if len(ents) == 0 || time.Now().After(deadline) {
+                       break
+               }
+       }
+       c.Check(ents, check.HasLen, 0)
+}