X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ca2d946973b6ae25dd594ddecec54e02b83bc44e..f30c8ed35e3e1ad7cb3cb51fc6d83f56a04ae8de:/lib/dispatchcloud/worker/worker_test.go diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go index 3bc33b62c9..cfb7a1bfb7 100644 --- a/lib/dispatchcloud/worker/worker_test.go +++ b/lib/dispatchcloud/worker/worker_test.go @@ -5,14 +5,19 @@ package worker import ( + "bytes" + "crypto/md5" "errors" + "fmt" "io" + "strings" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/lib/dispatchcloud/test" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/dispatchcloud/test" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/prometheus/client_golang/prometheus" check "gopkg.in/check.v1" ) @@ -25,7 +30,8 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { bootTimeout := time.Minute probeTimeout := time.Second - is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger) + ac := arvados.NewClientFromEnv() + is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger) c.Assert(err, check.IsNil) inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil) c.Assert(err, check.IsNil) @@ -37,7 +43,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { running int starting int respBoot stubResp // zero value is success + respDeploy stubResp // zero value is success respRun stubResp // zero value is success + nothing running + respRunDeployed stubResp + deployRunner []byte + expectStdin []byte expectState State expectRunning int } @@ -45,7 +55,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { errFail := errors.New("failed") respFail := stubResp{"", "command failed\n", errFail} respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil} - for _, trial := range []trialT{ + for idx, trial := range []trialT{ { testCaseComment: "Unknown, probes fail", state: StateUnknown, @@ -184,19 +194,57 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { starting: 1, expectState: StateRunning, }, + { + testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds", + state: StateBooting, + deployRunner: []byte("ELF"), + expectStdin: []byte("ELF"), + respRun: respFail, + respRunDeployed: respContainerRunning, + expectRunning: 1, + expectState: StateRunning, + }, + { + testCaseComment: "Booting, boot probe succeeds, deployRunner fails", + state: StateBooting, + deployRunner: []byte("ELF"), + respDeploy: respFail, + expectStdin: []byte("ELF"), + expectState: StateBooting, + }, + { + testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds", + state: StateBooting, + deployRunner: nil, + respDeploy: respFail, + expectState: StateIdle, + }, } { - c.Logf("------- %#v", trial) + c.Logf("------- trial %d: %#v", idx, trial) ctime := time.Now().Add(-trial.age) - exr := stubExecutor{ - "bootprobe": trial.respBoot, - "crunch-run --list": trial.respRun, + exr := &stubExecutor{ + response: map[string]stubResp{ + "bootprobe": trial.respBoot, + "crunch-run --list": trial.respRun, + "{deploy}": trial.respDeploy, + }, } wp := &Pool{ + arvClient: ac, newExecutor: func(cloud.Instance) Executor { return exr }, bootProbeCommand: "bootprobe", timeoutBooting: bootTimeout, timeoutProbe: probeTimeout, exited: map[string]time.Time{}, + runnerCmd: "crunch-run", + runnerData: trial.deployRunner, + runnerMD5: md5.Sum(trial.deployRunner), + } + wp.registerMetrics(prometheus.NewRegistry()) + if trial.deployRunner != nil { + svHash := md5.Sum(trial.deployRunner) + wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash) + exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed } wkr := &worker{ logger: logger, @@ -209,16 +257,22 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { busy: ctime, probed: ctime, updated: ctime, + running: map[string]*remoteRunner{}, + starting: map[string]*remoteRunner{}, + probing: make(chan struct{}, 1), } if trial.running > 0 { - wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}} + uuid := "zzzzz-dz642-abcdefghijklmno" + wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)} } if trial.starting > 0 { - wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}} + uuid := "zzzzz-dz642-bcdefghijklmnop" + wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)} } wkr.probeAndUpdate() c.Check(wkr.state, check.Equals, trial.expectState) c.Check(len(wkr.running), check.Equals, trial.expectRunning) + c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin)) } } @@ -227,14 +281,27 @@ type stubResp struct { stderr string err error } -type stubExecutor map[string]stubResp -func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {} -func (se stubExecutor) Close() {} -func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) { - resp, ok := se[cmd] +type stubExecutor struct { + response map[string]stubResp + stdin bytes.Buffer +} + +func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {} +func (se *stubExecutor) Close() {} +func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) { + if stdin != nil { + _, err = io.Copy(&se.stdin, stdin) + if err != nil { + return nil, []byte(err.Error()), err + } + } + resp, ok := se.response[cmd] + if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) { + resp, ok = se.response["{deploy}"] + } if !ok { - return nil, []byte("command not found\n"), errors.New("command not found") + return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found") } return []byte(resp.stdout), []byte(resp.stderr), resp.err }