14807: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
index 3c23457c95a00a97859023802fa9705d6f98e4a6..5873e492213b86f58eaa98850c5c00c073cd2aee 100644 (file)
@@ -6,9 +6,11 @@ package test
 
 import (
        "crypto/rand"
+       "encoding/json"
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        math_rand "math/rand"
        "regexp"
        "strings"
@@ -17,7 +19,6 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/mitchellh/mapstructure"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
 )
@@ -41,17 +42,36 @@ type StubDriver struct {
        // Destroy. 0=always succeed, 1=always fail.
        ErrorRateDestroy float64
 
+       // If Create() or Instances() is called too frequently, return
+       // rate-limiting errors.
+       MinTimeBetweenCreateCalls    time.Duration
+       MinTimeBetweenInstancesCalls time.Duration
+
+       // If true, Create and Destroy calls block until Release() is
+       // called.
+       HoldCloudOps bool
+
        instanceSets []*StubInstanceSet
+       holdCloudOps chan bool
 }
 
 // InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+       if sd.holdCloudOps == nil {
+               sd.holdCloudOps = make(chan bool)
+       }
        sis := StubInstanceSet{
                driver:  sd,
+               logger:  logger,
                servers: map[cloud.InstanceID]*StubVM{},
        }
        sd.instanceSets = append(sd.instanceSets, &sis)
-       return &sis, mapstructure.Decode(params, &sis)
+
+       var err error
+       if params != nil {
+               err = json.Unmarshal(params, &sis)
+       }
+       return &sis, err
 }
 
 // InstanceSets returns all instances that have been created by the
@@ -61,19 +81,41 @@ func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
        return sd.instanceSets
 }
 
+// ReleaseCloudOps releases n pending Create/Destroy calls. If there
+// are fewer than n blocked calls pending, it waits for the rest to
+// arrive.
+func (sd *StubDriver) ReleaseCloudOps(n int) {
+       for i := 0; i < n; i++ {
+               <-sd.holdCloudOps
+       }
+}
+
 type StubInstanceSet struct {
        driver  *StubDriver
+       logger  logrus.FieldLogger
        servers map[cloud.InstanceID]*StubVM
        mtx     sync.RWMutex
        stopped bool
+
+       allowCreateCall    time.Time
+       allowInstancesCall time.Time
 }
 
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
+       if sis.driver.HoldCloudOps {
+               sis.driver.holdCloudOps <- true
+       }
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        if sis.stopped {
                return nil, errors.New("StubInstanceSet: Create called after Stop")
        }
+       if sis.allowCreateCall.After(time.Now()) {
+               return nil, RateLimitError{sis.allowCreateCall}
+       } else {
+               sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
+       }
+
        ak := sis.driver.AuthorizedKeys
        if authKey != nil {
                ak = append([]ssh.PublicKey{authKey}, ak...)
@@ -83,9 +125,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
                tags:         copyTags(tags),
                providerType: it.ProviderType,
+               initCommand:  cmd,
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
+               AuthorizedUser: "root",
                AuthorizedKeys: ak,
                Exec:           svm.Exec,
        }
@@ -99,6 +143,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
        sis.mtx.RLock()
        defer sis.mtx.RUnlock()
+       if sis.allowInstancesCall.After(time.Now()) {
+               return nil, RateLimitError{sis.allowInstancesCall}
+       } else {
+               sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
+       }
        var r []cloud.Instance
        for _, ss := range sis.servers {
                r = append(r, ss.Instance())
@@ -115,6 +164,11 @@ func (sis *StubInstanceSet) Stop() {
        sis.stopped = true
 }
 
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //
@@ -133,6 +187,7 @@ type StubVM struct {
        sis          *StubInstanceSet
        id           cloud.InstanceID
        tags         cloud.InstanceTags
+       initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
        running      map[string]bool
@@ -155,7 +210,12 @@ func (svm *StubVM) Instance() stubInstance {
        }
 }
 
-func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       stdinData, err := ioutil.ReadAll(stdin)
+       if err != nil {
+               fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
+               return 1
+       }
        queue := svm.sis.driver.Queue
        uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
        if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -170,7 +230,19 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
                fmt.Fprint(stderr, "crunch-run: command not found\n")
                return 1
        }
-       if strings.HasPrefix(command, "crunch-run --detach ") {
+       if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
+               var stdinKV map[string]string
+               err := json.Unmarshal(stdinData, &stdinKV)
+               if err != nil {
+                       fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
+                       return 1
+               }
+               for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+                       if stdinKV[name] == "" {
+                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
+                               return 1
+                       }
+               }
                svm.Lock()
                if svm.running == nil {
                        svm.running = map[string]bool{}
@@ -179,7 +251,10 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
-               logger := logrus.WithField("ContainerUUID", uuid)
+               logger := svm.sis.logger.WithFields(logrus.Fields{
+                       "Instance":      svm.id,
+                       "ContainerUUID": uuid,
+               })
                logger.Printf("[test] starting crunch-run stub")
                go func() {
                        crashluck := math_rand.Float64()
@@ -261,12 +336,19 @@ func (si stubInstance) Address() string {
        return si.addr
 }
 
+func (si stubInstance) RemoteUser() string {
+       return si.svm.SSHService.AuthorizedUser
+}
+
 func (si stubInstance) Destroy() error {
+       sis := si.svm.sis
+       if sis.driver.HoldCloudOps {
+               sis.driver.holdCloudOps <- true
+       }
        if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
                return errors.New("instance could not be destroyed")
        }
        si.svm.SSHService.Close()
-       sis := si.svm.sis
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        delete(sis.servers, si.svm.id)