Merge branch '15964-fix-docs' refs #15964
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 8945f88a14385af1961f080f0e05601a994badeb..992edddfba6370198a16def5a6b57aed18575aa4 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -5,19 +5,18 @@
 package scheduler
 
 import (
-       "errors"
+       "context"
+       "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
-       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
 )
 
 var (
-       logger = logrus.StandardLogger()
-
        // arbitrary example container UUIDs
        uuids = func() (r []string) {
                for i := 0; i < 16; i++ {
@@ -37,49 +36,74 @@ type stubPool struct {
        notify    <-chan struct{}
        unalloc   map[arvados.InstanceType]int // idle+booting+unknown
        idle      map[arvados.InstanceType]int
+       unknown   map[arvados.InstanceType]int
        running   map[string]time.Time
        atQuota   bool
        canCreate int
        creates   []arvados.InstanceType
        starts    []string
        shutdowns int
+       sync.Mutex
 }
 
-func (p *stubPool) AtQuota() bool                 { return p.atQuota }
-func (p *stubPool) Subscribe() <-chan struct{}    { return p.notify }
-func (p *stubPool) Unsubscribe(<-chan struct{})   {}
-func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) AtQuota() bool               { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{}) {}
+func (p *stubPool) Running() map[string]time.Time {
+       p.Lock()
+       defer p.Unlock()
+       r := map[string]time.Time{}
+       for k, v := range p.running {
+               r[k] = v
+       }
+       return r
+}
 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+       p.Lock()
+       defer p.Unlock()
        r := map[arvados.InstanceType]int{}
        for it, n := range p.unalloc {
-               r[it] = n
+               r[it] = n - p.unknown[it]
        }
        return r
 }
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
+       p.Lock()
+       defer p.Unlock()
        p.creates = append(p.creates, it)
        if p.canCreate < 1 {
-               return stubQuotaError{errors.New("quota")}
+               return false
        }
        p.canCreate--
        p.unalloc[it]++
-       return nil
+       return true
+}
+func (p *stubPool) ForgetContainer(uuid string) {
 }
-func (p *stubPool) KillContainer(uuid string) {
-       p.running[uuid] = time.Now()
+func (p *stubPool) KillContainer(uuid, reason string) bool {
+       p.Lock()
+       defer p.Unlock()
+       defer delete(p.running, uuid)
+       t, ok := p.running[uuid]
+       return ok && t.IsZero()
 }
 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
        p.shutdowns++
        return false
 }
 func (p *stubPool) CountWorkers() map[worker.State]int {
+       p.Lock()
+       defer p.Unlock()
        return map[worker.State]int{
                worker.StateBooting: len(p.unalloc) - len(p.idle),
                worker.StateIdle:    len(p.idle),
                worker.StateRunning: len(p.running),
+               worker.StateUnknown: len(p.unknown),
        }
 }
 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       p.Lock()
+       defer p.Unlock()
        p.starts = append(p.starts, ctr.UUID)
        if p.idle[it] == 0 {
                return false
@@ -90,6 +114,10 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container
        return true
 }
 
+func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
+       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+}
+
 var _ = check.Suite(&SchedulerSuite{})
 
 type SchedulerSuite struct{}
@@ -100,10 +128,9 @@ type SchedulerSuite struct{}
 // immediately. Don't try to create any other nodes after the failed
 // create.
 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        queue := test.Queue{
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-               },
+               ChooseType: chooseType,
                Containers: []arvados.Container{
                        {
                                UUID:     test.ContainerUUID(1),
@@ -156,7 +183,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                running:   map[string]time.Time{},
                canCreate: 0,
        }
-       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
        c.Check(pool.running, check.HasLen, 1)
@@ -168,6 +195,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 // If Create() fails, shut down some nodes, and don't call Create()
 // again.  Don't call Create() at all if AtQuota() is true.
 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        for quota := 0; quota < 2; quota++ {
                c.Logf("quota=%d", quota)
                shouldCreate := []arvados.InstanceType{}
@@ -175,9 +203,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        shouldCreate = append(shouldCreate, test.InstanceType(3))
                }
                queue := test.Queue{
-                       ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-                               return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-                       },
+                       ChooseType: chooseType,
                        Containers: []arvados.Container{
                                {
                                        UUID:     test.ContainerUUID(2),
@@ -213,7 +239,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        starts:    []string{},
                        canCreate: 0,
                }
-               New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+               New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
                c.Check(pool.creates, check.DeepEquals, shouldCreate)
                c.Check(pool.starts, check.DeepEquals, []string{})
                c.Check(pool.shutdowns, check.Not(check.Equals), 0)
@@ -223,6 +249,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 // Start lower-priority containers while waiting for new/existing
 // workers to come up for higher-priority containers.
 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        pool := stubPool{
                unalloc: map[arvados.InstanceType]int{
                        test.InstanceType(1): 2,
@@ -236,9 +263,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                canCreate: 4,
        }
        queue := test.Queue{
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-               },
+               ChooseType: chooseType,
                Containers: []arvados.Container{
                        {
                                // create a new worker
@@ -303,7 +328,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                },
        }
        queue.Update()
-       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
        running := map[string]bool{}
@@ -316,3 +341,40 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
        }
        c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
 }
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               running: map[string]time.Time{
+                       test.ContainerUUID(2): time.Time{},
+               },
+       }
+       queue := test.Queue{
+               ChooseType: chooseType,
+               Containers: []arvados.Container{
+                       {
+                               // queued container, unrelated to the one already running on the stub pool
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+       c.Check(pool.running, check.HasLen, 1)
+       sch.sync()
+       for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+       }
+       c.Check(pool.Running(), check.HasLen, 0)
+}
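
For readers following the diff: the stub above outlines the surface the scheduler expects from its worker pool. Below is a minimal sketch of that interface, inferred only from stubPool's methods in this test file; the real interface is defined elsewhere in lib/dispatchcloud/scheduler and the name "workerPool" here is purely illustrative.

package scheduler

import (
	"time"

	"git.arvados.org/arvados.git/lib/dispatchcloud/worker"
	"git.arvados.org/arvados.git/sdk/go/arvados"
)

// workerPool is a hypothetical name; the method set is copied from the
// stubPool implementation in run_queue_test.go above.
type workerPool interface {
	AtQuota() bool
	Subscribe() <-chan struct{}
	Unsubscribe(<-chan struct{})
	Running() map[string]time.Time
	Unallocated() map[arvados.InstanceType]int
	Create(arvados.InstanceType) bool
	ForgetContainer(uuid string)
	KillContainer(uuid, reason string) bool
	Shutdown(arvados.InstanceType) bool
	CountWorkers() map[worker.State]int
	StartContainer(arvados.InstanceType, arvados.Container) bool
}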