Fix some golint warnings.
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
index 808d7bd3a7a4e495a4c5779685662e6b33ee4fe7..41eb20763c75248c6cea81a2e9854ad2dfde42a8 100644 (file)
@@ -6,41 +6,77 @@ package test
 
 import (
        "crypto/rand"
+       "encoding/json"
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        math_rand "math/rand"
+       "regexp"
+       "strings"
        "sync"
+       "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/mitchellh/mapstructure"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
 )
 
-type StubExecFunc func(instance cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
-
 // A StubDriver implements cloud.Driver by setting up local SSH
-// servers that pass their command execution requests to the provided
-// SSHExecFunc.
+// servers that fake command execution.
 type StubDriver struct {
-       Exec           StubExecFunc
        HostKey        ssh.Signer
        AuthorizedKeys []ssh.PublicKey
 
+       // SetupVM, if set, is called upon creation of each new
+       // StubVM. This is the caller's opportunity to customize the
+       // VM's error rate and other behaviors.
+       SetupVM func(*StubVM)
+
+       // Bugf, if set, is called if a bug is detected in the caller
+       // or stub. Typically set to (*check.C).Errorf. If unset,
+       // logger.Warnf is called instead.
+       Bugf func(string, ...interface{})
+
+       // StubVM's fake crunch-run uses this Queue to read and update
+       // container state.
+       Queue *Queue
+
+       // Probability that a call to Destroy fails with an
+       // artificially introduced error. 0=always succeed,
+       // 1=always fail.
        ErrorRateDestroy float64
 
+       // If Create() or Instances() is called again before this much
+       // time has passed since the previous call, return a
+       // rate-limiting error (RateLimitError).
+       MinTimeBetweenCreateCalls    time.Duration
+       MinTimeBetweenInstancesCalls time.Duration
+
+       // If true, Create and Destroy calls block until
+       // ReleaseCloudOps() is called.
+       HoldCloudOps bool
+
        instanceSets []*StubInstanceSet
+       holdCloudOps chan bool
 }
 
 // InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+       if sd.holdCloudOps == nil {
+               sd.holdCloudOps = make(chan bool)
+       }
        sis := StubInstanceSet{
                driver:  sd,
-               servers: map[cloud.InstanceID]*stubServer{},
+               logger:  logger,
+               servers: map[cloud.InstanceID]*StubVM{},
        }
        sd.instanceSets = append(sd.instanceSets, &sis)
-       return &sis, mapstructure.Decode(params, &sis)
+
+       var err error
+       if params != nil {
+               err = json.Unmarshal(params, &sis)
+       }
+       return &sis, err
 }
 
 // InstanceSets returns all instances that have been created by the
@@ -50,45 +86,74 @@ func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
        return sd.instanceSets
 }
 
+// ReleaseCloudOps releases n pending Create/Destroy calls. If there
+// are fewer than n blocked calls pending, it waits for the rest to
+// arrive.
+func (sd *StubDriver) ReleaseCloudOps(n int) {
+       for i := 0; i < n; i++ {
+               <-sd.holdCloudOps
+       }
+}
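As an illustration of the HoldCloudOps / ReleaseCloudOps pattern, here is a minimal sketch (not part of this commit). The instance-set ID, provider type, and image ID are arbitrary placeholders; a real test would typically set Bugf to (*check.C).Errorf instead of log.Printf.

package example

import (
        "log"

        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
)

// demoHoldCloudOps is a hypothetical helper showing how a test can
// hold stubbed cloud operations and release them explicitly.
func demoHoldCloudOps() {
        sd := &test.StubDriver{
                HoldCloudOps: true,       // Create and Destroy block until released
                Bugf:         log.Printf, // real tests usually pass (*check.C).Errorf
        }

        is, err := sd.InstanceSet(nil, "stub-instance-set", nil, logrus.StandardLogger())
        if err != nil {
                log.Fatal(err)
        }

        // This Create blocks on the driver's internal channel...
        go is.Create(arvados.InstanceType{ProviderType: "a1.small"}, "stub-image", nil, "", nil)

        // ...until the test releases exactly one pending operation.
        sd.ReleaseCloudOps(1)
}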
+
 type StubInstanceSet struct {
        driver  *StubDriver
-       servers map[cloud.InstanceID]*stubServer
+       logger  logrus.FieldLogger
+       servers map[cloud.InstanceID]*StubVM
        mtx     sync.RWMutex
        stopped bool
+
+       allowCreateCall    time.Time
+       allowInstancesCall time.Time
+       lastInstanceID     int
 }
 
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
+       if sis.driver.HoldCloudOps {
+               sis.driver.holdCloudOps <- true
+       }
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        if sis.stopped {
                return nil, errors.New("StubInstanceSet: Create called after Stop")
        }
+       if sis.allowCreateCall.After(time.Now()) {
+               return nil, RateLimitError{sis.allowCreateCall}
+       }
+       sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
        ak := sis.driver.AuthorizedKeys
        if authKey != nil {
                ak = append([]ssh.PublicKey{authKey}, ak...)
        }
-       var ss *stubServer
-       ss = &stubServer{
+       sis.lastInstanceID++
+       svm := &StubVM{
                sis:          sis,
-               id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+               id:           cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
                tags:         copyTags(tags),
                providerType: it.ProviderType,
-               SSHService: SSHService{
-                       HostKey:        sis.driver.HostKey,
-                       AuthorizedKeys: ak,
-                       Exec: func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
-                               return sis.driver.Exec(ss.Instance(), command, stdin, stdout, stderr)
-                       },
-               },
+               initCommand:  cmd,
+               running:      map[string]int64{},
+               killing:      map[string]bool{},
        }
-
-       sis.servers[ss.id] = ss
-       return ss.Instance(), nil
+       svm.SSHService = SSHService{
+               HostKey:        sis.driver.HostKey,
+               AuthorizedUser: "root",
+               AuthorizedKeys: ak,
+               Exec:           svm.Exec,
+       }
+       if setup := sis.driver.SetupVM; setup != nil {
+               setup(svm)
+       }
+       sis.servers[svm.id] = svm
+       return svm.Instance(), nil
 }
 
 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
        sis.mtx.RLock()
        defer sis.mtx.RUnlock()
+       if sis.allowInstancesCall.After(time.Now()) {
+               return nil, RateLimitError{sis.allowInstancesCall}
+       }
+       sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
        var r []cloud.Instance
        for _, ss := range sis.servers {
                r = append(r, ss.Instance())
@@ -105,86 +170,272 @@ func (sis *StubInstanceSet) Stop() {
        sis.stopped = true
 }
 
-// stubServer is a fake server that runs an SSH service. It represents
-// a VM running in a fake cloud.
+// RateLimitError is returned by Create and Instances when they are
+// called again before the configured minimum interval has elapsed.
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
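As a usage sketch (not from this commit), a caller can honor these rate-limit hints without depending on this concrete type, using only the EarliestRetry method; the helper below is hypothetical and assumes the usual "time" and lib/cloud imports from this file.

// listInstancesWithBackoff is an illustrative helper: it retries
// Instances() whenever the returned error advertises an EarliestRetry
// time, as RateLimitError above does.
func listInstancesWithBackoff(is cloud.InstanceSet) ([]cloud.Instance, error) {
        for {
                insts, err := is.Instances(nil)
                if rl, ok := err.(interface{ EarliestRetry() time.Time }); ok {
                        time.Sleep(time.Until(rl.EarliestRetry()))
                        continue
                }
                return insts, err
        }
}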
+
+// StubVM is a fake server that runs an SSH service. It represents a
+// VM running in a fake cloud.
 //
 // Note this is distinct from a stubInstance, which is a snapshot of
-// the VM's metadata. As with a VM in a real cloud, the stubServer
-// keeps running (and might change IP addresses, shut down, etc.)
-// without updating any stubInstances that have been returned to
-// callers.
-type stubServer struct {
+// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
+// running (and might change IP addresses, shut down, etc.) without
+// updating any stubInstances that have been returned to callers.
+type StubVM struct {
+       // Boot is when the fake VM becomes responsive; after Broken,
+       // its commands fail ("cannot fork"); after ReportBroken,
+       // "crunch-run --list" also reports "broken".
+       Boot                  time.Time
+       Broken                time.Time
+       ReportBroken          time.Time
+       CrunchRunMissing      bool          // pretend crunch-run is not installed
+       CrunchRunCrashRate    float64       // 0..1 probability that a fake crunch-run crashes
+       CrunchRunDetachDelay  time.Duration // delay before "crunch-run --detach" returns
+       // ExecuteContainer, if set, supplies the exit code for a fake
+       // container; CrashRunningContainer, if set, is called when a
+       // fake crunch-run crashes while its container is running.
+       ExecuteContainer      func(arvados.Container) int
+       CrashRunningContainer func(arvados.Container)
+
        sis          *StubInstanceSet
        id           cloud.InstanceID
        tags         cloud.InstanceTags
+       initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
+       running      map[string]int64
+       killing      map[string]bool
+       lastPID      int64
        sync.Mutex
 }
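These fields are normally filled in from StubDriver.SetupVM. A sketch with arbitrary values, reusing the *test.StubDriver named sd from the earlier example:

sd.SetupVM = func(vm *test.StubVM) {
        vm.Boot = time.Now().Add(100 * time.Millisecond) // pretend boot takes 100ms
        vm.CrunchRunDetachDelay = 10 * time.Millisecond  // slow "crunch-run --detach"
        vm.CrunchRunCrashRate = 0.1                      // crash ~10% of fake crunch-run processes
        vm.ExecuteContainer = func(ctr arvados.Container) int {
                return 0 // every container that doesn't crash exits 0
        }
}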
 
-func (ss *stubServer) Instance() stubInstance {
-       ss.Lock()
-       defer ss.Unlock()
+func (svm *StubVM) Instance() stubInstance {
+       svm.Lock()
+       defer svm.Unlock()
        return stubInstance{
-               ss:   ss,
-               addr: ss.SSHService.Address(),
+               svm:  svm,
+               addr: svm.SSHService.Address(),
                // We deliberately return a cached/stale copy of the
                // real tags here, so that (Instance)Tags() sometimes
                // returns old data after a call to
                // (Instance)SetTags().  This is permitted by the
                // driver interface, and this might help remind
                // callers that they need to tolerate it.
-               tags: copyTags(ss.tags),
+               tags: copyTags(svm.tags),
        }
 }
 
+// Exec implements the fake VM's SSH command execution. It emulates
+// the crunch-run invocations used by the dispatcher ("--detach
+// --stdin-env", "--list", "--kill") plus "true", and returns a
+// shell-style exit code.
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       stdinData, err := ioutil.ReadAll(stdin)
+       if err != nil {
+               fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
+               return 1
+       }
+       queue := svm.sis.driver.Queue
+       uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
+       if eta := time.Until(svm.Boot); eta > 0 {
+               fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
+               return 1
+       }
+       if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
+               fmt.Fprintf(stderr, "cannot fork\n")
+               return 2
+       }
+       if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
+               fmt.Fprint(stderr, "crunch-run: command not found\n")
+               return 1
+       }
+       if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
+               var stdinKV map[string]string
+               err := json.Unmarshal(stdinData, &stdinKV)
+               if err != nil {
+                       fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
+                       return 1
+               }
+               for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+                       if stdinKV[name] == "" {
+                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
+                               return 1
+                       }
+               }
+               svm.Lock()
+               svm.lastPID++
+               pid := svm.lastPID
+               svm.running[uuid] = pid
+               svm.Unlock()
+               time.Sleep(svm.CrunchRunDetachDelay)
+               fmt.Fprintf(stderr, "starting %s\n", uuid)
+               logger := svm.sis.logger.WithFields(logrus.Fields{
+                       "Instance":      svm.id,
+                       "ContainerUUID": uuid,
+                       "PID":           pid,
+               })
+               logger.Printf("[test] starting crunch-run stub")
+               go func() {
+                       var ctr arvados.Container
+                       var started, completed bool
+                       defer func() {
+                               logger.Print("[test] exiting crunch-run stub")
+                               svm.Lock()
+                               defer svm.Unlock()
+                               if svm.running[uuid] != pid {
+                                       if !completed {
+                                               bugf := svm.sis.driver.Bugf
+                                               if bugf == nil {
+                                                       bugf = logger.Warnf
+                                               }
+                                               bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
+                                       }
+                               } else {
+                                       delete(svm.running, uuid)
+                               }
+                               if !completed {
+                                       logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
+                                       if started && svm.CrashRunningContainer != nil {
+                                               svm.CrashRunningContainer(ctr)
+                                       }
+                               }
+                       }()
+
+                       crashluck := math_rand.Float64()
+                       wantCrash := crashluck < svm.CrunchRunCrashRate
+                       wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
+
+                       ctr, ok := queue.Get(uuid)
+                       if !ok {
+                               logger.Print("[test] container not in queue")
+                               return
+                       }
+
+                       time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+
+                       svm.Lock()
+                       killed := svm.running[uuid] != pid
+                       svm.Unlock()
+                       if killed || wantCrashEarly {
+                               return
+                       }
+
+                       ctr.State = arvados.ContainerStateRunning
+                       started = queue.Notify(ctr)
+                       if !started {
+                               ctr, _ = queue.Get(uuid)
+                               logger.Print("[test] erroring out because state=Running update was rejected")
+                               return
+                       }
+
+                       if wantCrash {
+                               logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
+                               return
+                       }
+                       if svm.ExecuteContainer != nil {
+                               ctr.ExitCode = svm.ExecuteContainer(ctr)
+                       }
+                       logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
+                       ctr.State = arvados.ContainerStateComplete
+                       completed = queue.Notify(ctr)
+               }()
+               return 0
+       }
+       if command == "crunch-run --list" {
+               svm.Lock()
+               defer svm.Unlock()
+               for uuid := range svm.running {
+                       fmt.Fprintf(stdout, "%s\n", uuid)
+               }
+               if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+                       fmt.Fprintln(stdout, "broken")
+               }
+               return 0
+       }
+       if strings.HasPrefix(command, "crunch-run --kill ") {
+               svm.Lock()
+               pid, running := svm.running[uuid]
+               if running && !svm.killing[uuid] {
+                       svm.killing[uuid] = true
+                       go func() {
+                               time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
+                               svm.Lock()
+                               defer svm.Unlock()
+                               if svm.running[uuid] == pid {
+                                       // Kill only if the running entry
+                                       // hasn't since been killed and
+                                       // replaced with a different one.
+                                       delete(svm.running, uuid)
+                               }
+                               delete(svm.killing, uuid)
+                       }()
+                       svm.Unlock()
+                       time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
+                       svm.Lock()
+                       _, running = svm.running[uuid]
+               }
+               svm.Unlock()
+               if running {
+                       fmt.Fprintf(stderr, "%s: container is running\n", uuid)
+                       return 1
+               }
+               fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+               return 0
+       }
+       if command == "true" {
+               return 0
+       }
+       fmt.Fprintf(stderr, "%q: command not found\n", command)
+       return 1
+}
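For completeness, one purely illustrative way to wire the CrashRunningContainer hook (inside the SetupVM callback sketched earlier) is to push the simulated failure back into the test Queue this fake crunch-run reads from; real dispatcher tests may do this differently.

vm.CrashRunningContainer = func(ctr arvados.Container) {
        // Hypothetical wiring: record the crash as a cancelled
        // container so the test can observe the outcome in the queue.
        ctr.State = arvados.ContainerStateCancelled
        sd.Queue.Notify(ctr)
}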
+
 type stubInstance struct {
-       ss   *stubServer
+       svm  *StubVM
        addr string
        tags cloud.InstanceTags
 }
 
 func (si stubInstance) ID() cloud.InstanceID {
-       return si.ss.id
+       return si.svm.id
 }
 
 func (si stubInstance) Address() string {
        return si.addr
 }
 
+func (si stubInstance) RemoteUser() string {
+       return si.svm.SSHService.AuthorizedUser
+}
+
 func (si stubInstance) Destroy() error {
-       if math_rand.Float64() < si.ss.sis.driver.ErrorRateDestroy {
+       sis := si.svm.sis
+       if sis.driver.HoldCloudOps {
+               sis.driver.holdCloudOps <- true
+       }
+       if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
                return errors.New("instance could not be destroyed")
        }
-       si.ss.SSHService.Close()
-       sis := si.ss.sis
+       si.svm.SSHService.Close()
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
-       delete(sis.servers, si.ss.id)
+       delete(sis.servers, si.svm.id)
        return nil
 }
 
 func (si stubInstance) ProviderType() string {
-       return si.ss.providerType
+       return si.svm.providerType
 }
 
 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
        tags = copyTags(tags)
-       ss := si.ss
+       svm := si.svm
        go func() {
-               ss.Lock()
-               defer ss.Unlock()
-               ss.tags = tags
+               svm.Lock()
+               defer svm.Unlock()
+               svm.tags = tags
        }()
        return nil
 }
 
 func (si stubInstance) Tags() cloud.InstanceTags {
-       return si.tags
+       // Return a copy to ensure a caller can't change our saved
+       // tags just by writing to the returned map.
+       return copyTags(si.tags)
 }
 
 func (si stubInstance) String() string {
-       return string(si.ss.id)
+       return string(si.svm.id)
 }
 
 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
@@ -193,7 +444,7 @@ func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) erro
        if err != nil {
                return err
        }
-       sig, err := si.ss.sis.driver.HostKey.Sign(rand.Reader, buf)
+       sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
        if err != nil {
                return err
        }