Merge branch '15026-cloudtest'
author Tom Clegg <tclegg@veritasgenetics.com>
Wed, 26 Jun 2019 18:55:31 +0000 (14:55 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Wed, 26 Jun 2019 18:55:31 +0000 (14:55 -0400)
refs #15026

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

12 files changed:
build/run-tests.sh
cmd/arvados-server/cmd.go
lib/cloud/cloudtest/cmd.go [new file with mode: 0644]
lib/cloud/cloudtest/tester.go [new file with mode: 0644]
lib/cloud/cloudtest/tester_test.go [new file with mode: 0644]
lib/config/config.default.yml
lib/config/generated_config.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/driver.go
lib/dispatchcloud/ssh_executor/executor.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/verify.go
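
The new subcommand wired up below is run through the arvados-server binary; a
hypothetical invocation using the flags defined in lib/cloud/cloudtest/cmd.go
(the config path and instance type here are invented):

    arvados-server cloudtest -config /etc/arvados/config.yml \
        -instance-type m4.large -destroy-existing -pause-before-destroy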

index 97c21b405ca8ef50eec0a2f93ee6679ddb909c8f..6f8f117744e431a4ce166f396de35725fc00da4d 100755 (executable)
@@ -80,6 +80,7 @@ lib/controller
 lib/crunchstat
 lib/cloud
 lib/cloud/azure
+lib/cloud/cloudtest
 lib/dispatchcloud
 lib/dispatchcloud/container
 lib/dispatchcloud/scheduler
@@ -734,7 +735,7 @@ do_test() {
         services/api)
             stop_services
             ;;
-        gofmt | govendor | doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
+        gofmt | govendor | doc | lib/cli | lib/cloud/azure | lib/cloud/ec2 | lib/cloud/cloudtest | lib/cmd | lib/dispatchcloud/ssh_executor | lib/dispatchcloud/worker)
             # don't care whether services are running
             ;;
         *)
@@ -992,6 +993,7 @@ gostuff=(
     lib/cloud
     lib/cloud/azure
     lib/cloud/ec2
+    lib/cloud/cloudtest
     lib/config
     lib/dispatchcloud
     lib/dispatchcloud/container
index 983159382297dab0a5d95fbf1f35f440fc015720..2506bd2c98892579c974c93f925673277c4c3172 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "os"
 
+       "git.curoverse.com/arvados.git/lib/cloud/cloudtest"
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/config"
        "git.curoverse.com/arvados.git/lib/controller"
@@ -20,6 +21,7 @@ var (
                "-version":  cmd.Version(version),
                "--version": cmd.Version(version),
 
+               "cloudtest":      cloudtest.Command,
                "config-check":   config.CheckCommand,
                "config-dump":    config.DumpCommand,
                "controller":     controller.Command,
diff --git a/lib/cloud/cloudtest/cmd.go b/lib/cloud/cloudtest/cmd.go
new file mode 100644 (file)
index 0000000..1f94ea6
--- /dev/null
@@ -0,0 +1,153 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+       "bufio"
+       "errors"
+       "flag"
+       "fmt"
+       "io"
+       "os"
+       "os/user"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/config"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "golang.org/x/crypto/ssh"
+)
+
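+// Command is the entry point for the "cloudtest" subcommand of
+// arvados-server.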
+var Command command
+
+type command struct{}
+
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var err error
+       defer func() {
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s\n", err)
+               }
+       }()
+
+       flags := flag.NewFlagSet("", flag.ContinueOnError)
+       flags.SetOutput(stderr)
+       configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
+       instanceSetID := flags.String("instance-set-id", defaultInstanceSetID(), "InstanceSetID tag `value` to use on the test instance")
+       imageID := flags.String("image-id", "", "Image ID to use when creating the test instance (if empty, use cluster config)")
+       instanceType := flags.String("instance-type", "", "Instance type to create (if empty, use cheapest type in config)")
+       destroyExisting := flags.Bool("destroy-existing", false, "Destroy any existing instances tagged with our InstanceSetID, instead of erroring out")
+       shellCommand := flags.String("command", "", "Run an interactive shell command on the test instance when it boots")
+       pauseBeforeDestroy := flags.Bool("pause-before-destroy", false, "Prompt and wait before destroying the test instance")
+       err = flags.Parse(args)
+       if err == flag.ErrHelp {
+               err = nil
+               return 0
+       } else if err != nil {
+               return 2
+       }
+
+       if len(flags.Args()) != 0 {
+               flags.Usage()
+               return 2
+       }
+       logger := ctxlog.New(stderr, "text", "info")
+       defer func() {
+               if err != nil {
+                       logger.WithError(err).Error("fatal")
+                       // suppress output from the other error-printing func
+                       err = nil
+               }
+               logger.Info("exiting")
+       }()
+
+       cfg, err := config.LoadFile(*configFile, logger)
+       if err != nil {
+               return 1
+       }
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               return 1
+       }
+       key, err := ssh.ParsePrivateKey([]byte(cluster.Containers.DispatchPrivateKey))
+       if err != nil {
+               err = fmt.Errorf("error parsing configured Containers.DispatchPrivateKey: %s", err)
+               return 1
+       }
+       driver, ok := dispatchcloud.Drivers[cluster.Containers.CloudVMs.Driver]
+       if !ok {
+               err = fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
+               return 1
+       }
+       if *imageID == "" {
+               *imageID = cluster.Containers.CloudVMs.ImageID
+       }
+       it, err := chooseInstanceType(cluster, *instanceType)
+       if err != nil {
+               return 1
+       }
+       tags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
+       tagKeyPrefix := cluster.Containers.CloudVMs.TagKeyPrefix
+       tags[tagKeyPrefix+"CloudTestPID"] = fmt.Sprintf("%d", os.Getpid())
+       if !(&tester{
+               Logger:           logger,
+               Tags:             tags,
+               TagKeyPrefix:     tagKeyPrefix,
+               SetID:            cloud.InstanceSetID(*instanceSetID),
+               DestroyExisting:  *destroyExisting,
+               ProbeInterval:    cluster.Containers.CloudVMs.ProbeInterval.Duration(),
+               SyncInterval:     cluster.Containers.CloudVMs.SyncInterval.Duration(),
+               TimeoutBooting:   cluster.Containers.CloudVMs.TimeoutBooting.Duration(),
+               Driver:           driver,
+               DriverParameters: cluster.Containers.CloudVMs.DriverParameters,
+               ImageID:          cloud.ImageID(*imageID),
+               InstanceType:     it,
+               SSHKey:           key,
+               SSHPort:          cluster.Containers.CloudVMs.SSHPort,
+               BootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+               ShellCommand:     *shellCommand,
+               PauseBeforeDestroy: func() {
+                       if *pauseBeforeDestroy {
+                               logger.Info("waiting for operator to press Enter")
+                               fmt.Fprint(stderr, "Press Enter to continue: ")
+                               bufio.NewReader(stdin).ReadString('\n')
+                       }
+               },
+       }).Run() {
+               return 1
+       }
+       return 0
+}
+
+func defaultInstanceSetID() string {
+       username := ""
+       if u, err := user.Current(); err == nil {
+               username = u.Username
+       }
+       hostname, _ := os.Hostname()
+       return fmt.Sprintf("cloudtest-%s@%s", username, hostname)
+}
+
+// Return the named instance type, or the cheapest type if name=="".
+func chooseInstanceType(cluster *arvados.Cluster, name string) (arvados.InstanceType, error) {
+       if len(cluster.InstanceTypes) == 0 {
+               return arvados.InstanceType{}, errors.New("no instance types are configured")
+       } else if name == "" {
+               first := true
+               var best arvados.InstanceType
+               for _, it := range cluster.InstanceTypes {
+                       if first || best.Price > it.Price {
+                               best = it
+                               first = false
+                       }
+               }
+               return best, nil
+       } else if it, ok := cluster.InstanceTypes[name]; !ok {
+               return it, fmt.Errorf("requested instance type %q is not configured", name)
+       } else {
+               return it, nil
+       }
+}
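
A worked example of defaultInstanceSetID above: run by user "alice" on host
"dev1" (names invented), it returns "cloudtest-alice@dev1", so runs by
different operators or machines get distinct InstanceSetID tags unless one is
supplied with -instance-set-id.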
diff --git a/lib/cloud/cloudtest/tester.go b/lib/cloud/cloudtest/tester.go
new file mode 100644 (file)
index 0000000..dc5a473
--- /dev/null
@@ -0,0 +1,377 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+       "crypto/rand"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
+)
+
+var (
+       errTestInstanceNotFound = errors.New("test instance missing from cloud provider's list")
+)
+
+// A tester does a sequence of operations to test a cloud driver and
+// configuration. Run() should be called only once, after assigning
+// suitable values to public fields.
+type tester struct {
+       Logger             logrus.FieldLogger
+       Tags               cloud.SharedResourceTags
+       TagKeyPrefix       string
+       SetID              cloud.InstanceSetID
+       DestroyExisting    bool
+       ProbeInterval      time.Duration
+       SyncInterval       time.Duration
+       TimeoutBooting     time.Duration
+       Driver             cloud.Driver
+       DriverParameters   json.RawMessage
+       InstanceType       arvados.InstanceType
+       ImageID            cloud.ImageID
+       SSHKey             ssh.Signer
+       SSHPort            string
+       BootProbeCommand   string
+       ShellCommand       string
+       PauseBeforeDestroy func()
+
+       is              cloud.InstanceSet
+       testInstance    *worker.TagVerifier
+       secret          string
+       executor        *ssh_executor.Executor
+       showedLoginInfo bool
+
+       failed bool
+}
+
+// Run the test suite as specified, clean up as needed, and return
+// true (everything is OK) or false (something went wrong).
+func (t *tester) Run() bool {
+       // This flag gets set when we encounter a non-fatal error, so
+       // we can continue doing more tests but remember to return
+       // false (failure) at the end.
+       deferredError := false
+
+       var err error
+       t.is, err = t.Driver.InstanceSet(t.DriverParameters, t.SetID, t.Tags, t.Logger)
+       if err != nil {
+               t.Logger.WithError(err).Info("error initializing driver")
+               return false
+       }
+
+       // Don't send the driver any filters the first time we get the
+       // instance list. This way we can log an instance count
+       // (N=...) that includes all instances in this service
+       // account, even if they don't have the same InstanceSetID.
+       insts, err := t.getInstances(nil)
+       if err != nil {
+               t.Logger.WithError(err).Info("error getting initial list of instances")
+               return false
+       }
+
+       for {
+               foundExisting := false
+               for _, i := range insts {
+                       if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] != string(t.SetID) {
+                               continue
+                       }
+                       lgr := t.Logger.WithFields(logrus.Fields{
+                               "Instance":      i.ID(),
+                               "InstanceSetID": t.SetID,
+                       })
+                       foundExisting = true
+                       if t.DestroyExisting {
+                               lgr.Info("destroying existing instance with our InstanceSetID")
+                               err := i.Destroy()
+                               if err != nil {
+                                       lgr.WithError(err).Error("error destroying existing instance")
+                               } else {
+                                       lgr.Info("Destroy() call succeeded")
+                               }
+                       } else {
+                               lgr.Error("found existing instance with our InstanceSetID")
+                       }
+               }
+               if !foundExisting {
+                       break
+               } else if t.DestroyExisting {
+                       t.sleepSyncInterval()
+               } else {
+                       t.Logger.Error("cannot continue with existing instances -- clean up manually, use -destroy-existing=true, or choose a different -instance-set-id")
+                       return false
+               }
+       }
+
+       t.secret = randomHex(40)
+
+       tags := cloud.InstanceTags{}
+       for k, v := range t.Tags {
+               tags[k] = v
+       }
+       tags[t.TagKeyPrefix+"InstanceSetID"] = string(t.SetID)
+       tags[t.TagKeyPrefix+"InstanceSecret"] = t.secret
+
+       defer t.destroyTestInstance()
+
+       bootDeadline := time.Now().Add(t.TimeoutBooting)
+       initCommand := worker.TagVerifier{nil, t.secret}.InitCommand()
+
+       t.Logger.WithFields(logrus.Fields{
+               "InstanceType":         t.InstanceType.Name,
+               "ProviderInstanceType": t.InstanceType.ProviderType,
+               "ImageID":              t.ImageID,
+               "Tags":                 tags,
+               "InitCommand":          initCommand,
+       }).Info("creating instance")
+       inst, err := t.is.Create(t.InstanceType, t.ImageID, tags, initCommand, t.SSHKey.PublicKey())
+       if err != nil {
+               // Create() can return an error even though the
+               // instance was actually created (e.g., a bug or
+               // network problem on the response path), so it's
+               // safer to wait a bit and see whether an instance
+               // appears anyway.
+               deferredError = true
+               t.Logger.WithError(err).Error("error creating test instance")
+               t.Logger.WithField("Deadline", bootDeadline).Info("waiting for instance to appear anyway, in case the Create response was incorrect")
+               for err = t.refreshTestInstance(); err != nil; err = t.refreshTestInstance() {
+                       if time.Now().After(bootDeadline) {
+                               t.Logger.Error("timed out")
+                               return false
+                       } else {
+                               t.sleepSyncInterval()
+                       }
+               }
+               t.Logger.WithField("Instance", t.testInstance.ID()).Info("new instance appeared")
+               t.showLoginInfo()
+       } else {
+               // Create() succeeded. Make sure the new instance
+               // appears right away in the Instances() list.
+               t.Logger.WithField("Instance", inst.ID()).Info("created instance")
+               t.testInstance = &worker.TagVerifier{inst, t.secret}
+               t.showLoginInfo()
+               err = t.refreshTestInstance()
+               if err == errTestInstanceNotFound {
+                       t.Logger.WithError(err).Error("cloud/driver Create succeeded, but instance is not in list")
+                       deferredError = true
+               } else if err != nil {
+                       t.Logger.WithError(err).Error("error getting list of instances")
+                       return false
+               }
+       }
+
+       if !t.checkTags() {
+               // checkTags() already logged the errors
+               deferredError = true
+       }
+
+       if !t.waitForBoot(bootDeadline) {
+               deferredError = true
+       }
+
+       if t.ShellCommand != "" {
+               err = t.runShellCommand(t.ShellCommand)
+               if err != nil {
+                       t.Logger.WithError(err).Error("shell command failed")
+                       deferredError = true
+               }
+       }
+
+       if fn := t.PauseBeforeDestroy; fn != nil {
+               t.showLoginInfo()
+               fn()
+       }
+
+       return !deferredError
+}
+
+// If the test instance has an address, log an "ssh user@host" command
+// line that the operator can paste into another terminal, and set
+// t.showedLoginInfo.
+//
+// If the test instance doesn't have an address yet, do nothing.
+func (t *tester) showLoginInfo() {
+       t.updateExecutor()
+       host, port := t.executor.TargetHostPort()
+       if host == "" {
+               return
+       }
+       user := t.testInstance.RemoteUser()
+       t.Logger.WithField("Command", fmt.Sprintf("ssh -p%s %s@%s", port, user, host)).Info("showing login information")
+       t.showedLoginInfo = true
+}
+
+// Get the latest instance list from the driver. If our test instance
+// is found, assign it to t.testInstance.
+func (t *tester) refreshTestInstance() error {
+       insts, err := t.getInstances(cloud.InstanceTags{t.TagKeyPrefix + "InstanceSetID": string(t.SetID)})
+       if err != nil {
+               return err
+       }
+       for _, i := range insts {
+               if t.testInstance == nil {
+                       // Filter by InstanceSetID tag value
+                       if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] != string(t.SetID) {
+                               continue
+                       }
+               } else {
+                       // Filter by instance ID
+                       if i.ID() != t.testInstance.ID() {
+                               continue
+                       }
+               }
+               t.Logger.WithFields(logrus.Fields{
+                       "Instance": i.ID(),
+                       "Address":  i.Address(),
+               }).Info("found our instance in returned list")
+               t.testInstance = &worker.TagVerifier{i, t.secret}
+               if !t.showedLoginInfo {
+                       t.showLoginInfo()
+               }
+               return nil
+       }
+       return errTestInstanceNotFound
+}
+
+// Get the list of instances, passing the given tags to the cloud
+// driver to filter results.
+//
+// Return only the instances that have our InstanceSetID tag.
+func (t *tester) getInstances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+       var ret []cloud.Instance
+       t.Logger.WithField("FilterTags", tags).Info("getting instance list")
+       insts, err := t.is.Instances(tags)
+       if err != nil {
+               return nil, err
+       }
+       t.Logger.WithField("N", len(insts)).Info("got instance list")
+       for _, i := range insts {
+               if i.Tags()[t.TagKeyPrefix+"InstanceSetID"] == string(t.SetID) {
+                       ret = append(ret, i)
+               }
+       }
+       return ret, nil
+}
+
+// Check that t.testInstance has every tag in t.Tags. If not, log an
+// error and return false.
+func (t *tester) checkTags() bool {
+       ok := true
+       for k, v := range t.Tags {
+               if got := t.testInstance.Tags()[k]; got != v {
+                       ok = false
+                       t.Logger.WithFields(logrus.Fields{
+                               "Key":           k,
+                               "ExpectedValue": v,
+                               "GotValue":      got,
+                       }).Error("tag is missing from test instance")
+               }
+       }
+       if ok {
+               t.Logger.Info("all expected tags are present")
+       }
+       return ok
+}
+
+// Run t.BootProbeCommand on t.testInstance until it succeeds or the
+// deadline arrives.
+func (t *tester) waitForBoot(deadline time.Time) bool {
+       for time.Now().Before(deadline) {
+               err := t.runShellCommand(t.BootProbeCommand)
+               if err == nil {
+                       return true
+               }
+               t.sleepProbeInterval()
+               t.refreshTestInstance()
+       }
+       t.Logger.Error("timed out")
+       return false
+}
+
+// Create t.executor and/or update its target to t.testInstance's
+// current address.
+func (t *tester) updateExecutor() {
+       if t.executor == nil {
+               t.executor = ssh_executor.New(t.testInstance)
+               t.executor.SetTargetPort(t.SSHPort)
+               t.executor.SetSigners(t.SSHKey)
+       } else {
+               t.executor.SetTarget(t.testInstance)
+       }
+}
+
+func (t *tester) runShellCommand(cmd string) error {
+       t.updateExecutor()
+       t.Logger.WithFields(logrus.Fields{
+               "Command": cmd,
+       }).Info("executing remote command")
+       stdout, stderr, err := t.executor.Execute(nil, cmd, nil)
+       lgr := t.Logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       })
+       if err != nil {
+               lgr.WithError(err).Info("remote command failed")
+       } else {
+               lgr.Info("remote command succeeded")
+       }
+       return err
+}
+
+// Currently, this retries forever until it can return true (success).
+func (t *tester) destroyTestInstance() bool {
+       if t.testInstance == nil {
+               return true
+       }
+       for {
+               t.Logger.WithField("Instance", t.testInstance.ID()).Info("destroying instance")
+               err := t.testInstance.Destroy()
+               if err != nil {
+                       t.Logger.WithError(err).WithField("Instance", t.testInstance.ID()).Error("error destroying instance")
+               } else {
+                       t.Logger.WithField("Instance", t.testInstance.ID()).Info("destroyed instance")
+               }
+               err = t.refreshTestInstance()
+               if err == errTestInstanceNotFound {
+                       t.Logger.WithField("Instance", t.testInstance.ID()).Info("instance no longer appears in list")
+                       t.testInstance = nil
+                       return true
+               } else if err == nil {
+                       t.Logger.WithField("Instance", t.testInstance.ID()).Info("instance still exists after calling Destroy")
+                       t.sleepSyncInterval()
+                       continue
+               } else {
+                       t.Logger.WithError(err).Error("error getting list of instances")
+                       continue
+               }
+       }
+}
+
+func (t *tester) sleepSyncInterval() {
+       t.Logger.WithField("Duration", t.SyncInterval).Info("waiting SyncInterval")
+       time.Sleep(t.SyncInterval)
+}
+
+func (t *tester) sleepProbeInterval() {
+       t.Logger.WithField("Duration", t.ProbeInterval).Info("waiting ProbeInterval")
+       time.Sleep(t.ProbeInterval)
+}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+       buf := make([]byte, n/2)
+       _, err := rand.Read(buf)
+       if err != nil {
+               panic(err)
+       }
+       return fmt.Sprintf("%x", buf)
+}
diff --git a/lib/cloud/cloudtest/tester_test.go b/lib/cloud/cloudtest/tester_test.go
new file mode 100644 (file)
index 0000000..358530b
--- /dev/null
@@ -0,0 +1,97 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloudtest
+
+import (
+       "bytes"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&TesterSuite{})
+
+type TesterSuite struct {
+       stubDriver *test.StubDriver
+       cluster    *arvados.Cluster
+       tester     *tester
+       log        bytes.Buffer
+}
+
+func (s *TesterSuite) SetUpTest(c *check.C) {
+       pubkey, privkey := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
+       _, privhostkey := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_vm")
+       s.stubDriver = &test.StubDriver{
+               HostKey:                   privhostkey,
+               AuthorizedKeys:            []ssh.PublicKey{pubkey},
+               ErrorRateDestroy:          0.1,
+               MinTimeBetweenCreateCalls: time.Millisecond,
+       }
+       tagKeyPrefix := "tagprefix:"
+       s.cluster = &arvados.Cluster{
+               ManagementToken: "test-management-token",
+               Containers: arvados.ContainersConfig{
+                       CloudVMs: arvados.CloudVMsConfig{
+                               SyncInterval:   arvados.Duration(10 * time.Millisecond),
+                               TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+                               TimeoutProbe:   arvados.Duration(15 * time.Millisecond),
+                               ProbeInterval:  arvados.Duration(5 * time.Millisecond),
+                               ResourceTags:   map[string]string{"testtag": "test value"},
+                       },
+               },
+               InstanceTypes: arvados.InstanceTypeMap{
+                       test.InstanceType(1).Name: test.InstanceType(1),
+                       test.InstanceType(2).Name: test.InstanceType(2),
+                       test.InstanceType(3).Name: test.InstanceType(3),
+               },
+       }
+       s.tester = &tester{
+               Logger:           ctxlog.New(&s.log, "text", "info"),
+               Tags:             cloud.SharedResourceTags{"testtagkey": "testtagvalue"},
+               TagKeyPrefix:     tagKeyPrefix,
+               SetID:            cloud.InstanceSetID("test-instance-set-id"),
+               ProbeInterval:    5 * time.Millisecond,
+               SyncInterval:     10 * time.Millisecond,
+               TimeoutBooting:   150 * time.Millisecond,
+               Driver:           s.stubDriver,
+               DriverParameters: nil,
+               InstanceType:     test.InstanceType(2),
+               ImageID:          "test-image-id",
+               SSHKey:           privkey,
+               BootProbeCommand: "crunch-run --list",
+               ShellCommand:     "true",
+       }
+}
+
+func (s *TesterSuite) TestSuccess(c *check.C) {
+       s.tester.Logger = ctxlog.TestLogger(c)
+       ok := s.tester.Run()
+       c.Check(ok, check.Equals, true)
+}
+
+func (s *TesterSuite) TestBootFail(c *check.C) {
+       s.tester.BootProbeCommand = "falsey"
+       ok := s.tester.Run()
+       c.Check(ok, check.Equals, false)
+       c.Check(s.log.String(), check.Matches, `(?ms).*\\"falsey\\": command not found.*`)
+}
+
+func (s *TesterSuite) TestShellCommandFail(c *check.C) {
+       s.tester.ShellCommand = "falsey"
+       ok := s.tester.Run()
+       c.Check(ok, check.Equals, false)
+       c.Check(s.log.String(), check.Matches, `(?ms).*\\"falsey\\": command not found.*`)
+}
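
A sketch of an additional test that could exercise chooseInstanceType
directly (hypothetical, not part of this commit; the type names and prices
are invented):

	func (s *TesterSuite) TestChooseInstanceType(c *check.C) {
		cluster := &arvados.Cluster{InstanceTypes: arvados.InstanceTypeMap{
			"small": {Name: "small", Price: 0.02},
			"large": {Name: "large", Price: 0.40},
		}}
		// With name=="", the cheapest configured type is chosen.
		it, err := chooseInstanceType(cluster, "")
		c.Check(err, check.IsNil)
		c.Check(it.Name, check.Equals, "small")
		// Requesting an unconfigured type is an error.
		_, err = chooseInstanceType(cluster, "no-such-type")
		c.Check(err, check.NotNil)
	}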
index 0078da37eba25f61d6f807c6152d0db707d1cd94..7a31d03504cbe5214476f23ba1dbfca10199b168 100644 (file)
@@ -500,7 +500,7 @@ Clusters:
         # Shell command to execute on each worker to determine whether
         # the worker is booted and ready to run containers. It should
         # exit zero if the worker is ready.
-        BootProbeCommand: "docker ps"
+        BootProbeCommand: "docker ps -q"
 
         # Minimum interval between consecutive probes to a single
         # worker.
index 6b2bb61112f7d371b58e88dfadad1a07d9476277..763e7b2bce70a9b0ee7337fc0ebac6b958389f20 100644 (file)
@@ -506,7 +506,7 @@ Clusters:
         # Shell command to execute on each worker to determine whether
         # the worker is booted and ready to run containers. It should
         # exit zero if the worker is ready.
-        BootProbeCommand: "docker ps"
+        BootProbeCommand: "docker ps -q"
 
         # Minimum interval between consecutive probes to a single
         # worker.
index 07fdc672282d5475308a9f010555f9a0efc241c6..4a5ca3017afdab64ac288fcfbc7147dcf90e3027 100644 (file)
@@ -107,7 +107,7 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) {
 // a fake queue and cloud driver. The fake cloud driver injects
 // artificial errors in order to exercise a variety of code paths.
 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
-       drivers["test"] = s.stubDriver
+       Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
                ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
@@ -211,7 +211,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 
 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
        s.cluster.ManagementToken = "abcdefgh"
-       drivers["test"] = s.stubDriver
+       Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        s.disp.queue = &test.Queue{}
        go s.disp.run()
@@ -233,7 +233,7 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
 
 func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
        s.cluster.ManagementToken = ""
-       drivers["test"] = s.stubDriver
+       Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        s.disp.queue = &test.Queue{}
        go s.disp.run()
@@ -252,7 +252,7 @@ func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
        s.cluster.ManagementToken = "abcdefgh"
        s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
-       drivers["test"] = s.stubDriver
+       Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        s.disp.queue = &test.Queue{}
        go s.disp.run()
index 6162c81b63bc4589df0d4beb3bbe6bc35a4330e8..f1ae68c001b24e7c8c613db9680105b57889edb1 100644 (file)
@@ -17,13 +17,16 @@ import (
        "golang.org/x/crypto/ssh"
 )
 
-var drivers = map[string]cloud.Driver{
+// Map of available cloud drivers.
+// Clusters.*.Containers.CloudVMs.Driver configuration values
+// correspond to keys in this map.
+var Drivers = map[string]cloud.Driver{
        "azure": azure.Driver,
        "ec2":   ec2.Driver,
 }
 
 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
-       driver, ok := drivers[cluster.Containers.CloudVMs.Driver]
+       driver, ok := Drivers[cluster.Containers.CloudVMs.Driver]
        if !ok {
                return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
        }
@@ -85,7 +88,7 @@ func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.
        return is.InstanceSet.Create(it, image, allTags, init, pk)
 }
 
-// Filters the instances returned by the wrapped InstanceSet's
+// Filter the instances returned by the wrapped InstanceSet's
 // Instances() method (in case the wrapped InstanceSet didn't do this
 // itself).
 type filteringInstanceSet struct {
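
Since the map is now exported, code outside the package can register and look
up drivers: the dispatcher tests above install a stub with
Drivers["test"] = s.stubDriver, and cloudtest's cmd.go resolves the configured
driver via dispatchcloud.Drivers[cluster.Containers.CloudVMs.Driver].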
index feed1c2a78b82a84821f22eee99e39e960dbd431..d608763cf5100d46ca0b1cb2b50a6759a157b1e3 100644 (file)
@@ -173,14 +173,13 @@ func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
        return exr.client, exr.clientErr
 }
 
-// Create a new SSH client.
-func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
-       target := exr.Target()
-       addr := target.Address()
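+// TargetHostPort returns the target's SSH host and port. It returns
+// ("", "") if the target does not have an address yet.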
+func (exr *Executor) TargetHostPort() (string, string) {
+       addr := exr.Target().Address()
        if addr == "" {
-               return nil, errors.New("instance has no address")
+               return "", ""
        }
-       if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
+       h, p, err := net.SplitHostPort(addr)
+       if err != nil || p == "" {
                // Target address does not specify a port.  Use
                // targetPort, or "ssh".
                if h == "" {
@@ -189,11 +188,19 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
                if p = exr.targetPort; p == "" {
                        p = "ssh"
                }
-               addr = net.JoinHostPort(h, p)
+       }
+       return h, p
+}
+
+// Create a new SSH client.
+func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
+       addr := net.JoinHostPort(exr.TargetHostPort())
+       if addr == ":" {
+               return nil, errors.New("instance has no address")
        }
        var receivedKey ssh.PublicKey
        client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
-               User: target.RemoteUser(),
+               User: exr.Target().RemoteUser(),
                Auth: []ssh.AuthMethod{
                        ssh.PublicKeys(exr.signers...),
                },
@@ -210,7 +217,7 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
        }
 
        if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
-               err = target.VerifyHostKey(receivedKey, client)
+               err = exr.Target().VerifyHostKey(receivedKey, client)
                if err != nil {
                        return nil, err
                }
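
A hypothetical caller's view of the new method (the instance and its address
are invented; an address with no port falls back to the configured target
port, or "ssh"):

	exr := ssh_executor.New(inst) // inst.Address() == "10.0.0.5"
	exr.SetTargetPort("2222")
	host, port := exr.TargetHostPort() // "10.0.0.5", "2222"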
index 97ca7f60a2a916fd6feece03e016c7fe3673e22d..13b7fd29343772201db02f3a0937df89074adc99 100644 (file)
@@ -292,7 +292,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
-               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               initCmd := TagVerifier{nil, secret}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -346,7 +346,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
-       inst = tagVerifier{inst, secret}
+       inst = TagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
index 330071951425c1c382f8be4e53f436d758d032f6..c718702101bdcf08502c252f3c59e7d88eeb6735 100644 (file)
@@ -21,13 +21,17 @@ var (
        instanceSecretLength   = 40 // hex digits
 )
 
-type tagVerifier struct {
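+// A TagVerifier wraps a cloud.Instance, and can confirm the
+// instance's identity by checking for the secret its InitCommand
+// wrote at boot time.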
+type TagVerifier struct {
        cloud.Instance
-       secret string
+       Secret string
 }
 
-func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
-       if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || tv.secret == "" {
+func (tv TagVerifier) InitCommand() cloud.InitCommand {
+       return cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", tv.Secret, instanceSecretFilename))
+}
+
+func (tv TagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+       if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || tv.Secret == "" {
                // If the wrapped instance indicates it has a way to
                // verify the key, return that decision.
                return err
@@ -49,7 +53,7 @@ func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) er
        if err != nil {
                return err
        }
-       if stdout.String() != tv.secret {
+       if stdout.String() != tv.Secret {
                return errBadInstanceSecret
        }
        return nil