X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e838828374ceed5ef6da260939251e86f72b6f27..ac312e0acae4fd114114081c9f4791d05e640831:/lib/dispatchcloud/test/stub_driver.go

diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index a9a5a429f3..0a74d97606 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -17,8 +17,10 @@ import (
 	"sync"
 	"time"
 
-	"git.curoverse.com/arvados.git/lib/cloud"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/lib/crunchrun"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/crypto/ssh"
 )
@@ -32,14 +34,23 @@ type StubDriver struct {
 	// SetupVM, if set, is called upon creation of each new
 	// StubVM. This is the caller's opportunity to customize the
 	// VM's error rate and other behaviors.
-	SetupVM func(*StubVM)
+	//
+	// If SetupVM returns an error, that error will be returned to
+	// the caller of Create(), and the new VM will be discarded.
+	SetupVM func(*StubVM) error
+
+	// Bugf, if set, is called if a bug is detected in the caller
+	// or stub. Typically set to (*check.C)Errorf. If unset,
+	// logger.Warnf is called instead.
+	Bugf func(string, ...interface{})
 
 	// StubVM's fake crunch-run uses this Queue to read and update
 	// container state.
 	Queue *Queue
 
 	// Frequency of artificially introduced errors on calls to
-	// Destroy. 0=always succeed, 1=always fail.
+	// Create and Destroy. 0=always succeed, 1=always fail.
+	ErrorRateCreate  float64
 	ErrorRateDestroy float64
 
 	// If Create() or Instances() is called too frequently, return
@@ -47,6 +58,8 @@ type StubDriver struct {
 	MinTimeBetweenCreateCalls    time.Duration
 	MinTimeBetweenInstancesCalls time.Duration
 
+	QuotaMaxInstances int
+
 	// If true, Create and Destroy calls block until Release() is
 	// called.
 	HoldCloudOps bool
@@ -56,7 +69,7 @@ type StubDriver struct {
 }
 
 // InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
 	if sd.holdCloudOps == nil {
 		sd.holdCloudOps = make(chan bool)
 	}
@@ -99,9 +112,10 @@ type StubInstanceSet struct {
 
 	allowCreateCall    time.Time
 	allowInstancesCall time.Time
+	lastInstanceID     int
 }
 
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
 	if sis.driver.HoldCloudOps {
 		sis.driver.holdCloudOps <- true
 	}
@@ -112,21 +126,26 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 	}
 	if sis.allowCreateCall.After(time.Now()) {
 		return nil, RateLimitError{sis.allowCreateCall}
-	} else {
-		sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
 	}
-
+	if math_rand.Float64() < sis.driver.ErrorRateCreate {
+		return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
+	}
+	if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
+		return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
+	}
+	sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
 	ak := sis.driver.AuthorizedKeys
 	if authKey != nil {
 		ak = append([]ssh.PublicKey{authKey}, ak...)
 	}
+	sis.lastInstanceID++
 	svm := &StubVM{
+		InitCommand:  initCommand,
 		sis:          sis,
-		id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+		id:           cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
 		tags:         copyTags(tags),
 		providerType: it.ProviderType,
-		initCommand:  cmd,
-		running:      map[string]int64{},
+		running:      map[string]stubProcess{},
 		killing:      map[string]bool{},
 	}
 	svm.SSHService = SSHService{
@@ -136,7 +155,10 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 		Exec:           svm.Exec,
 	}
 	if setup := sis.driver.SetupVM; setup != nil {
-		setup(svm)
+		err := setup(svm)
+		if err != nil {
+			return nil, err
+		}
 	}
 	sis.servers[svm.id] = svm
 	return svm.Instance(), nil
@@ -147,9 +169,8 @@ func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err
 	defer sis.mtx.RUnlock()
 	if sis.allowInstancesCall.After(time.Now()) {
 		return nil, RateLimitError{sis.allowInstancesCall}
-	} else {
-		sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
 	}
+	sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
 	var r []cloud.Instance
 	for _, ss := range sis.servers {
 		r = append(r, ss.Instance())
 	}
@@ -166,11 +187,26 @@ func (sis *StubInstanceSet) Stop() {
 	sis.stopped = true
 }
 
+func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
+	sis.mtx.Lock()
+	defer sis.mtx.Unlock()
+	for _, vm := range sis.servers {
+		svms = append(svms, vm)
+	}
+	return
+}
+
 type RateLimitError struct{ Retry time.Time }
 
 func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
 func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 
+type CapacityError struct{ InstanceTypeSpecific bool }
+
+func (e CapacityError) Error() string                { return "insufficient capacity" }
+func (e CapacityError) IsCapacityError() bool        { return true }
+func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //
@@ -185,21 +221,35 @@ type StubVM struct {
 	CrunchRunMissing      bool
 	CrunchRunCrashRate    float64
 	CrunchRunDetachDelay  time.Duration
+	ArvMountMaxExitLag    time.Duration
+	ArvMountDeadlockRate  float64
 	ExecuteContainer      func(arvados.Container) int
 	CrashRunningContainer func(arvados.Container)
+	ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-config "
+
+	// Populated by (*StubInstanceSet)Create()
+	InitCommand cloud.InitCommand
 
 	sis          *StubInstanceSet
 	id           cloud.InstanceID
 	tags         cloud.InstanceTags
-	initCommand  cloud.InitCommand
 	providerType string
 	SSHService   SSHService
-	running      map[string]int64
+	running      map[string]stubProcess
 	killing      map[string]bool
 	lastPID      int64
+	deadlocked   string
 	sync.Mutex
 }
 
+type stubProcess struct {
+	pid int64
+
+	// crunch-run has exited, but arv-mount process (or something)
+	// still holds lock in /var/run/
+	exited bool
+}
+
 func (svm *StubVM) Instance() stubInstance {
 	svm.Lock()
 	defer svm.Unlock()
@@ -236,15 +286,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 		fmt.Fprint(stderr, "crunch-run: command not found\n")
 		return 1
 	}
-	if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
-		var stdinKV map[string]string
-		err := json.Unmarshal(stdinData, &stdinKV)
+	if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+		var configData crunchrun.ConfigData
+		err := json.Unmarshal(stdinData, &configData)
 		if err != nil {
 			fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
 			return 1
 		}
 		for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
-			if stdinKV[name] == "" {
+			if configData.Env[name] == "" {
 				fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
 				return 1
 			}
@@ -252,7 +302,7 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 		svm.Lock()
 		svm.lastPID++
 		pid := svm.lastPID
-		svm.running[uuid] = pid
+		svm.running[uuid] = stubProcess{pid: pid}
 		svm.Unlock()
 		time.Sleep(svm.CrunchRunDetachDelay)
 		fmt.Fprintf(stderr, "starting %s\n", uuid)
@@ -263,93 +313,110 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 		})
 		logger.Printf("[test] starting crunch-run stub")
 		go func() {
+			var ctr arvados.Container
+			var started, completed bool
+			defer func() {
+				logger.Print("[test] exiting crunch-run stub")
+				svm.Lock()
+				defer svm.Unlock()
+				if svm.running[uuid].pid != pid {
+					bugf := svm.sis.driver.Bugf
+					if bugf == nil {
+						bugf = logger.Warnf
+					}
+					bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
+					return
+				}
+				if !completed {
+					logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
+					if started && svm.CrashRunningContainer != nil {
+						svm.CrashRunningContainer(ctr)
+					}
+				}
+				sproc := svm.running[uuid]
+				sproc.exited = true
+				svm.running[uuid] = sproc
+				svm.Unlock()
+				time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
+				svm.Lock()
+				if math_rand.Float64() >= svm.ArvMountDeadlockRate {
+					delete(svm.running, uuid)
+				}
+			}()
+
 			crashluck := math_rand.Float64()
+			wantCrash := crashluck < svm.CrunchRunCrashRate
+			wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
+
 			ctr, ok := queue.Get(uuid)
 			if !ok {
 				logger.Print("[test] container not in queue")
 				return
 			}
-			defer func() {
-				if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
-					svm.CrashRunningContainer(ctr)
-				}
-			}()
-
-			if crashluck > svm.CrunchRunCrashRate/2 {
-				time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
-				ctr.State = arvados.ContainerStateRunning
-				if !queue.Notify(ctr) {
-					ctr, _ = queue.Get(uuid)
-					logger.Print("[test] erroring out because state=Running update was rejected")
-					return
-				}
-			}
-
 			time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
 			svm.Lock()
-			defer svm.Unlock()
-			if svm.running[uuid] != pid {
-				logger.Print("[test] container was killed")
+			killed := svm.killing[uuid]
+			svm.Unlock()
+			if killed || wantCrashEarly {
+				return
+			}
+
+			ctr.State = arvados.ContainerStateRunning
+			started = queue.Notify(ctr)
+			if !started {
+				ctr, _ = queue.Get(uuid)
+				logger.Print("[test] erroring out because state=Running update was rejected")
 				return
 			}
-			delete(svm.running, uuid)
 
-			if crashluck < svm.CrunchRunCrashRate {
+			if wantCrash {
 				logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
-			} else {
-				if svm.ExecuteContainer != nil {
-					ctr.ExitCode = svm.ExecuteContainer(ctr)
-				}
-				logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
-				ctr.State = arvados.ContainerStateComplete
-				go queue.Notify(ctr)
+				return
 			}
+			if svm.ExecuteContainer != nil {
+				ctr.ExitCode = svm.ExecuteContainer(ctr)
+			}
+			logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
+			ctr.State = arvados.ContainerStateComplete
+			completed = queue.Notify(ctr)
 		}()
 		return 0
 	}
 	if command == "crunch-run --list" {
 		svm.Lock()
 		defer svm.Unlock()
-		for uuid := range svm.running {
-			fmt.Fprintf(stdout, "%s\n", uuid)
+		for uuid, sproc := range svm.running {
+			if sproc.exited {
+				fmt.Fprintf(stdout, "%s stale\n", uuid)
+			} else {
+				fmt.Fprintf(stdout, "%s\n", uuid)
+			}
 		}
 		if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
 			fmt.Fprintln(stdout, "broken")
 		}
+		fmt.Fprintln(stdout, svm.deadlocked)
 		return 0
 	}
 	if strings.HasPrefix(command, "crunch-run --kill ") {
 		svm.Lock()
-		pid, running := svm.running[uuid]
-		if running && !svm.killing[uuid] {
+		sproc, running := svm.running[uuid]
+		if running && !sproc.exited {
 			svm.killing[uuid] = true
-			go func() {
-				time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
-				svm.Lock()
-				defer svm.Unlock()
-				if svm.running[uuid] == pid {
-					// Kill only if the running entry
-					// hasn't since been killed and
-					// replaced with a different one.
-					delete(svm.running, uuid)
-				}
-				delete(svm.killing, uuid)
-			}()
 			svm.Unlock()
 			time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
 			svm.Lock()
-			_, running = svm.running[uuid]
+			sproc, running = svm.running[uuid]
 		}
 		svm.Unlock()
-		if running {
+		if running && !sproc.exited {
 			fmt.Fprintf(stderr, "%s: container is running\n", uuid)
 			return 1
-		} else {
-			fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
-			return 0
 		}
+		fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+		return 0
 	}
 	if command == "true" {
 		return 0
@@ -436,3 +503,13 @@ func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
 	}
 	return dst
 }
+
+func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
+	return nil
+}
+
+type QuotaError struct {
+	error
+}
+
+func (QuotaError) IsQuotaError() bool { return true }
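
For orientation, here is a hypothetical usage sketch (not part of the commit above) showing how a dispatcher test might drive the new knobs: ErrorRateCreate, QuotaMaxInstances, and the error-returning SetupVM hook. It assumes the package import path lib/dispatchcloud/test (package test) and the lib/cloud signatures shown in the diff, and that cloud.QuotaError is the interface the dispatcher checks; the function name exampleQuota and the instance-type/image values are invented for illustration.

package example

import (
	"encoding/json"
	"fmt"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/lib/dispatchcloud/test"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

func exampleQuota() error {
	driver := &test.StubDriver{
		// Raise this above 0 to make a random fraction of
		// Create calls fail with a synthetic error.
		ErrorRateCreate: 0,
		// The fifth concurrent instance is refused with QuotaError.
		QuotaMaxInstances: 4,
		// Customize each new StubVM; returning a non-nil error
		// here discards the VM and fails the Create call.
		SetupVM: func(svm *test.StubVM) error {
			svm.CrunchRunCrashRate = 0.05
			return nil
		},
	}
	is, err := driver.InstanceSet(json.RawMessage("{}"), "is1", nil, logrus.New(), prometheus.NewRegistry())
	if err != nil {
		return err
	}
	it := arvados.InstanceType{Name: "small", ProviderType: "a1.small"}
	for i := 0; i < 5; i++ {
		_, err := is.Create(it, "img1", cloud.InstanceTags{}, "", nil)
		if qe, ok := err.(cloud.QuotaError); ok && qe.IsQuotaError() {
			// The 5th Create should land here.
			fmt.Println("refused as expected:", err)
			return nil
		} else if err != nil {
			return err
		}
	}
	return fmt.Errorf("expected a quota error on the 5th Create")
}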
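The Is* methods on the stub's error types exist so dispatcher code can classify failures by type assertion rather than by matching error strings. A hedged sketch of the corresponding compile-time assertions follows; it assumes lib/cloud declares QuotaError, CapacityError, and RateLimitError interfaces whose method sets match the stub types above (the CapacityError interface in particular is an assumption, not shown in this diff).

package example

import (
	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/lib/dispatchcloud/test"
)

// These vars compile only if the stub's error types implement the
// assumed lib/cloud classification interfaces.
var (
	_ cloud.QuotaError     = test.QuotaError{}
	_ cloud.CapacityError  = test.CapacityError{}
	_ cloud.RateLimitError = test.RateLimitError{}
)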