Merge branch '21666-provision-test-improvement'
[arvados.git] / lib / dispatchcloud / worker / worker_test.go
index 943fa7c710bb22af055838239a0eea2ecde11f93..5d8c67e9162140a9eeb8f62a897c7c955d900188 100644 (file)
@@ -5,28 +5,45 @@
 package worker
 
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
+       "fmt"
        "io"
+       "strings"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
 var _ = check.Suite(&WorkerSuite{})
 
-type WorkerSuite struct{}
+type WorkerSuite struct {
+       logger      logrus.FieldLogger
+       testCluster *arvados.Cluster
+}
+
+func (suite *WorkerSuite) SetUpTest(c *check.C) {
+       suite.logger = ctxlog.TestLogger(c)
+       cfg, err := config.NewLoader(nil, suite.logger).Load()
+       c.Assert(err, check.IsNil)
+       suite.testCluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+}
 
 func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        bootTimeout := time.Minute
        probeTimeout := time.Second
 
        ac := arvados.NewClientFromEnv()
-       is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
+       is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
        c.Assert(err, check.IsNil)
        inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
        c.Assert(err, check.IsNil)
@@ -38,7 +55,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                running         int
                starting        int
                respBoot        stubResp // zero value is success
+               respDeploy      stubResp // zero value is success
                respRun         stubResp // zero value is success + nothing running
+               respRunDeployed stubResp
+               deployRunner    []byte
+               expectStdin     []byte
                expectState     State
                expectRunning   int
        }
@@ -46,7 +67,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
        errFail := errors.New("failed")
        respFail := stubResp{"", "command failed\n", errFail}
        respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
-       for _, trial := range []trialT{
+       for idx, trial := range []trialT{
                {
                        testCaseComment: "Unknown, probes fail",
                        state:           StateUnknown,
@@ -101,6 +122,39 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        expectState:     StateUnknown,
                        expectRunning:   1,
                },
+               {
+                       testCaseComment: "Unknown, boot probe fails, deployRunner succeeds, container is running",
+                       state:           StateUnknown,
+                       respBoot:        respFail,
+                       respRun:         respFail,
+                       respRunDeployed: respContainerRunning,
+                       deployRunner:    []byte("ELF"),
+                       expectStdin:     []byte("ELF"),
+                       expectState:     StateUnknown,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but deployRunner succeeds and container is running",
+                       state:           StateUnknown,
+                       age:             bootTimeout * 2,
+                       respBoot:        respFail,
+                       respRun:         respFail,
+                       respRunDeployed: respContainerRunning,
+                       deployRunner:    []byte("ELF"),
+                       expectStdin:     []byte("ELF"),
+                       expectState:     StateUnknown,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but deployRunner succeeds and no container is running",
+                       state:           StateUnknown,
+                       age:             bootTimeout * 2,
+                       respBoot:        respFail,
+                       respRun:         respFail,
+                       deployRunner:    []byte("ELF"),
+                       expectStdin:     []byte("ELF"),
+                       expectState:     StateShutdown,
+               },
                {
                        testCaseComment: "Booting, boot probe fails, run probe fails",
                        state:           StateBooting,
@@ -185,23 +239,63 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        starting:        1,
                        expectState:     StateRunning,
                },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+                       state:           StateBooting,
+                       deployRunner:    []byte("ELF"),
+                       expectStdin:     []byte("ELF"),
+                       respRun:         respFail,
+                       respRunDeployed: respContainerRunning,
+                       expectRunning:   1,
+                       expectState:     StateRunning,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+                       state:           StateBooting,
+                       deployRunner:    []byte("ELF"),
+                       respDeploy:      respFail,
+                       expectStdin:     []byte("ELF"),
+                       expectState:     StateBooting,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+                       state:           StateBooting,
+                       deployRunner:    nil,
+                       respDeploy:      respFail,
+                       expectState:     StateIdle,
+               },
        } {
-               c.Logf("------- %#v", trial)
+               c.Logf("------- trial %d: %#v", idx, trial)
                ctime := time.Now().Add(-trial.age)
-               exr := stubExecutor{
-                       "bootprobe":         trial.respBoot,
-                       "crunch-run --list": trial.respRun,
+               exr := &stubExecutor{
+                       response: map[string]stubResp{
+                               "bootprobe":         trial.respBoot,
+                               "crunch-run --list": trial.respRun,
+                               "{deploy}":          trial.respDeploy,
+                       },
                }
                wp := &Pool{
                        arvClient:        ac,
                        newExecutor:      func(cloud.Instance) Executor { return exr },
+                       cluster:          suite.testCluster,
                        bootProbeCommand: "bootprobe",
                        timeoutBooting:   bootTimeout,
                        timeoutProbe:     probeTimeout,
                        exited:           map[string]time.Time{},
+                       runnerCmdDefault: "crunch-run",
+                       runnerArgs:       []string{"--args=not used with --list"},
+                       runnerCmd:        "crunch-run",
+                       runnerData:       trial.deployRunner,
+                       runnerMD5:        md5.Sum(trial.deployRunner),
+               }
+               wp.registerMetrics(prometheus.NewRegistry())
+               if trial.deployRunner != nil {
+                       svHash := md5.Sum(trial.deployRunner)
+                       wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+                       exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
                }
                wkr := &worker{
-                       logger:   logger,
+                       logger:   suite.logger,
                        executor: exr,
                        wp:       wp,
                        mtx:      &wp.mtx,
@@ -226,6 +320,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                wkr.probeAndUpdate()
                c.Check(wkr.state, check.Equals, trial.expectState)
                c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+               c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
        }
 }
 
@@ -234,14 +329,27 @@ type stubResp struct {
        stderr string
        err    error
 }
-type stubExecutor map[string]stubResp
 
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close()                         {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
-       resp, ok := se[cmd]
+type stubExecutor struct {
+       response map[string]stubResp
+       stdin    bytes.Buffer
+}
+
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close()                         {}
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+       if stdin != nil {
+               _, err = io.Copy(&se.stdin, stdin)
+               if err != nil {
+                       return nil, []byte(err.Error()), err
+               }
+       }
+       resp, ok := se.response[cmd]
+       if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+               resp, ok = se.response["{deploy}"]
+       }
        if !ok {
-               return nil, []byte("command not found\n"), errors.New("command not found")
+               return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
        }
        return []byte(resp.stdout), []byte(resp.stderr), resp.err
 }