Merge branch 'master' into 16811-public-favs
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
index 4df39d0c46ac7dc538e5c7d9949cd884df8768b4..4d32cf221ce49461e092a834ad192460bc37a49d 100644 (file)
@@ -17,8 +17,8 @@ import (
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
 )
@@ -34,6 +34,11 @@ type StubDriver struct {
        // VM's error rate and other behaviors.
        SetupVM func(*StubVM)
 
+       // Bugf, if set, is called if a bug is detected in the caller
+       // or stub. Typically set to (*check.C)Errorf. If unset,
+       // logger.Warnf is called instead.
+       Bugf func(string, ...interface{})
+
        // StubVM's fake crunch-run uses this Queue to read and update
        // container state.
        Queue *Queue
@@ -56,7 +61,7 @@ type StubDriver struct {
 }
 
 // InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
        if sd.holdCloudOps == nil {
                sd.holdCloudOps = make(chan bool)
        }
@@ -99,6 +104,7 @@ type StubInstanceSet struct {
 
        allowCreateCall    time.Time
        allowInstancesCall time.Time
+       lastInstanceID     int
 }
 
 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
@@ -112,20 +118,21 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
        }
        if sis.allowCreateCall.After(time.Now()) {
                return nil, RateLimitError{sis.allowCreateCall}
-       } else {
-               sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
        }
-
+       sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
        ak := sis.driver.AuthorizedKeys
        if authKey != nil {
                ak = append([]ssh.PublicKey{authKey}, ak...)
        }
