X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/004cf8a6b59ac10bf84e213309289e6138c48b7d..be28c5f528a93ee32eef4c1dc2d0872cb718b29f:/lib/dispatchcloud/test/stub_driver.go diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go index 873d987327..f6e06d3f7c 100644 --- a/lib/dispatchcloud/test/stub_driver.go +++ b/lib/dispatchcloud/test/stub_driver.go @@ -17,8 +17,8 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" ) @@ -34,6 +34,11 @@ type StubDriver struct { // VM's error rate and other behaviors. SetupVM func(*StubVM) + // Bugf, if set, is called if a bug is detected in the caller + // or stub. Typically set to (*check.C)Errorf. If unset, + // logger.Warnf is called instead. + Bugf func(string, ...interface{}) + // StubVM's fake crunch-run uses this Queue to read and update // container state. Queue *Queue @@ -56,7 +61,7 @@ type StubDriver struct { } // InstanceSet returns a new *StubInstanceSet. -func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) { +func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) { if sd.holdCloudOps == nil { sd.holdCloudOps = make(chan bool) } @@ -99,6 +104,7 @@ type StubInstanceSet struct { allowCreateCall time.Time allowInstancesCall time.Time + lastInstanceID int } func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) { @@ -120,9 +126,10 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, if authKey != nil { ak = append([]ssh.PublicKey{authKey}, ak...) } + sis.lastInstanceID++ svm := &StubVM{ sis: sis, - id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())), + id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)), tags: copyTags(tags), providerType: it.ProviderType, initCommand: cmd, @@ -263,49 +270,68 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, }) logger.Printf("[test] starting crunch-run stub") go func() { + var ctr arvados.Container + var started, completed bool + defer func() { + logger.Print("[test] exiting crunch-run stub") + svm.Lock() + defer svm.Unlock() + if svm.running[uuid] != pid { + if !completed { + bugf := svm.sis.driver.Bugf + if bugf == nil { + bugf = logger.Warnf + } + bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid]) + } + } else { + delete(svm.running, uuid) + } + if !completed { + logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub") + if started && svm.CrashRunningContainer != nil { + svm.CrashRunningContainer(ctr) + } + } + }() + crashluck := math_rand.Float64() + wantCrash := crashluck < svm.CrunchRunCrashRate + wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2 + ctr, ok := queue.Get(uuid) if !ok { logger.Print("[test] container not in queue") return } - defer func() { - if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil { - svm.CrashRunningContainer(ctr) - } - }() - - if crashluck > svm.CrunchRunCrashRate/2 { - time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond) - ctr.State = arvados.ContainerStateRunning - if !queue.Notify(ctr) { - ctr, _ = queue.Get(uuid) - logger.Print("[test] erroring out because state=Running update was rejected") - return - } - } - time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond) svm.Lock() - defer svm.Unlock() - if svm.running[uuid] != pid { - logger.Print("[test] container was killed") + killed := svm.running[uuid] != pid + svm.Unlock() + if killed || wantCrashEarly { return } - delete(svm.running, uuid) - if crashluck < svm.CrunchRunCrashRate { + ctr.State = arvados.ContainerStateRunning + started = queue.Notify(ctr) + if !started { + ctr, _ = queue.Get(uuid) + logger.Print("[test] erroring out because state=Running update was rejected") + return + } + + if wantCrash { logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub") - } else { - if svm.ExecuteContainer != nil { - ctr.ExitCode = svm.ExecuteContainer(ctr) - } - logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub") - ctr.State = arvados.ContainerStateComplete - go queue.Notify(ctr) + return + } + if svm.ExecuteContainer != nil { + ctr.ExitCode = svm.ExecuteContainer(ctr) } + logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container") + ctr.State = arvados.ContainerStateComplete + completed = queue.Notify(ctr) }() return 0 }