Merge branch '16723-kill-vs-requeue'
author Tom Clegg <tom@tomclegg.ca>
Sat, 22 Aug 2020 01:54:54 +0000 (21:54 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Sat, 22 Aug 2020 01:54:54 +0000 (21:54 -0400)
fixes #16723

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/test/queue.go
lib/dispatchcloud/test/stub_driver.go

index aa5f22a501331a2bdd108878987e25b133df720b..42decff31d0cada0bb83ce66ba509aa5f5d13448 100644 (file)
@@ -115,6 +115,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
                        return ChooseInstanceType(s.cluster, ctr)
                },
+               Logger: ctxlog.TestLogger(c),
        }
        for i := 0; i < 200; i++ {
                queue.Containers = append(queue.Containers, arvados.Container{
@@ -170,6 +171,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        stubvm.CrunchRunCrashRate = 0.1
                }
        }
+       s.stubDriver.Bugf = c.Errorf
 
        start := time.Now()
        go s.disp.run()
@@ -303,7 +305,7 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
                time.Sleep(time.Millisecond)
        }
        c.Assert(len(sr.Items), check.Equals, 1)
-       c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
+       c.Check(sr.Items[0].Instance, check.Matches, "inst.*")
        c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
        c.Check(sr.Items[0].Price, check.Equals, 0.123)
        c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
index 4447f084a90cff2f962298c2bb3a71fef851ebb1..dddb974b326fbe7d61c280148e71f4f5c86e7abe 100644 (file)
@@ -88,6 +88,8 @@ tryrun:
                                // a higher-priority container on the
                                // same instance type. Don't let this
                                // one sneak in ahead of it.
