X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a7a482db3954fa6470be74f0e00f6e1e105e0b6c..6036c55e1239281746152e85dfabbc9ed3cb6864:/lib/dispatchcloud/worker/worker.go

diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index cda5a14b1a..6878bb0655 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -5,7 +5,9 @@
 package worker
 
 import (
+	"bytes"
 	"fmt"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -41,6 +43,40 @@ var stateString = map[State]string{
 	StateShutdown: "shutdown",
 }
 
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+	BootOutcomeFailed       BootOutcome = "failure"
+	BootOutcomeSucceeded    BootOutcome = "success"
+	BootOutcomeIdleShutdown BootOutcome = "idle shutdown"
+	BootOutcomeDisappeared  BootOutcome = "disappeared"
+)
+
+// validBootOutcomes is the set of BootOutcome values defined above.
+var validBootOutcomes = map[BootOutcome]bool{
+	BootOutcomeFailed:       true,
+	BootOutcomeSucceeded:    true,
+	BootOutcomeIdleShutdown: true,
+	BootOutcomeDisappeared:  true,
+}
+
+// reportBootOutcome records outcome in the pool's boot outcome
+// counter, if the pool has one. Only the first call for a given
+// worker has any effect; later calls return immediately.
+//
+// NOTE(review): bootOutcomeReported is read and written without
+// locking here, so presumably the caller must hold wkr.mtx -- confirm.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+	if wkr.bootOutcomeReported {
+		return
+	}
+	if wkr.wp.mBootOutcomes != nil {
+		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+	}
+	wkr.bootOutcomeReported = true
+}
+
 // String implements fmt.Stringer.
 func (s State) String() string {
 	return stateString[s]
@@ -72,22 +108,23 @@ type worker struct {
 	executor Executor
 	wp       *Pool
 
-	mtx          sync.Locker // must be wp's Locker.
-	state        State
-	idleBehavior IdleBehavior
-	instance     cloud.Instance
-	instType     arvados.InstanceType
-	vcpus        int64
-	memory       int64
-	appeared     time.Time
-	probed       time.Time
-	updated      time.Time
-	busy         time.Time
-	destroyed    time.Time
-	lastUUID     string
-	running      map[string]*remoteRunner // remember to update state idle<->running when this changes
-	starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
-	probing      chan struct{}
+	mtx                 sync.Locker // must be wp's Locker.
+	state               State
+	idleBehavior        IdleBehavior
+	instance            cloud.Instance
+	instType            arvados.InstanceType
+	vcpus               int64
+	memory              int64
+	appeared            time.Time
+	probed              time.Time
+	updated             time.Time
+	busy                time.Time
+	destroyed           time.Time
+	lastUUID            string
+	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
+	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
+	probing             chan struct{}
+	bootOutcomeReported bool
 }
 
 func (wkr *worker) onUnkillable(uuid string) {
@@ -222,6 +259,7 @@ func (wkr *worker) probeAndUpdate() {
 	defer wkr.mtx.Unlock()
 	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
 		logger.Info("probe reported broken instance")
+		wkr.reportBootOutcome(BootOutcomeFailed)
 		wkr.setIdleBehavior(IdleBehaviorDrain)
 	}
 	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
@@ -245,6 +283,7 @@ func (wkr *worker) probeAndUpdate() {
 			// some evidence about why the node never
 			// booted, even in non-debug mode.
 			if !booted {
+				wkr.reportBootOutcome(BootOutcomeFailed)
 				logger.WithFields(logrus.Fields{
 					"Duration": dur,
 					"stderr":   string(stderr),
@@ -309,6 +348,7 @@ func (wkr *worker) probeAndUpdate() {
 	}
 	wkr.updated = updateTime
 	if booted && (initialState == StateUnknown || initialState == StateBooting) {
+		wkr.reportBootOutcome(BootOutcomeSucceeded)
 		logger.WithFields(logrus.Fields{
 			"RunningContainers": len(wkr.running),
 			"State":             wkr.state,
@@ -318,7 +358,7 @@ func (wkr *worker) probeAndUpdate() {
 }
 
 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
-	cmd := "crunch-run --list"
+	cmd := wkr.wp.runnerCmd + " --list"
 	if u := wkr.instance.RemoteUser(); u != "root" {
 		cmd = "sudo " + cmd
 	}
@@ -358,9 +398,57 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 		return false, stderr
 	}
 	logger.Info("boot probe succeeded")
+	if err = wkr.wp.loadRunnerData(); err != nil {
+		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+		return false, stderr
+	} else if len(wkr.wp.runnerData) == 0 {
+		// Assume crunch-run is already installed
+	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+		return false, stderr2
+	} else {
+		stderr = append(stderr, stderr2...)
+	}
 	return true, stderr
 }
 
+// copyRunnerData installs the runner binary (wkr.wp.runnerData) at
+// wkr.wp.runnerCmd on the worker, unless "md5sum" on the worker
+// already reports the expected hash (wkr.wp.runnerMD5) for that path.
+//
+// The returned stdout, stderr, and err are those of the last command
+// executed on the worker: the md5sum check if the binary was already
+// in place, otherwise the install command.
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+	logger := wkr.logger.WithFields(logrus.Fields{
+		"hash": hash,
+		"path": wkr.wp.runnerCmd,
+	})
+
+	// md5sum prints "<hash>  <path>\n": the separator is two
+	// characters (a space, then a space or "*" mode flag), so a
+	// single space here would never match and the binary would be
+	// re-copied every time probeBooted succeeds.
+	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+		logger.Info("runner binary already exists on worker, with correct hash")
+		return
+	}
+
+	// Note touch+chmod come before writing data, to avoid the
+	// possibility of md5 being correct while file mode is
+	// incorrect.
+	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
+	if wkr.instance.RemoteUser() != "root" {
+		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+	}
+	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
+	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+	return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 	if wkr.idleBehavior == IdleBehaviorHold {
@@ -429,6 +517,7 @@ func (wkr *worker) shutdownIfIdle() bool {
 		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
 		"IdleBehavior": wkr.idleBehavior,
 	}).Info("shutdown worker")
+	wkr.reportBootOutcome(BootOutcomeIdleShutdown)
 	wkr.shutdown()
 	return true
 }