15759: Deploy supervisor binary from dispatcher to worker VMs.
author Tom Clegg <tom@tomclegg.ca>
Mon, 30 Dec 2019 15:10:30 +0000 (10:10 -0500)
committer Tom Clegg <tom@tomclegg.ca>
Mon, 30 Dec 2019 15:10:30 +0000 (10:10 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

lib/cmd/cmd.go
lib/config/config.default.yml
lib/config/generated_config.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/runner.go
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go

index 51bcf55c76040e1f8c3a1320fcf7ab50db2090c6..611c95d2340a3b2da47b8a7cbcfff2a3aad9af8c 100644 (file)
@@ -65,6 +65,11 @@ type Multi map[string]Handler
 
 func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        _, basename := filepath.Split(prog)
+       if i := strings.Index(basename, "~"); i >= 0 {
+               // drop "~anything" suffix (arvados-dispatch-cloud's
+               // DeployRunnerBinary feature relies on this)
+               basename = basename[:i]
+       }
        cmd, ok := m[basename]
        if !ok {
                // "controller" command exists, and binary is named "arvados-controller"
index 7779d7ebf2a26b11a0ca4225a6cdfe865abd8a3d..ccc6343e6b35963ea84a3d590dfdae87057b91a1 100644 (file)
@@ -788,6 +788,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
         # Tags to add on all resources (VMs, NICs, disks) created by
         # the container dispatcher. (Arvados's own tags --
         # InstanceType, IdleBehavior, and InstanceSecret -- will also
index 3e58d249bf4dee7526ea1bbab74b259b85d2d4ab..3d4c18e283739130744b64a69fff8aad94abd617 100644 (file)
@@ -794,6 +794,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
         # Tags to add on all resources (VMs, NICs, disks) created by
         # the container dispatcher. (Arvados's own tags --
         # InstanceType, IdleBehavior, and InstanceSecret -- will also
index ce7dc07301daf99f27a2a422ea1641c4e4d92902..19c8f6e0b489309fdb5ac363c96fd1b5beeaa24d 100644 (file)
@@ -37,6 +37,7 @@ const (
 
 type pool interface {
        scheduler.WorkerPool
+       CheckHealth() error
        Instances() []worker.InstanceView
        SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
        KillInstance(id cloud.InstanceID, reason string) error
@@ -78,7 +79,7 @@ func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // CheckHealth implements service.Handler.
 func (disp *dispatcher) CheckHealth() error {
        disp.Start()
-       return nil
+       return disp.pool.CheckHealth()
 }
 
 // Stop dispatching containers and release resources. Typically used
index 649910f63ca9c76c194b0e9155e36e516cfeb584..6bc269c7e9dd2fabb77bd1cdeee21ed4c07d61c0 100644 (file)
@@ -5,10 +5,12 @@
 package worker
 
 import (
+       "crypto/md5"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        "sort"
        "strings"
        "sync"
@@ -135,6 +137,7 @@ type Pool struct {
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
+       runnerSource       string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
@@ -160,6 +163,9 @@ type Pool struct {
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
+       runnerData   []byte
+       runnerMD5    [md5.Size]byte
+       runnerCmd    string
 
        throttleCreate    throttle
        throttleInstances throttle
@@ -177,6 +183,14 @@ type createCall struct {
        instanceType arvados.InstanceType
 }
 
+func (wp *Pool) CheckHealth() error {
+       wp.setupOnce.Do(wp.setup)
+       if err := wp.loadRunnerData(); err != nil {
+               return fmt.Errorf("error loading runner binary: %s", err)
+       }
+       return nil
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -276,6 +290,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
+       if wp.loadRunnerData() != nil {
+               // Boot probe is certain to fail.
+               return false
+       }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
@@ -743,6 +761,36 @@ func (wp *Pool) setup() {
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+       wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if wp.runnerData != nil {
+               return nil
+       } else if wp.runnerSource == "" {
+               wp.runnerCmd = "crunch-run"
+               wp.runnerData = []byte{}
+               return nil
+       }
+       logger := wp.logger.WithField("source", wp.runnerSource)
+       logger.Debug("loading runner")
+       buf, err := ioutil.ReadFile(wp.runnerSource)
+       if err != nil {
+               logger.WithError(err).Error("failed to load runner program")
+               return err
+       }
+       wp.runnerData = buf
+       wp.runnerMD5 = md5.Sum(buf)
+       wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+       return nil
 }
 
 func (wp *Pool) notify() {
index 8b2415b402f5885decbc5f9f0e389bd4a389d369..1948c1e874859f2d8355115b3671f2c5ef0ae32d 100644 (file)
@@ -70,9 +70,11 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
-               return stubExecutor{
-                       "crunch-run --list": stubResp{},
-                       "true":              stubResp{},
+               return &stubExecutor{
+                       response: map[string]stubResp{
+                               "crunch-run --list": stubResp{},
+                               "true":              stubResp{},
+                       },
                }
        }
 
@@ -155,7 +157,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
                logger:      logger,
-               newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
index e819a6036b341d5d2bbe28a242292296b5b36cd2..47521213427610a531fa269df32b046b17ba72aa 100644 (file)
@@ -20,6 +20,7 @@ type remoteRunner struct {
        uuid          string
        executor      Executor
        envJSON       json.RawMessage
+       runnerCmd     string
        remoteUser    string
        timeoutTERM   time.Duration
        timeoutSignal time.Duration
@@ -59,6 +60,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
                uuid:          uuid,
                executor:      wkr.executor,
                envJSON:       envJSON,
+               runnerCmd:     wkr.wp.runnerCmd,
                remoteUser:    wkr.instance.RemoteUser(),
                timeoutTERM:   wkr.wp.timeoutTERM,
                timeoutSignal: wkr.wp.timeoutSignal,
@@ -76,7 +78,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-       cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+       cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
@@ -136,7 +138,7 @@ func (rr *remoteRunner) Kill(reason string) {
 func (rr *remoteRunner) kill(sig syscall.Signal) {
        logger := rr.logger.WithField("Signal", int(sig))
        logger.Info("sending signal")
-       cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+       cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
index cda5a14b1aae19a7cbbbab377f074543f2a87ac9..a455a7d06d88770080fa42e62a19cb878e5050d6 100644 (file)
@@ -5,7 +5,9 @@
 package worker
 
 import (
+       "bytes"
        "fmt"
+       "path/filepath"
        "strings"
        "sync"
        "time"
@@ -318,7 +320,7 @@ func (wkr *worker) probeAndUpdate() {
 }
 
 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
-       cmd := "crunch-run --list"
+       cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
@@ -358,9 +360,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
                return false, stderr
        }
        logger.Info("boot probe succeeded")
+       if err = wkr.wp.loadRunnerData(); err != nil {
+               wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+               return false, stderr
+       } else if len(wkr.wp.runnerData) == 0 {
+               // Assume crunch-run is already installed
+       } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+               wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+               return false, stderr2
+       } else {
+               wkr.logger.Info("runner binary OK")
+               stderr = append(stderr, stderr2...)
+       }
        return true, stderr
 }
 
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+       hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+       dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+
+       stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+       if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+               return
+       }
+
+       // Note touch+chmod come before writing data, to avoid the
+       // possibility of md5 being correct while file mode is
+       // incorrect.
+       cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+       if wkr.instance.RemoteUser() != "root" {
+               cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+       }
+       stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+       return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
index 4f8f77bff6de95f820ca38d81a0151f31a3bf86d..a4c2a6370f3d5ce3803484b18ac811abec7e6bc1 100644 (file)
@@ -5,8 +5,12 @@
 package worker
 
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
+       "fmt"
        "io"
+       "strings"
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
@@ -38,7 +42,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                running         int
                starting        int
                respBoot        stubResp // zero value is success
+               respDeploy      stubResp // zero value is success
                respRun         stubResp // zero value is success + nothing running
+               respRunDeployed stubResp
+               deployRunner    []byte
+               expectStdin     []byte
                expectState     State
                expectRunning   int
        }
@@ -46,7 +54,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
        errFail := errors.New("failed")
        respFail := stubResp{"", "command failed\n", errFail}
        respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
-       for _, trial := range []trialT{
+       for idx, trial := range []trialT{
                {
                        testCaseComment: "Unknown, probes fail",
                        state:           StateUnknown,
@@ -185,12 +193,40 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        starting:        1,
                        expectState:     StateRunning,
                },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+                       state:           StateBooting,
+                       deployRunner:    []byte("ELF"),
+                       expectStdin:     []byte("ELF"),
+                       respRun:         respFail,
+                       respRunDeployed: respContainerRunning,
+                       expectRunning:   1,
+                       expectState:     StateRunning,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+                       state:           StateBooting,
+                       deployRunner:    []byte("ELF"),
+                       respDeploy:      respFail,
+                       expectStdin:     []byte("ELF"),
+                       expectState:     StateBooting,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+                       state:           StateBooting,
+                       deployRunner:    nil,
+                       respDeploy:      respFail,
+                       expectState:     StateIdle,
+               },
        } {
-               c.Logf("------- %#v", trial)
+               c.Logf("------- trial %d: %#v", idx, trial)
                ctime := time.Now().Add(-trial.age)
-               exr := stubExecutor{
-                       "bootprobe":         trial.respBoot,
-                       "crunch-run --list": trial.respRun,
+               exr := &stubExecutor{
+                       response: map[string]stubResp{
+                               "bootprobe":         trial.respBoot,
+                               "crunch-run --list": trial.respRun,
+                               "{deploy}":          trial.respDeploy,
+                       },
                }
                wp := &Pool{
                        arvClient:        ac,
@@ -199,6 +235,14 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        timeoutBooting:   bootTimeout,
                        timeoutProbe:     probeTimeout,
                        exited:           map[string]time.Time{},
+                       runnerCmd:        "crunch-run",
+                       runnerData:       trial.deployRunner,
+                       runnerMD5:        md5.Sum(trial.deployRunner),
+               }
+               if trial.deployRunner != nil {
+                       svHash := md5.Sum(trial.deployRunner)
+                       wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+                       exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
                }
                wkr := &worker{
                        logger:   logger,
@@ -226,6 +270,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                wkr.probeAndUpdate()
                c.Check(wkr.state, check.Equals, trial.expectState)
                c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+               c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
        }
 }
 
@@ -234,14 +279,27 @@ type stubResp struct {
        stderr string
        err    error
 }
-type stubExecutor map[string]stubResp
 
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close()                         {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
-       resp, ok := se[cmd]
+type stubExecutor struct {
+       response map[string]stubResp
+       stdin    bytes.Buffer
+}
+
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close()                         {}
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+       if stdin != nil {
+               _, err = io.Copy(&se.stdin, stdin)
+               if err != nil {
+                       return nil, []byte(err.Error()), err
+               }
+       }
+       resp, ok := se.response[cmd]
+       if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+               resp, ok = se.response["{deploy}"]
+       }
        if !ok {
-               return nil, []byte("command not found\n"), errors.New("command not found")
+               return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
        }
        return []byte(resp.stdout), []byte(resp.stderr), resp.err
 }