+       sis.lastInstanceID++
        svm := &StubVM{
                sis:          sis,
-               id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+               id:           cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
                tags:         copyTags(tags),
                providerType: it.ProviderType,
                initCommand:  cmd,
+               running:      map[string]stubProcess{},
+               killing:      map[string]bool{},
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
@@ -145,9 +152,8 @@ func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err
        defer sis.mtx.RUnlock()
        if sis.allowInstancesCall.After(time.Now()) {
                return nil, RateLimitError{sis.allowInstancesCall}
-       } else {
-               sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
        }
+       sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
        var r []cloud.Instance
        for _, ss := range sis.servers {
                r = append(r, ss.Instance())
@@ -177,12 +183,16 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 // running (and might change IP addresses, shut down, etc.)  without
 // updating any stubInstances that have been returned to callers.
 type StubVM struct {
-       Boot                 time.Time
-       Broken               time.Time
-       CrunchRunMissing     bool
-       CrunchRunCrashRate   float64
-       CrunchRunDetachDelay time.Duration
-       ExecuteContainer     func(arvados.Container) int
+       Boot                  time.Time
+       Broken                time.Time
+       ReportBroken          time.Time
+       CrunchRunMissing      bool
+       CrunchRunCrashRate    float64
+       CrunchRunDetachDelay  time.Duration
+       ArvMountMaxExitLag    time.Duration
+       ArvMountDeadlockRate  float64
+       ExecuteContainer      func(arvados.Container) int
+       CrashRunningContainer func(arvados.Container)
 
        sis          *StubInstanceSet
        id           cloud.InstanceID
@@ -190,10 +200,21 @@ type StubVM struct {
        initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
-       running      map[string]bool
+       running      map[string]stubProcess
+       killing      map[string]bool
+       lastPID      int64
+       deadlocked   string
        sync.Mutex
 }
 
+type stubProcess struct { // one fake crunch-run process, stored in StubVM.running keyed by container UUID
+       pid int64 // taken from StubVM.lastPID when the stub process is started
+
+       // Set when the crunch-run stub has exited but its entry is kept, as if arv-mount
+       // (or something) still held the lock in /var/run/; "--list" reports it as "stale".
+       exited bool
+}
+
 func (svm *StubVM) Instance() stubInstance {
        svm.Lock()
        defer svm.Unlock()
@@ -239,80 +260,127 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                }
                for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
                        if stdinKV[name] == "" {
-                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
+                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
                                return 1
                        }
                }
                svm.Lock()
-               if svm.running == nil {
-                       svm.running = map[string]bool{}
-               }
-               svm.running[uuid] = true
+               svm.lastPID++
+               pid := svm.lastPID
+               svm.running[uuid] = stubProcess{pid: pid}
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
                logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
                        "ContainerUUID": uuid,
+                       "PID":           pid,
                })
                logger.Printf("[test] starting crunch-run stub")
                go func() {
+                       var ctr arvados.Container
+                       var started, completed bool
+                       defer func() { // exit handler: mark this stub process exited, then (usually) drop its entry
+                               logger.Print("[test] exiting crunch-run stub")
+                               svm.Lock()
+                               defer svm.Unlock() // pairs with the most recent svm.Lock() on every return path
+                               if svm.running[uuid].pid != pid { // entry is missing or belongs to another process
+                                       bugf := svm.sis.driver.Bugf
+                                       if bugf == nil {
+                                               bugf = logger.Warnf
+                                       }
+                                       bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
+                                       return
+                               }
+                               if !completed { // exiting without a successful state=Complete update counts as a crash
+                                       logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
+                                       if started && svm.CrashRunningContainer != nil {
+                                               svm.CrashRunningContainer(ctr)
+                                       }
+                               }
+                               sproc := svm.running[uuid]
+                               sproc.exited = true
+                               svm.running[uuid] = sproc
+                               svm.Unlock() // do not hold the VM lock while simulating arv-mount's exit lag
+                               time.Sleep(time.Duration(float64(svm.ArvMountMaxExitLag) * math_rand.Float64())) // scale in float64: Duration(x<1) truncates to 0
+                               svm.Lock()
+                               if math_rand.Float64() >= svm.ArvMountDeadlockRate { // otherwise leave the exited entry in place
+                                       delete(svm.running, uuid)
+                               }
+                       }()
+
                        crashluck := math_rand.Float64()
+                       wantCrash := crashluck < svm.CrunchRunCrashRate
+                       wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
+
                        ctr, ok := queue.Get(uuid)
                        if !ok {
                                logger.Print("[test] container not in queue")
                                return
                        }
-                       if crashluck > svm.CrunchRunCrashRate/2 {
-                               time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
-                               ctr.State = arvados.ContainerStateRunning
-                               queue.Notify(ctr)
-                       }
 
                        time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
                        svm.Lock()
-                       _, running := svm.running[uuid]
+                       killed := svm.killing[uuid]
                        svm.Unlock()
-                       if !running {
-                               logger.Print("[test] container was killed")
+                       if killed || wantCrashEarly {
+                               return
+                       }
+
+                       ctr.State = arvados.ContainerStateRunning
+                       started = queue.Notify(ctr)
+                       if !started {
+                               ctr, _ = queue.Get(uuid)
+                               logger.Print("[test] erroring out because state=Running update was rejected")
+                               return
+                       }
+
+                       if wantCrash {
+                               logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
                                return
                        }
                        if svm.ExecuteContainer != nil {
                                ctr.ExitCode = svm.ExecuteContainer(ctr)
                        }
-                       // TODO: Check whether the stub instance has
-                       // been destroyed, and if so, don't call
-                       // queue.Notify. Then "container finished
-                       // twice" can be classified as a bug.
-                       if crashluck < svm.CrunchRunCrashRate {
-                               logger.Print("[test] crashing crunch-run stub")
-                       } else {
-                               ctr.State = arvados.ContainerStateComplete
-                               queue.Notify(ctr)
-                       }
-                       logger.Print("[test] exiting crunch-run stub")
-                       svm.Lock()
-                       defer svm.Unlock()
-                       delete(svm.running, uuid)
+                       logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
+                       ctr.State = arvados.ContainerStateComplete
+                       completed = queue.Notify(ctr)
                }()
                return 0
        }
        if command == "crunch-run --list" {
                svm.Lock()
                defer svm.Unlock()
-               for uuid := range svm.running {
-                       fmt.Fprintf(stdout, "%s\n", uuid)
+               for uuid, sproc := range svm.running {
+                       if sproc.exited {
+                               fmt.Fprintf(stdout, "%s stale\n", uuid)
+                       } else {
+                               fmt.Fprintf(stdout, "%s\n", uuid)
+                       }
                }
+               if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+                       fmt.Fprintln(stdout, "broken")
+               }
+               fmt.Fprintln(stdout, svm.deadlocked)
                return 0
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
                svm.Lock()
-               defer svm.Unlock()
-               if svm.running[uuid] {
-                       delete(svm.running, uuid)
-               } else {
-                       fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+               sproc, running := svm.running[uuid]
+               if running && !sproc.exited {
+                       svm.killing[uuid] = true
+                       svm.Unlock()
+                       time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+                       svm.Lock()
+                       sproc, running = svm.running[uuid]
+               }
+               svm.Unlock()
+               if running && !sproc.exited {
+                       fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+                       return 1
                }
+               fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
                return 0
        }
        if command == "true" {