X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/74df5a58360fe6bcb273480d3ddec507a53b6b2b..35a23c29e610809d19635aba3cc1956128cb75d2:/lib/dispatchcloud/test/stub_driver.go diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go index f738e20664..5873e49221 100644 --- a/lib/dispatchcloud/test/stub_driver.go +++ b/lib/dispatchcloud/test/stub_driver.go @@ -6,9 +6,11 @@ package test import ( "crypto/rand" + "encoding/json" "errors" "fmt" "io" + "io/ioutil" math_rand "math/rand" "regexp" "strings" @@ -17,7 +19,6 @@ import ( "git.curoverse.com/arvados.git/lib/cloud" "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/mitchellh/mapstructure" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" ) @@ -41,19 +42,36 @@ type StubDriver struct { // Destroy. 0=always succeed, 1=always fail. ErrorRateDestroy float64 + // If Create() or Instances() is called too frequently, return + // rate-limiting errors. + MinTimeBetweenCreateCalls time.Duration + MinTimeBetweenInstancesCalls time.Duration + + // If true, Create and Destroy calls block until released by + // ReleaseCloudOps(). + HoldCloudOps bool + instanceSets []*StubInstanceSet + holdCloudOps chan bool } // InstanceSet returns a new *StubInstanceSet. 
-func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID, - logger logrus.FieldLogger) (cloud.InstanceSet, error) { - +func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) { + if sd.holdCloudOps == nil { + sd.holdCloudOps = make(chan bool) + } sis := StubInstanceSet{ driver: sd, + logger: logger, servers: map[cloud.InstanceID]*StubVM{}, } sd.instanceSets = append(sd.instanceSets, &sis) - return &sis, mapstructure.Decode(params, &sis) + + var err error + if params != nil { + err = json.Unmarshal(params, &sis) + } + return &sis, err } // InstanceSets returns all instances that have been created by the @@ -63,19 +81,41 @@ func (sd *StubDriver) InstanceSets() []*StubInstanceSet { return sd.instanceSets } +// ReleaseCloudOps releases n pending Create/Destroy calls. If there +// are fewer than n blocked calls pending, it waits for the rest to +// arrive. +func (sd *StubDriver) ReleaseCloudOps(n int) { + for i := 0; i < n; i++ { + <-sd.holdCloudOps + } +} + type StubInstanceSet struct { driver *StubDriver + logger logrus.FieldLogger servers map[cloud.InstanceID]*StubVM mtx sync.RWMutex stopped bool + + allowCreateCall time.Time + allowInstancesCall time.Time } -func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) { +func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) { + if sis.driver.HoldCloudOps { + sis.driver.holdCloudOps <- true + } sis.mtx.Lock() defer sis.mtx.Unlock() if sis.stopped { return nil, errors.New("StubInstanceSet: Create called after Stop") } + if sis.allowCreateCall.After(time.Now()) { + return nil, RateLimitError{sis.allowCreateCall} + } else { + sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls) 
+ } + ak := sis.driver.AuthorizedKeys if authKey != nil { ak = append([]ssh.PublicKey{authKey}, ak...) @@ -85,9 +125,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())), tags: copyTags(tags), providerType: it.ProviderType, + initCommand: cmd, } svm.SSHService = SSHService{ HostKey: sis.driver.HostKey, + AuthorizedUser: "root", AuthorizedKeys: ak, Exec: svm.Exec, } @@ -101,6 +143,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) { - sis.mtx.RLock() - defer sis.mtx.RUnlock() + sis.mtx.Lock() + defer sis.mtx.Unlock() + if sis.allowInstancesCall.After(time.Now()) { + return nil, RateLimitError{sis.allowInstancesCall} + } else { + sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls) + } var r []cloud.Instance for _, ss := range sis.servers { r = append(r, ss.Instance()) @@ -117,6 +164,11 @@ func (sis *StubInstanceSet) Stop() { sis.stopped = true } +type RateLimitError struct{ Retry time.Time } + +func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) } +func (e RateLimitError) EarliestRetry() time.Time { return e.Retry } + // StubVM is a fake server that runs an SSH service. It represents a // VM running in a fake cloud. 
// @@ -135,6 +187,7 @@ type StubVM struct { sis *StubInstanceSet id cloud.InstanceID tags cloud.InstanceTags + initCommand cloud.InitCommand providerType string SSHService SSHService running map[string]bool @@ -157,7 +210,12 @@ func (svm *StubVM) Instance() stubInstance { } } -func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 { +func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 { + stdinData, err := ioutil.ReadAll(stdin) + if err != nil { + fmt.Fprintf(stderr, "error reading stdin: %s\n", err) + return 1 + } queue := svm.sis.driver.Queue uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command) if eta := svm.Boot.Sub(time.Now()); eta > 0 { @@ -172,7 +230,19 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write fmt.Fprint(stderr, "crunch-run: command not found\n") return 1 } - if strings.HasPrefix(command, "crunch-run --detach ") { + if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") { + var stdinKV map[string]string + err := json.Unmarshal(stdinData, &stdinKV) + if err != nil { + fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData) + return 1 + } + for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} { + if stdinKV[name] == "" { + fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData) + return 1 + } + } svm.Lock() if svm.running == nil { svm.running = map[string]bool{} @@ -181,7 +251,10 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write svm.Unlock() time.Sleep(svm.CrunchRunDetachDelay) fmt.Fprintf(stderr, "starting %s\n", uuid) - logger := logrus.WithField("ContainerUUID", uuid) + logger := svm.sis.logger.WithFields(logrus.Fields{ + "Instance": svm.id, + "ContainerUUID": uuid, + }) logger.Printf("[test] starting crunch-run stub") go func() { crashluck := math_rand.Float64() @@ -263,12 +336,19 @@ func (si stubInstance) 
Address() string { return si.addr } +func (si stubInstance) RemoteUser() string { + return si.svm.SSHService.AuthorizedUser +} + func (si stubInstance) Destroy() error { + sis := si.svm.sis + if sis.driver.HoldCloudOps { + sis.driver.holdCloudOps <- true + } if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy { return errors.New("instance could not be destroyed") } si.svm.SSHService.Close() - sis := si.svm.sis sis.mtx.Lock() defer sis.mtx.Unlock() delete(sis.servers, si.svm.id)