From da3cd46f9204f522b73023b42e8a4d04f697e9c7 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Mon, 30 Dec 2019 10:10:30 -0500
Subject: [PATCH] 15759: Deploy supervisor binary from dispatcher to worker
 VMs.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 lib/cmd/cmd.go                          |  5 ++
 lib/config/config.default.yml           | 10 ++++
 lib/config/generated_config.go          | 10 ++++
 lib/dispatchcloud/dispatcher.go         |  3 +-
 lib/dispatchcloud/worker/pool.go        | 48 ++++++++++++++++
 lib/dispatchcloud/worker/pool_test.go   | 10 ++--
 lib/dispatchcloud/worker/runner.go      |  6 +-
 lib/dispatchcloud/worker/worker.go      | 36 +++++++++++-
 lib/dispatchcloud/worker/worker_test.go | 80 +++++++++++++++++++++----
 9 files changed, 189 insertions(+), 19 deletions(-)

diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 51bcf55c76..611c95d234 100644
--- a/lib/cmd/cmd.go
+++ b/lib/cmd/cmd.go
@@ -65,6 +65,11 @@ type Multi map[string]Handler
 
 func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
 	_, basename := filepath.Split(prog)
+	if i := strings.Index(basename, "~"); i >= 0 {
+		// drop "~anything" suffix (arvados-dispatch-cloud's
+		// DeployRunnerBinary feature relies on this)
+		basename = basename[:i]
+	}
 	cmd, ok := m[basename]
 	if !ok {
 		// "controller" command exists, and binary is named "arvados-controller"
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 7779d7ebf2..ccc6343e6b 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -788,6 +788,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
        # Tags to add on all resources (VMs, NICs, disks) created by
        # the container dispatcher. (Arvados's own tags --
        # InstanceType, IdleBehavior, and InstanceSecret -- will also
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 3e58d249bf..3d4c18e283 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -794,6 +794,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
        # Tags to add on all resources (VMs, NICs, disks) created by
        # the container dispatcher. (Arvados's own tags --
        # InstanceType, IdleBehavior, and InstanceSecret -- will also
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index ce7dc07301..19c8f6e0b4 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -37,6 +37,7 @@ const (
 
 type pool interface {
 	scheduler.WorkerPool
+	CheckHealth() error
 	Instances() []worker.InstanceView
 	SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
 	KillInstance(id cloud.InstanceID, reason string) error
@@ -78,7 +79,7 @@ func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // CheckHealth implements service.Handler.
 func (disp *dispatcher) CheckHealth() error {
 	disp.Start()
-	return nil
+	return disp.pool.CheckHealth()
 }
 
 // Stop dispatching containers and release resources. Typically used
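The dispatcher change above is easy to read past: by delegating CheckHealth to the pool, a missing or unreadable DeployRunnerBinary surfaces as a failing health check rather than as a string of failed boot probes. A minimal, self-contained sketch of that delegation pattern (stubPool and the error text here are illustrative assumptions, not part of the patch):

package main

import (
	"errors"
	"fmt"
)

// healthChecker stands in for the pool interface extended above.
type healthChecker interface {
	CheckHealth() error
}

// stubPool simulates a pool whose runner binary could not be loaded.
type stubPool struct{ loadErr error }

func (p stubPool) CheckHealth() error {
	if p.loadErr != nil {
		return fmt.Errorf("error loading runner binary: %s", p.loadErr)
	}
	return nil
}

// dispatcher delegates its own health check to the pool, mirroring the
// change to (*dispatcher).CheckHealth above.
type dispatcher struct{ pool healthChecker }

func (d dispatcher) CheckHealth() error { return d.pool.CheckHealth() }

func main() {
	d := dispatcher{pool: stubPool{loadErr: errors.New("no such file or directory")}}
	fmt.Println(d.CheckHealth())
}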
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 649910f63c..6bc269c7e9 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,10 +5,12 @@
 package worker
 
 import (
+	"crypto/md5"
 	"crypto/rand"
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"sort"
 	"strings"
 	"sync"
@@ -135,6 +137,7 @@ type Pool struct {
 	instanceSet        *throttledInstanceSet
 	newExecutor        func(cloud.Instance) Executor
 	bootProbeCommand   string
+	runnerSource       string
 	imageID            cloud.ImageID
 	instanceTypes      map[string]arvados.InstanceType
 	syncInterval       time.Duration
@@ -160,6 +163,9 @@ type Pool struct {
 	stop         chan bool
 	mtx          sync.RWMutex
 	setupOnce    sync.Once
+	runnerData   []byte
+	runnerMD5    [md5.Size]byte
+	runnerCmd    string
 
 	throttleCreate    throttle
 	throttleInstances throttle
@@ -177,6 +183,14 @@ type createCall struct {
 	instanceType arvados.InstanceType
 }
 
+func (wp *Pool) CheckHealth() error {
+	wp.setupOnce.Do(wp.setup)
+	if err := wp.loadRunnerData(); err != nil {
+		return fmt.Errorf("error loading runner binary: %s", err)
+	}
+	return nil
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -276,6 +290,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	wp.setupOnce.Do(wp.setup)
+	if wp.loadRunnerData() != nil {
+		// Boot probe is certain to fail.
+		return false
+	}
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
@@ -743,6 +761,36 @@ func (wp *Pool) setup() {
 	wp.exited = map[string]time.Time{}
 	wp.workers = map[cloud.InstanceID]*worker{}
 	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+	wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	if wp.runnerData != nil {
+		return nil
+	} else if wp.runnerSource == "" {
+		wp.runnerCmd = "crunch-run"
+		wp.runnerData = []byte{}
+		return nil
+	}
+	logger := wp.logger.WithField("source", wp.runnerSource)
+	logger.Debug("loading runner")
+	buf, err := ioutil.ReadFile(wp.runnerSource)
+	if err != nil {
+		logger.WithError(err).Error("failed to load runner program")
+		return err
+	}
+	wp.runnerData = buf
+	wp.runnerMD5 = md5.Sum(buf)
+	wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+	return nil
 }
 
 func (wp *Pool) notify() {
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 8b2415b402..1948c1e874 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -70,9 +70,11 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 	c.Assert(err, check.IsNil)
 
 	newExecutor := func(cloud.Instance) Executor {
-		return stubExecutor{
-			"crunch-run --list": stubResp{},
-			"true":              stubResp{},
+		return &stubExecutor{
+			response: map[string]stubResp{
+				"crunch-run --list": stubResp{},
+				"true":              stubResp{},
+			},
 		}
 	}
 
@@ -155,7 +157,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
 	pool := &Pool{
 		logger:        logger,
-		newExecutor:   func(cloud.Instance) Executor { return stubExecutor{} },
+		newExecutor:   func(cloud.Instance) Executor { return &stubExecutor{} },
 		instanceSet:   &throttledInstanceSet{InstanceSet: instanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
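The naming convention in loadRunnerData above is what ties pool.go to the cmd.go change at the top of the patch: the runner is written to the worker as crunch-run~<md5 of its contents>, and cmd.Multi drops everything from the first "~" so the deployed file still dispatches as plain crunch-run. A standalone sketch of that round trip (the path and payload are illustrative):

package main

import (
	"crypto/md5"
	"fmt"
	"path/filepath"
	"strings"
)

// deployedPath mimics wp.runnerCmd: the target filename embeds the MD5 of
// the binary's contents, so a new build gets a new path.
func deployedPath(data []byte) string {
	return fmt.Sprintf("/var/run/arvados/crunch-run~%x", md5.Sum(data))
}

// commandName mimics the cmd.Multi change: strip the "~anything" suffix
// before looking up the handler.
func commandName(prog string) string {
	_, base := filepath.Split(prog)
	if i := strings.Index(base, "~"); i >= 0 {
		base = base[:i]
	}
	return base
}

func main() {
	p := deployedPath([]byte("ELF"))
	fmt.Println(p)              // /var/run/arvados/crunch-run~<hex digest>
	fmt.Println(commandName(p)) // crunch-run
}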
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index e819a6036b..4752121342 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -20,6 +20,7 @@ type remoteRunner struct {
 	uuid          string
 	executor      Executor
 	envJSON       json.RawMessage
+	runnerCmd     string
 	remoteUser    string
 	timeoutTERM   time.Duration
 	timeoutSignal time.Duration
@@ -59,6 +60,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 		uuid:          uuid,
 		executor:      wkr.executor,
 		envJSON:       envJSON,
+		runnerCmd:     wkr.wp.runnerCmd,
 		remoteUser:    wkr.instance.RemoteUser(),
 		timeoutTERM:   wkr.wp.timeoutTERM,
 		timeoutSignal: wkr.wp.timeoutSignal,
@@ -76,7 +78,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-	cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+	cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
 	if rr.remoteUser != "root" {
 		cmd = "sudo " + cmd
 	}
@@ -136,7 +138,7 @@ func (rr *remoteRunner) Kill(reason string) {
 func (rr *remoteRunner) kill(sig syscall.Signal) {
 	logger := rr.logger.WithField("Signal", int(sig))
 	logger.Info("sending signal")
-	cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+	cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
 	if rr.remoteUser != "root" {
 		cmd = "sudo " + cmd
 	}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index cda5a14b1a..a455a7d06d 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -5,7 +5,9 @@
 package worker
 
 import (
+	"bytes"
 	"fmt"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -318,7 +320,7 @@ func (wkr *worker) probeAndUpdate() {
 }
 
 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
-	cmd := "crunch-run --list"
+	cmd := wkr.wp.runnerCmd + " --list"
 	if u := wkr.instance.RemoteUser(); u != "root" {
 		cmd = "sudo " + cmd
 	}
@@ -358,9 +360,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 		return false, stderr
 	}
 	logger.Info("boot probe succeeded")
+	if err = wkr.wp.loadRunnerData(); err != nil {
+		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+		return false, stderr
+	} else if len(wkr.wp.runnerData) == 0 {
+		// Assume crunch-run is already installed
+	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+		return false, stderr2
+	} else {
+		wkr.logger.Info("runner binary OK")
+		stderr = append(stderr, stderr2...)
+	}
 	return true, stderr
 }
 
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+
+	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+		return
+	}
+
+	// Note touch+chmod come before writing data, to avoid the
+	// possibility of md5 being correct while file mode is
+	// incorrect.
+	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+	if wkr.instance.RemoteUser() != "root" {
+		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+	}
+	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+	return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 	if wkr.idleBehavior == IdleBehaviorHold {
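copyRunnerData above hands its whole shell command to sudo sh -c '...' on non-root workers, so every single quote inside the command has to be rewritten as '\'' (close the quote, emit a literal quote, reopen it). A small sketch of just that escaping step, with an illustrative command string:

package main

import (
	"fmt"
	"strings"
)

// wrapSudo mirrors the escaping used in copyRunnerData: wrap the command in
// single quotes for sh -c, replacing each embedded quote with '\''.
func wrapSudo(cmd string) string {
	return `sudo sh -c '` + strings.Replace(cmd, "'", `'\''`, -1) + `'`
}

func main() {
	cmd := `echo 'updating runner'; cat >"$dstfile"`
	fmt.Println(wrapSudo(cmd))
	// Output: sudo sh -c 'echo '\''updating runner'\''; cat >"$dstfile"'
}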
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4f8f77bff6..a4c2a6370f 100644
--- a/lib/dispatchcloud/worker/worker_test.go
+++ b/lib/dispatchcloud/worker/worker_test.go
@@ -5,8 +5,12 @@
 package worker
 
 import (
+	"bytes"
+	"crypto/md5"
 	"errors"
+	"fmt"
 	"io"
+	"strings"
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
@@ -38,7 +42,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 		running         int
 		starting        int
 		respBoot        stubResp // zero value is success
+		respDeploy      stubResp // zero value is success
 		respRun         stubResp // zero value is success + nothing running
+		respRunDeployed stubResp
+		deployRunner    []byte
+		expectStdin     []byte
 		expectState     State
 		expectRunning   int
 	}
@@ -46,7 +54,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 	errFail := errors.New("failed")
 	respFail := stubResp{"", "command failed\n", errFail}
 	respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
-	for _, trial := range []trialT{
+	for idx, trial := range []trialT{
 		{
 			testCaseComment: "Unknown, probes fail",
 			state:           StateUnknown,
@@ -185,12 +193,40 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 			starting:        1,
 			expectState:     StateRunning,
 		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+			state:           StateBooting,
+			deployRunner:    []byte("ELF"),
+			expectStdin:     []byte("ELF"),
+			respRun:         respFail,
+			respRunDeployed: respContainerRunning,
+			expectRunning:   1,
+			expectState:     StateRunning,
+		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+			state:           StateBooting,
+			deployRunner:    []byte("ELF"),
+			respDeploy:      respFail,
+			expectStdin:     []byte("ELF"),
+			expectState:     StateBooting,
+		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+			state:           StateBooting,
+			deployRunner:    nil,
+			respDeploy:      respFail,
+			expectState:     StateIdle,
+		},
 	} {
-		c.Logf("------- %#v", trial)
+		c.Logf("------- trial %d: %#v", idx, trial)
 		ctime := time.Now().Add(-trial.age)
-		exr := stubExecutor{
-			"bootprobe":         trial.respBoot,
-			"crunch-run --list": trial.respRun,
+		exr := &stubExecutor{
+			response: map[string]stubResp{
+				"bootprobe":         trial.respBoot,
+				"crunch-run --list": trial.respRun,
+				"{deploy}":          trial.respDeploy,
+			},
 		}
 		wp := &Pool{
 			arvClient:        ac,
@@ -199,6 +235,14 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 			timeoutBooting:   bootTimeout,
 			timeoutProbe:     probeTimeout,
 			exited:           map[string]time.Time{},
+			runnerCmd:        "crunch-run",
+			runnerData:       trial.deployRunner,
+			runnerMD5:        md5.Sum(trial.deployRunner),
+		}
+		if trial.deployRunner != nil {
+			svHash := md5.Sum(trial.deployRunner)
+			wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+			exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
 		}
 		wkr := &worker{
 			logger:  logger,
@@ -226,6 +270,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 		wkr.probeAndUpdate()
 		c.Check(wkr.state, check.Equals, trial.expectState)
 		c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+		c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
 	}
 }
 
@@ -234,14 +279,27 @@ type stubResp struct {
 	stdout string
 	stderr string
 	err    error
 }
-type stubExecutor map[string]stubResp
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close()                         {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
-	resp, ok := se[cmd]
+type stubExecutor struct {
+	response map[string]stubResp
+	stdin    bytes.Buffer
+}
+
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close()                         {}
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+	if stdin != nil {
+		_, err = io.Copy(&se.stdin, stdin)
+		if err != nil {
+			return nil, []byte(err.Error()), err
+		}
+	}
+	resp, ok := se.response[cmd]
+	if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+		resp, ok = se.response["{deploy}"]
+	}
 	if !ok {
-		return nil, []byte("command not found\n"), errors.New("command not found")
+		return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
 	}
 	return []byte(resp.stdout), []byte(resp.stderr), resp.err
 }
-- 
2.30.2