package worker
import (
+ "bytes"
+ "crypto/md5"
"errors"
+ "fmt"
"io"
+ "strings"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
// WorkerSuite is the (stateless) gocheck suite type that the worker
// tests in this file are attached to as methods.
type WorkerSuite struct{}
// TestProbeAndUpdate is a table-driven test of (*worker).probeAndUpdate.
// For each trial it builds a Pool whose newExecutor hook returns a
// stubExecutor loaded with canned responses, builds a worker, calls
// probeAndUpdate once, and then checks the worker's resulting state,
// the size of its running map, and the stdin captured by the stub.
//
// NOTE(review): this view is diff-formatted ("+"/"-" line prefixes)
// and appears to omit unchanged context. For example, the trial
// fields testCaseComment, state and age are used below but are not
// declared in the visible part of trialT -- confirm against the full
// file.
func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
bootTimeout := time.Minute
probeTimeout := time.Second
- is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
+ ac := arvados.NewClientFromEnv()
+ is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
c.Assert(err, check.IsNil)
- inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+ inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
// trialT describes one table entry: the starting conditions, the
// canned responses handed to the stub executor, and the expected
// outcome after a single probeAndUpdate call.
type trialT struct {
running int
starting int
respBoot stubResp // zero value is success
+ respDeploy stubResp // zero value is success
respRun stubResp // zero value is success + nothing running
+ respRunDeployed stubResp
+ deployRunner []byte
+ expectStdin []byte
expectState State
expectRunning int
}
// Canned responses shared by several trials: a failing command, and
// a successful command whose stdout is a single container UUID line.
errFail := errors.New("failed")
respFail := stubResp{"", "command failed\n", errFail}
respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
- for _, trial := range []trialT{
+ for idx, trial := range []trialT{
{
testCaseComment: "Unknown, probes fail",
state: StateUnknown,
starting: 1,
expectState: StateRunning,
},
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ expectStdin: []byte("ELF"),
+ respRun: respFail,
+ respRunDeployed: respContainerRunning,
+ expectRunning: 1,
+ expectState: StateRunning,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ respDeploy: respFail,
+ expectStdin: []byte("ELF"),
+ expectState: StateBooting,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+ state: StateBooting,
+ deployRunner: nil,
+ respDeploy: respFail,
+ expectState: StateIdle,
+ },
} {
- c.Logf("------- %#v", trial)
+ c.Logf("------- trial %d: %#v", idx, trial)
ctime := time.Now().Add(-trial.age)
// Stub executor for this trial. The "{deploy}" entry is the
// response stubExecutor.Execute falls back to for commands that
// contain `; cat >"$dstfile"` and have no exact-match entry.
- exr := stubExecutor{
- "bootprobe": trial.respBoot,
- "crunch-run --list": trial.respRun,
+ exr := &stubExecutor{
+ response: map[string]stubResp{
+ "bootprobe": trial.respBoot,
+ "crunch-run --list": trial.respRun,
+ "{deploy}": trial.respDeploy,
+ },
}
wp := &Pool{
+ arvClient: ac,
newExecutor: func(cloud.Instance) Executor { return exr },
bootProbeCommand: "bootprobe",
timeoutBooting: bootTimeout,
timeoutProbe: probeTimeout,
exited: map[string]time.Time{},
+ runnerCmd: "crunch-run",
+ runnerData: trial.deployRunner,
+ runnerMD5: md5.Sum(trial.deployRunner),
+ }
// When the trial supplies a runner binary, point runnerCmd at a
// path that embeds the hex MD5 of that binary, and register the
// canned response for that command's "--list" probe.
+ if trial.deployRunner != nil {
+ svHash := md5.Sum(trial.deployRunner)
+ wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+ exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
}
wkr := &worker{
logger: logger,
busy: ctime,
probed: ctime,
updated: ctime,
+ running: map[string]*remoteRunner{},
+ starting: map[string]*remoteRunner{},
+ probing: make(chan struct{}, 1),
}
// Pre-populate the running / starting maps with a single entry
// when the trial asks for it.
if trial.running > 0 {
- wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-abcdefghijklmno"
+ wkr.running = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
if trial.starting > 0 {
- wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ uuid := "zzzzz-dz642-bcdefghijklmnop"
+ wkr.starting = map[string]*remoteRunner{uuid: newRemoteRunner(uuid, wkr)}
}
wkr.probeAndUpdate()
// Verify the resulting state, the number of running entries, and
// everything the stub executor received on stdin during the probe.
c.Check(wkr.state, check.Equals, trial.expectState)
c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+ c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
}
}
stderr string
err error
}
-type stubExecutor map[string]stubResp
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close() {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
- resp, ok := se[cmd]
// stubExecutor is a test stand-in used where an Executor is required.
// It answers commands from a fixed table of canned responses keyed by
// the exact command string, and accumulates everything passed as
// stdin across calls so tests can inspect it afterwards.
+type stubExecutor struct {
+ response map[string]stubResp
+ stdin bytes.Buffer
+}
+
// SetTarget is a no-op.
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
// Close is a no-op.
+func (se *stubExecutor) Close() {}
// Execute returns the canned response registered for cmd.
//
// If stdin is non-nil it is first copied in full into se.stdin; a
// copy error is returned immediately, with the error text as stderr.
// If there is no exact-match entry for cmd and cmd contains
// `; cat >"$dstfile"`, the "{deploy}" entry is used instead. If no
// entry applies, a "command not found" error is returned. The env
// argument is not used.
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+ if stdin != nil {
+ _, err = io.Copy(&se.stdin, stdin)
+ if err != nil {
+ return nil, []byte(err.Error()), err
+ }
+ }
+ resp, ok := se.response[cmd]
+ if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+ resp, ok = se.response["{deploy}"]
+ }
if !ok {
- return nil, []byte("command not found\n"), errors.New("command not found")
+ return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
}
return []byte(resp.stdout), []byte(resp.stderr), resp.err
}