+                       } else if sch.pool.KillContainer(ctr.UUID, "about to lock") {
+                               logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
                        } else if sch.pool.StartContainer(it, ctr) {
                                // Success.
                        } else {
index 32c6b3b24d198b90adb5f2899580783beb2dd9cb..992edddfba6370198a16def5a6b57aed18575aa4 100644 (file)
@@ -83,8 +83,9 @@ func (p *stubPool) ForgetContainer(uuid string) {
 func (p *stubPool) KillContainer(uuid, reason string) bool {
        p.Lock()
        defer p.Unlock()
-       delete(p.running, uuid)
-       return true
+       defer delete(p.running, uuid)
+       t, ok := p.running[uuid]
+       return ok && t.IsZero()
 }
 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
        p.shutdowns++
index 116ca7643117d3f4df3b6e8d4e99864a44d6dfe6..fc683505f93dbae41ff42f31032dd2d145d72169 100644 (file)
@@ -109,13 +109,17 @@ func (sch *Scheduler) cancel(uuid string, reason string) {
 }
 
 func (sch *Scheduler) kill(uuid string, reason string) {
+       if !sch.uuidLock(uuid, "kill") {
+               return
+       }
+       defer sch.uuidUnlock(uuid)
        sch.pool.KillContainer(uuid, reason)
        sch.pool.ForgetContainer(uuid)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
        uuid := ent.Container.UUID
-       if !sch.uuidLock(uuid, "cancel") {
+       if !sch.uuidLock(uuid, "requeue") {
                return
        }
        defer sch.uuidUnlock(uuid)
index 11d410fb1b9a931b8b65cb990aea1298babf7269..74b84122f286d912d8f8ef392e3eb860e6b1831d 100644 (file)
@@ -11,6 +11,7 @@ import (
 
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
 // Queue is a test stub for container.Queue. The caller specifies the
@@ -23,6 +24,8 @@ type Queue struct {
        // must not be nil.
        ChooseType func(*arvados.Container) (arvados.InstanceType, error)
 
+       Logger logrus.FieldLogger
+
        entries     map[string]container.QueueEnt
        updTime     time.Time
        subscribers map[<-chan struct{}]chan struct{}
@@ -166,13 +169,36 @@ func (q *Queue) Notify(upd arvados.Container) bool {
        defer q.mtx.Unlock()
        for i, ctr := range q.Containers {
                if ctr.UUID == upd.UUID {
-                       if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+                       if allowContainerUpdate[ctr.State][upd.State] {
                                q.Containers[i] = upd
                                return true
+                       } else {
+                               if q.Logger != nil {
+                                       q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
+                               }
+                               return false
                        }
-                       return false
                }
        }
        q.Containers = append(q.Containers, upd)
        return true
 }
+
+var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
+       arvados.ContainerStateQueued: map[arvados.ContainerState]bool{
+               arvados.ContainerStateQueued:    true,
+               arvados.ContainerStateLocked:    true,
+               arvados.ContainerStateCancelled: true,
+       },
+       arvados.ContainerStateLocked: map[arvados.ContainerState]bool{
+               arvados.ContainerStateQueued:    true,
+               arvados.ContainerStateLocked:    true,
+               arvados.ContainerStateRunning:   true,
+               arvados.ContainerStateCancelled: true,
+       },
+       arvados.ContainerStateRunning: map[arvados.ContainerState]bool{
+               arvados.ContainerStateRunning:   true,
+               arvados.ContainerStateCancelled: true,
+               arvados.ContainerStateComplete:  true,
+       },
+}
index 7a1f42301684a5a042f555c3e17ccda5d2f8b6c5..f6e06d3f7cebd05a97c8851f65d8d79812d18297 100644 (file)
@@ -34,6 +34,11 @@ type StubDriver struct {
        // VM's error rate and other behaviors.
        SetupVM func(*StubVM)
 
+       // Bugf, if set, is called if a bug is detected in the caller
+       // or stub. Typically set to (*check.C)Errorf. If unset,
+       // logger.Warnf is called instead.
+       Bugf func(string, ...interface{})
+
        // StubVM's fake crunch-run uses this Queue to read and update
        // container state.
        Queue *Queue
@@ -99,6 +104,7 @@ type StubInstanceSet struct {
 
        allowCreateCall    time.Time
        allowInstancesCall time.Time
+       lastInstanceID     int
 }
 
 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
@@ -120,9 +126,10 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
        if authKey != nil {
                ak = append([]ssh.PublicKey{authKey}, ak...)
        }
+       sis.lastInstanceID++
        svm := &StubVM{
                sis:          sis,
-               id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+               id:           cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
                tags:         copyTags(tags),
                providerType: it.ProviderType,
                initCommand:  cmd,
@@ -263,49 +270,68 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                })
                logger.Printf("[test] starting crunch-run stub")
                go func() {
+                       var ctr arvados.Container
+                       var started, completed bool
+                       defer func() {
+                               logger.Print("[test] exiting crunch-run stub")
+                               svm.Lock()
+                               defer svm.Unlock()
+                               if svm.running[uuid] != pid {
+                                       if !completed {
+                                               bugf := svm.sis.driver.Bugf
+                                               if bugf == nil {
+                                                       bugf = logger.Warnf
+                                               }
+                                               bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
+                                       }
+                               } else {
+                                       delete(svm.running, uuid)
+                               }
+                               if !completed {
+                                       logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
+                                       if started && svm.CrashRunningContainer != nil {
+                                               svm.CrashRunningContainer(ctr)
+                                       }
+                               }
+                       }()
+
                        crashluck := math_rand.Float64()
+                       wantCrash := crashluck < svm.CrunchRunCrashRate
+                       wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
+
                        ctr, ok := queue.Get(uuid)
                        if !ok {
                                logger.Print("[test] container not in queue")
                                return
                        }
 
-                       defer func() {
-                               if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
-                                       svm.CrashRunningContainer(ctr)
-                               }
-                       }()
-
-                       if crashluck > svm.CrunchRunCrashRate/2 {
-                               time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
-                               ctr.State = arvados.ContainerStateRunning
-                               if !queue.Notify(ctr) {
-                                       ctr, _ = queue.Get(uuid)
-                                       logger.Print("[test] erroring out because state=Running update was rejected")
-                                       return
-                               }
-                       }
-
                        time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
 
                        svm.Lock()
-                       defer svm.Unlock()
-                       if svm.running[uuid] != pid {
-                               logger.Print("[test] container was killed")
+                       killed := svm.running[uuid] != pid
+                       svm.Unlock()
+                       if killed || wantCrashEarly {
                                return
                        }
-                       delete(svm.running, uuid)
 
-                       if crashluck < svm.CrunchRunCrashRate {
+                       ctr.State = arvados.ContainerStateRunning
+                       started = queue.Notify(ctr)
+                       if !started {
+                               ctr, _ = queue.Get(uuid)
+                               logger.Print("[test] erroring out because state=Running update was rejected")
+                               return
+                       }
+
+                       if wantCrash {
                                logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
-                       } else {
-                               if svm.ExecuteContainer != nil {
-                                       ctr.ExitCode = svm.ExecuteContainer(ctr)
-                               }
-                               logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
-                               ctr.State = arvados.ContainerStateComplete
-                               go queue.Notify(ctr)
+                               return
+                       }
+                       if svm.ExecuteContainer != nil {
+                               ctr.ExitCode = svm.ExecuteContainer(ctr)
                        }
+                       logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
+                       ctr.State = arvados.ContainerStateComplete
+                       completed = queue.Notify(ctr)
                }()
                return 0
        }