X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9a7e2a24f5f3d261e554ac3815b7e2a4c2e24503..1e3eacb0ca6f2228f50f13514c7577a149a707e6:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 49c5057b38..357ac20a03 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -5,14 +5,16 @@ package worker import ( + "bytes" "fmt" + "path/filepath" "strings" "sync" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/stats" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/stats" "github.com/sirupsen/logrus" ) @@ -318,7 +320,7 @@ func (wkr *worker) probeAndUpdate() { } func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { - cmd := "crunch-run --list" + cmd := wkr.wp.runnerCmd + " --list" if u := wkr.instance.RemoteUser(); u != "root" { cmd = "sudo " + cmd } @@ -358,9 +360,46 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { return false, stderr } logger.Info("boot probe succeeded") + if err = wkr.wp.loadRunnerData(); err != nil { + wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary") + return false, stderr + } else if len(wkr.wp.runnerData) == 0 { + // Assume crunch-run is already installed + } else if _, stderr2, err := wkr.copyRunnerData(); err != nil { + wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary") + return false, stderr2 + } else { + stderr = append(stderr, stderr2...) + } return true, stderr } +func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { + hash := fmt.Sprintf("%x", wkr.wp.runnerMD5) + dstdir, _ := filepath.Split(wkr.wp.runnerCmd) + logger := wkr.logger.WithFields(logrus.Fields{ + "hash": hash, + "path": wkr.wp.runnerCmd, + }) + + stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil) + if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) { + logger.Info("runner binary already exists on worker, with correct hash") + return + } + + // Note touch+chmod come before writing data, to avoid the + // possibility of md5 being correct while file mode is + // incorrect. + cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"` + if wkr.instance.RemoteUser() != "root" { + cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'` + } + logger.WithField("cmd", cmd).Info("installing runner binary on worker") + stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData)) + return +} + // caller must have lock. func (wkr *worker) shutdownIfBroken(dur time.Duration) bool { if wkr.idleBehavior == IdleBehaviorHold { @@ -455,8 +494,8 @@ func (wkr *worker) saveTags() { instance := wkr.instance tags := instance.Tags() update := cloud.InstanceTags{ - tagKeyInstanceType: wkr.instType.Name, - tagKeyIdleBehavior: string(wkr.idleBehavior), + wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name, + wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior), } save := false for k, v := range update {