21258: Fix "container completed twice" testing bug.
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
index 6e0b1294875dfc9e50c2794b3b79cdd069c1e5ce..2265be6e1610015358036f515c50acea5bad5c11 100644 (file)
@@ -34,7 +34,10 @@ type StubDriver struct {
        // SetupVM, if set, is called upon creation of each new
        // StubVM. This is the caller's opportunity to customize the
        // VM's error rate and other behaviors.
-       SetupVM func(*StubVM)
+       //
+       // If SetupVM returns an error, that error will be returned to
+       // the caller of Create(), and the new VM will be discarded.
+       SetupVM func(*StubVM) error
 
        // Bugf, if set, is called if a bug is detected in the caller
        // or stub. Typically set to (*check.C)Errorf. If unset,
@@ -152,7 +155,10 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                Exec:           svm.Exec,
        }
        if setup := sis.driver.SetupVM; setup != nil {
-               setup(svm)
+               err := setup(svm)
+               if err != nil {
+                       return nil, err
+               }
        }
        sis.servers[svm.id] = svm
        return svm.Instance(), nil
@@ -195,6 +201,12 @@ type RateLimitError struct{ Retry time.Time }
 func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
 func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 
+type CapacityError struct{ InstanceTypeSpecific bool }
+
+func (e CapacityError) Error() string                { return "insufficient capacity" }
+func (e CapacityError) IsCapacityError() bool        { return true }
+func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //
@@ -227,6 +239,8 @@ type StubVM struct {
        killing      map[string]bool
        lastPID      int64
        deadlocked   string
+       stubprocs    sync.WaitGroup
+       destroying   bool
        sync.Mutex
 }
 
@@ -255,6 +269,17 @@ func (svm *StubVM) Instance() stubInstance {
 }
 
 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       // Ensure we don't start any new stubprocs after Destroy()
+       // has started Wait()ing for stubprocs to end.
+       svm.Lock()
+       if svm.destroying {
+               svm.Unlock()
+               return 1
+       }
+       svm.stubprocs.Add(1)
+       defer svm.stubprocs.Done()
+       svm.Unlock()
+
        stdinData, err := ioutil.ReadAll(stdin)
        if err != nil {
                fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
@@ -292,7 +317,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                pid := svm.lastPID
                svm.running[uuid] = stubProcess{pid: pid}
                svm.Unlock()
+
                time.Sleep(svm.CrunchRunDetachDelay)
+
+               svm.Lock()
+               defer svm.Unlock()
+               if svm.destroying {
+                       fmt.Fprint(stderr, "crunch-run: killed by system shutdown\n")
+                       return 9
+               }
                fmt.Fprintf(stderr, "starting %s\n", uuid)
                logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
@@ -300,13 +333,18 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                        "PID":           pid,
                })
                logger.Printf("[test] starting crunch-run stub")
+               svm.stubprocs.Add(1)
                go func() {
+                       defer svm.stubprocs.Done()
                        var ctr arvados.Container
                        var started, completed bool
                        defer func() {
                                logger.Print("[test] exiting crunch-run stub")
                                svm.Lock()
                                defer svm.Unlock()
+                               if svm.destroying {
+                                       return
+                               }
                                if svm.running[uuid].pid != pid {
                                        bugf := svm.sis.driver.Bugf
                                        if bugf == nil {
@@ -346,8 +384,10 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 
                        svm.Lock()
                        killed := svm.killing[uuid]
+                       delete(svm.killing, uuid)
+                       destroying := svm.destroying
                        svm.Unlock()
-                       if killed || wantCrashEarly {
+                       if killed || wantCrashEarly || destroying {
                                return
                        }
 
@@ -439,6 +479,10 @@ func (si stubInstance) Destroy() error {
        if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
                return errors.New("instance could not be destroyed")
        }
+       si.svm.Lock()
+       si.svm.destroying = true
+       si.svm.Unlock()
+       si.svm.stubprocs.Wait()
        si.svm.SSHService.Close()
        sis.mtx.Lock()
        defer sis.mtx.Unlock()