func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
_, basename := filepath.Split(prog)
+ if i := strings.Index(basename, "~"); i >= 0 {
+ // drop "~anything" suffix (arvados-dispatch-cloud's
+ // DeployRunnerBinary feature relies on this)
+ basename = basename[:i]
+ }
cmd, ok := m[basename]
if !ok {
// "controller" command exists, and binary is named "arvados-controller"
# Worker VM image ID.
ImageID: ""
+ # An executable file (located on the dispatcher host) to be
+ # copied to cloud instances at runtime and used as the
+ # container runner/supervisor. The default value is the
+ # dispatcher program itself.
+ #
+ # Use the empty string to disable this step: nothing will be
+ # copied, and cloud instances are assumed to have a suitable
+ # version of crunch-run installed.
+ DeployRunnerBinary: "/proc/self/exe"
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
# Worker VM image ID.
ImageID: ""
+ # An executable file (located on the dispatcher host) to be
+ # copied to cloud instances at runtime and used as the
+ # container runner/supervisor. The default value is the
+ # dispatcher program itself.
+ #
+ # Use the empty string to disable this step: nothing will be
+ # copied, and cloud instances are assumed to have a suitable
+ # version of crunch-run installed.
+ DeployRunnerBinary: "/proc/self/exe"
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
type pool interface {
scheduler.WorkerPool
+ CheckHealth() error
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
KillInstance(id cloud.InstanceID, reason string) error
// CheckHealth implements service.Handler.
func (disp *dispatcher) CheckHealth() error {
disp.Start()
- return nil
+ // The dispatcher is healthy only if the worker pool can load the
+ // runner binary it deploys to cloud VMs (see Pool.CheckHealth).
+ return disp.pool.CheckHealth()
}
// Stop dispatching containers and release resources. Typically used
package worker
import (
+ "crypto/md5"
"crypto/rand"
"errors"
"fmt"
"io"
+ "io/ioutil"
"sort"
"strings"
"sync"
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
+ runnerSource string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
syncInterval time.Duration
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
+ runnerData []byte
+ runnerMD5 [md5.Size]byte
+ runnerCmd string
throttleCreate throttle
throttleInstances throttle
instanceType arvados.InstanceType
}
+// CheckHealth returns an error if the configured runner program
+// (DeployRunnerBinary) cannot be loaded; otherwise nil. It is called
+// via the dispatcher's service.Handler health check.
+func (wp *Pool) CheckHealth() error {
+ wp.setupOnce.Do(wp.setup)
+ if err := wp.loadRunnerData(); err != nil {
+ return fmt.Errorf("error loading runner binary: %s", err)
+ }
+ return nil
+}
+
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
+ if wp.loadRunnerData() != nil {
+ // Boot probe is certain to fail.
+ return false
+ }
wp.mtx.Lock()
defer wp.mtx.Unlock()
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+ wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ if wp.runnerData != nil {
+ // Already loaded (or deliberately set to the empty non-nil
+ // slice below when auto-deploy is disabled).
+ return nil
+ } else if wp.runnerSource == "" {
+ // Auto-deploy disabled: assume a suitable crunch-run is
+ // already installed on the worker image.
+ wp.runnerCmd = "crunch-run"
+ wp.runnerData = []byte{}
+ return nil
+ }
+ logger := wp.logger.WithField("source", wp.runnerSource)
+ logger.Debug("loading runner")
+ buf, err := ioutil.ReadFile(wp.runnerSource)
+ if err != nil {
+ logger.WithError(err).Error("failed to load runner program")
+ return err
+ }
+ wp.runnerData = buf
+ wp.runnerMD5 = md5.Sum(buf)
+ // Embed the hash in the remote filename so a changed binary gets
+ // deployed under a new name; Multi.RunCommand strips the "~..."
+ // suffix to map the deployed file back to the right subcommand.
+ wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+ return nil
}
func (wp *Pool) notify() {
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
- return stubExecutor{
- "crunch-run --list": stubResp{},
- "true": stubResp{},
+ return &stubExecutor{
+ response: map[string]stubResp{
+ "crunch-run --list": stubResp{},
+ "true": stubResp{},
+ },
}
}
type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
logger: logger,
- newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+ newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
uuid string
executor Executor
envJSON json.RawMessage
+ runnerCmd string
remoteUser string
timeoutTERM time.Duration
timeoutSignal time.Duration
uuid: uuid,
executor: wkr.executor,
envJSON: envJSON,
+ runnerCmd: wkr.wp.runnerCmd,
remoteUser: wkr.instance.RemoteUser(),
timeoutTERM: wkr.wp.timeoutTERM,
timeoutSignal: wkr.wp.timeoutSignal,
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
func (rr *remoteRunner) kill(sig syscall.Signal) {
logger := rr.logger.WithField("Signal", int(sig))
logger.Info("sending signal")
- cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
package worker
import (
+ "bytes"
"fmt"
+ "path/filepath"
"strings"
"sync"
"time"
}
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
- cmd := "crunch-run --list"
+ cmd := wkr.wp.runnerCmd + " --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
}
return false, stderr
}
logger.Info("boot probe succeeded")
+ if err = wkr.wp.loadRunnerData(); err != nil {
+ wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+ return false, stderr
+ } else if len(wkr.wp.runnerData) == 0 {
+ // Assume crunch-run is already installed
+ } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+ wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+ return false, stderr2
+ } else {
+ wkr.logger.Info("runner binary OK")
+ stderr = append(stderr, stderr2...)
+ }
return true, stderr
}
+// copyRunnerData ensures the worker VM has the expected runner binary
+// at wp.runnerCmd. It first asks the VM for the file's md5sum; if the
+// output matches wp.runnerMD5 exactly (and stderr is empty), nothing
+// is copied. Otherwise the binary is streamed to the VM on stdin.
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+ hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+ dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+
+ stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+ if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
+ return
+ }
+
+ // Note touch+chmod come before writing data, to avoid the
+ // possibility of md5 being correct while file mode is
+ // incorrect.
+ cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+ if wkr.instance.RemoteUser() != "root" {
+ // Non-root login: run via sudo, escaping any single quotes
+ // so the embedded script survives the sh -c '...' wrapper.
+ cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+ }
+ stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+ return
+}
+
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
if wkr.idleBehavior == IdleBehaviorHold {
package worker
import (
+ "bytes"
+ "crypto/md5"
"errors"
+ "fmt"
"io"
+ "strings"
"time"
"git.arvados.org/arvados.git/lib/cloud"
running int
starting int
respBoot stubResp // zero value is success
+ respDeploy stubResp // zero value is success
respRun stubResp // zero value is success + nothing running
+ respRunDeployed stubResp
+ deployRunner []byte
+ expectStdin []byte
expectState State
expectRunning int
}
errFail := errors.New("failed")
respFail := stubResp{"", "command failed\n", errFail}
respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
- for _, trial := range []trialT{
+ for idx, trial := range []trialT{
{
testCaseComment: "Unknown, probes fail",
state: StateUnknown,
starting: 1,
expectState: StateRunning,
},
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ expectStdin: []byte("ELF"),
+ respRun: respFail,
+ respRunDeployed: respContainerRunning,
+ expectRunning: 1,
+ expectState: StateRunning,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+ state: StateBooting,
+ deployRunner: []byte("ELF"),
+ respDeploy: respFail,
+ expectStdin: []byte("ELF"),
+ expectState: StateBooting,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+ state: StateBooting,
+ deployRunner: nil,
+ respDeploy: respFail,
+ expectState: StateIdle,
+ },
} {
- c.Logf("------- %#v", trial)
+ c.Logf("------- trial %d: %#v", idx, trial)
ctime := time.Now().Add(-trial.age)
- exr := stubExecutor{
- "bootprobe": trial.respBoot,
- "crunch-run --list": trial.respRun,
+ exr := &stubExecutor{
+ response: map[string]stubResp{
+ "bootprobe": trial.respBoot,
+ "crunch-run --list": trial.respRun,
+ "{deploy}": trial.respDeploy,
+ },
}
wp := &Pool{
arvClient: ac,
timeoutBooting: bootTimeout,
timeoutProbe: probeTimeout,
exited: map[string]time.Time{},
+ runnerCmd: "crunch-run",
+ runnerData: trial.deployRunner,
+ runnerMD5: md5.Sum(trial.deployRunner),
+ }
+ if trial.deployRunner != nil {
+ svHash := md5.Sum(trial.deployRunner)
+ wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+ exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
}
wkr := &worker{
logger: logger,
wkr.probeAndUpdate()
c.Check(wkr.state, check.Equals, trial.expectState)
c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+ c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
}
}
stderr string
err error
}
-type stubExecutor map[string]stubResp
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close() {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
- resp, ok := se[cmd]
+// stubExecutor fakes the SSH executor in tests: it serves canned
+// responses keyed by command string, and records everything written
+// to a command's stdin (e.g. the deployed runner binary bytes).
+type stubExecutor struct {
+ response map[string]stubResp
+ stdin bytes.Buffer
+}
+
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close() {}
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+ if stdin != nil {
+ // Capture stdin so tests can assert on the deployed bytes.
+ _, err = io.Copy(&se.stdin, stdin)
+ if err != nil {
+ return nil, []byte(err.Error()), err
+ }
+ }
+ resp, ok := se.response[cmd]
+ if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+ // Deploy commands embed an md5-based destination filename, so
+ // match them by their distinctive tail and answer with the
+ // "{deploy}" canned response.
+ resp, ok = se.response["{deploy}"]
+ }
if !ok {
- return nil, []byte("command not found\n"), errors.New("command not found")
+ return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
}
return []byte(resp.stdout), []byte(resp.stderr), resp.err
}