X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aa4b9b1d650f2174eb19bbf2ba9787f3ace59b04..HEAD:/lib/dispatchcloud/worker/worker_test.go diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go index 943fa7c710..5d8c67e916 100644 --- a/lib/dispatchcloud/worker/worker_test.go +++ b/lib/dispatchcloud/worker/worker_test.go @@ -5,28 +5,45 @@ package worker import ( + "bytes" + "crypto/md5" "errors" + "fmt" "io" + "strings" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/lib/dispatchcloud/test" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/config" + "git.arvados.org/arvados.git/lib/dispatchcloud/test" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" check "gopkg.in/check.v1" ) var _ = check.Suite(&WorkerSuite{}) -type WorkerSuite struct{} +type WorkerSuite struct { + logger logrus.FieldLogger + testCluster *arvados.Cluster +} + +func (suite *WorkerSuite) SetUpTest(c *check.C) { + suite.logger = ctxlog.TestLogger(c) + cfg, err := config.NewLoader(nil, suite.logger).Load() + c.Assert(err, check.IsNil) + suite.testCluster, err = cfg.GetCluster("") + c.Assert(err, check.IsNil) +} func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { - logger := ctxlog.TestLogger(c) bootTimeout := time.Minute probeTimeout := time.Second ac := arvados.NewClientFromEnv() - is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger) + is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil) c.Assert(err, check.IsNil) inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil) c.Assert(err, check.IsNil) @@ -38,7 +55,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { running int starting int respBoot stubResp // zero value is success + respDeploy stubResp // zero value is success respRun stubResp // zero value is success + nothing running + respRunDeployed stubResp + deployRunner []byte + expectStdin []byte expectState State expectRunning int } @@ -46,7 +67,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { errFail := errors.New("failed") respFail := stubResp{"", "command failed\n", errFail} respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil} - for _, trial := range []trialT{ + for idx, trial := range []trialT{ { testCaseComment: "Unknown, probes fail", state: StateUnknown, @@ -101,6 +122,39 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { expectState: StateUnknown, expectRunning: 1, }, + { + testCaseComment: "Unknown, boot probe fails, deployRunner succeeds, container is running", + state: StateUnknown, + respBoot: respFail, + respRun: respFail, + respRunDeployed: respContainerRunning, + deployRunner: []byte("ELF"), + expectStdin: []byte("ELF"), + expectState: StateUnknown, + expectRunning: 1, + }, + { + testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but deployRunner succeeds and container is running", + state: StateUnknown, + age: bootTimeout * 2, + respBoot: respFail, + respRun: respFail, + respRunDeployed: respContainerRunning, + deployRunner: []byte("ELF"), + expectStdin: []byte("ELF"), + expectState: StateUnknown, + expectRunning: 1, + }, + { + testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but deployRunner succeeds and no container is running", + state: StateUnknown, + age: bootTimeout * 2, + respBoot: respFail, + respRun: respFail, + deployRunner: []byte("ELF"), + expectStdin: []byte("ELF"), + expectState: StateShutdown, + }, { testCaseComment: "Booting, boot probe fails, run probe fails", state: StateBooting, @@ -185,23 +239,63 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { starting: 1, expectState: StateRunning, }, + { + testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds", + state: StateBooting, + deployRunner: []byte("ELF"), + expectStdin: []byte("ELF"), + respRun: respFail, + respRunDeployed: respContainerRunning, + expectRunning: 1, + expectState: StateRunning, + }, + { + testCaseComment: "Booting, boot probe succeeds, deployRunner fails", + state: StateBooting, + deployRunner: []byte("ELF"), + respDeploy: respFail, + expectStdin: []byte("ELF"), + expectState: StateBooting, + }, + { + testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds", + state: StateBooting, + deployRunner: nil, + respDeploy: respFail, + expectState: StateIdle, + }, } { - c.Logf("------- %#v", trial) + c.Logf("------- trial %d: %#v", idx, trial) ctime := time.Now().Add(-trial.age) - exr := stubExecutor{ - "bootprobe": trial.respBoot, - "crunch-run --list": trial.respRun, + exr := &stubExecutor{ + response: map[string]stubResp{ + "bootprobe": trial.respBoot, + "crunch-run --list": trial.respRun, + "{deploy}": trial.respDeploy, + }, } wp := &Pool{ arvClient: ac, newExecutor: func(cloud.Instance) Executor { return exr }, + cluster: suite.testCluster, bootProbeCommand: "bootprobe", timeoutBooting: bootTimeout, timeoutProbe: probeTimeout, exited: map[string]time.Time{}, + runnerCmdDefault: "crunch-run", + runnerArgs: []string{"--args=not used with --list"}, + runnerCmd: "crunch-run", + runnerData: trial.deployRunner, + runnerMD5: md5.Sum(trial.deployRunner), + } + wp.registerMetrics(prometheus.NewRegistry()) + if trial.deployRunner != nil { + svHash := md5.Sum(trial.deployRunner) + wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash) + exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed } wkr := &worker{ - logger: logger, + logger: suite.logger, executor: exr, wp: wp, mtx: &wp.mtx, @@ -226,6 +320,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { wkr.probeAndUpdate() c.Check(wkr.state, check.Equals, trial.expectState) c.Check(len(wkr.running), check.Equals, trial.expectRunning) + c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin)) } } @@ -234,14 +329,27 @@ type stubResp struct { stderr string err error } -type stubExecutor map[string]stubResp -func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {} -func (se stubExecutor) Close() {} -func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) { - resp, ok := se[cmd] +type stubExecutor struct { + response map[string]stubResp + stdin bytes.Buffer +} + +func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {} +func (se *stubExecutor) Close() {} +func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) { + if stdin != nil { + _, err = io.Copy(&se.stdin, stdin) + if err != nil { + return nil, []byte(err.Error()), err + } + } + resp, ok := se.response[cmd] + if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) { + resp, ok = se.response["{deploy}"] + } if !ok { - return nil, []byte("command not found\n"), errors.New("command not found") + return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found") } return []byte(resp.stdout), []byte(resp.stderr), resp.err }