X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dbbc5a9078c0fe88160729f3ca8f56673397b106..8288bba27e9beff7273aeb65c5200248e52bab02:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 41117c1d4e..b01a820cd6 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -7,13 +7,14 @@ package worker import ( "bytes" "fmt" + "path/filepath" "strings" "sync" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/stats" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/stats" "github.com/sirupsen/logrus" ) @@ -53,6 +54,23 @@ func (s State) MarshalText() ([]byte, error) { return []byte(stateString[s]), nil } +// BootOutcome is the result of a worker boot. It is used as a label in a metric. +type BootOutcome string + +const ( + BootOutcomeFailed BootOutcome = "failure" + BootOutcomeSucceeded BootOutcome = "success" + BootOutcomeAborted BootOutcome = "aborted" + BootOutcomeDisappeared BootOutcome = "disappeared" +) + +var validBootOutcomes = map[BootOutcome]bool{ + BootOutcomeFailed: true, + BootOutcomeSucceeded: true, + BootOutcomeAborted: true, + BootOutcomeDisappeared: true, +} + // IdleBehavior indicates the behavior desired when a node becomes idle. type IdleBehavior string @@ -73,22 +91,26 @@ type worker struct { executor Executor wp *Pool - mtx sync.Locker // must be wp's Locker. - state State - idleBehavior IdleBehavior - instance cloud.Instance - instType arvados.InstanceType - vcpus int64 - memory int64 - appeared time.Time - probed time.Time - updated time.Time - busy time.Time - destroyed time.Time - lastUUID string - running map[string]*remoteRunner // remember to update state idle<->running when this changes - starting map[string]*remoteRunner // remember to update state idle<->running when this changes - probing chan struct{} + mtx sync.Locker // must be wp's Locker. + state State + idleBehavior IdleBehavior + instance cloud.Instance + instType arvados.InstanceType + vcpus int64 + memory int64 + appeared time.Time + probed time.Time + updated time.Time + busy time.Time + destroyed time.Time + firstSSHConnection time.Time + lastUUID string + running map[string]*remoteRunner // remember to update state idle<->running when this changes + starting map[string]*remoteRunner // remember to update state idle<->running when this changes + probing chan struct{} + bootOutcomeReported bool + timeToReadyReported bool + staleRunLockSince time.Time } func (wkr *worker) onUnkillable(uuid string) { @@ -110,6 +132,28 @@ func (wkr *worker) onKilled(uuid string) { go wkr.wp.notify() } +// caller must have lock. +func (wkr *worker) reportBootOutcome(outcome BootOutcome) { + if wkr.bootOutcomeReported { + return + } + if wkr.wp.mBootOutcomes != nil { + wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc() + } + wkr.bootOutcomeReported = true +} + +// caller must have lock. +func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() { + if wkr.timeToReadyReported { + return + } + if wkr.wp.mTimeToSSH != nil { + wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds()) + } + wkr.timeToReadyReported = true +} + // caller must have lock. 
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) { wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior") @@ -133,6 +177,9 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } go func() { rr.Start() + if wkr.wp.mTimeFromQueueToCrunchRun != nil { + wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds()) + } wkr.mtx.Lock() defer wkr.mtx.Unlock() now := time.Now() @@ -145,7 +192,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } // ProbeAndUpdate conducts appropriate boot/running probes (if any) -// for the worker's curent state. If a previous probe is still +// for the worker's current state. If a previous probe is still // running, it does nothing. // // It should be called in a new goroutine. @@ -215,11 +262,17 @@ func (wkr *worker) probeAndUpdate() { logger.Info("instance booted; will try probeRunning") } } + reportedBroken := false if booted || wkr.state == StateUnknown { - ctrUUIDs, ok = wkr.probeRunning() + ctrUUIDs, reportedBroken, ok = wkr.probeRunning() } wkr.mtx.Lock() defer wkr.mtx.Unlock() + if reportedBroken && wkr.idleBehavior == IdleBehaviorRun { + logger.Info("probe reported broken instance") + wkr.reportBootOutcome(BootOutcomeFailed) + wkr.setIdleBehavior(IdleBehaviorDrain) + } if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) { if wkr.state == StateShutdown && wkr.updated.After(updated) { // Skip the logging noise if shutdown was @@ -241,6 +294,7 @@ func (wkr *worker) probeAndUpdate() { // some evidence about why the node never // booted, even in non-debug mode. if !booted { + wkr.reportBootOutcome(BootOutcomeFailed) logger.WithFields(logrus.Fields{ "Duration": dur, "stderr": string(stderr), @@ -259,6 +313,10 @@ func (wkr *worker) probeAndUpdate() { // not yet running when ctrUUIDs was generated. Leave // wkr.running alone and wait for the next probe to // catch up on any changes. + logger.WithFields(logrus.Fields{ + "updated": updated, + "wkr.updated": wkr.updated, + }).Debug("skipping worker state update due to probe/sync race") return } @@ -276,6 +334,9 @@ func (wkr *worker) probeAndUpdate() { // Update state if this was the first successful boot-probe. if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) { + if wkr.state == StateBooting { + wkr.reportTimeBetweenFirstSSHAndReadyForContainer() + } // Note: this will change again below if // len(wkr.starting)+len(wkr.running) > 0. 
wkr.state = StateIdle @@ -305,6 +366,7 @@ func (wkr *worker) probeAndUpdate() { } wkr.updated = updateTime if booted && (initialState == StateUnknown || initialState == StateBooting) { + wkr.reportBootOutcome(BootOutcomeSucceeded) logger.WithFields(logrus.Fields{ "RunningContainers": len(wkr.running), "State": wkr.state, @@ -313,11 +375,12 @@ func (wkr *worker) probeAndUpdate() { go wkr.wp.notify() } -func (wkr *worker) probeRunning() (running []string, ok bool) { - cmd := "crunch-run --list" +func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { + cmd := wkr.wp.runnerCmd + " --list" if u := wkr.instance.RemoteUser(); u != "root" { cmd = "sudo " + cmd } + before := time.Now() stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) if err != nil { wkr.logger.WithFields(logrus.Fields{ @@ -325,13 +388,60 @@ func (wkr *worker) probeRunning() (running []string, ok bool) { "stdout": string(stdout), "stderr": string(stderr), }).WithError(err).Warn("probe failed") - return nil, false + wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds()) + return } - stdout = bytes.TrimRight(stdout, "\n") - if len(stdout) == 0 { - return nil, true + wkr.logger.WithFields(logrus.Fields{ + "Command": cmd, + "stdout": string(stdout), + "stderr": string(stderr), + }).Debug("probe succeeded") + wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds()) + ok = true + + staleRunLock := false + for _, s := range strings.Split(string(stdout), "\n") { + // Each line of the "crunch-run --list" output is one + // of the following: + // + // * a container UUID, indicating that processes + // related to that container are currently running. + // Optionally followed by " stale", indicating that + // the crunch-run process itself has exited (the + // remaining process is probably arv-mount). + // + // * the string "broken", indicating that the instance + // appears incapable of starting containers. + // + // See ListProcesses() in lib/crunchrun/background.go. 
+ if s == "" { + // empty string following final newline + } else if s == "broken" { + reportsBroken = true + } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) { + // Ignore crunch-run processes that belong to + // a different cluster (e.g., a single host + // running multiple clusters with the loopback + // driver) + continue + } else if toks := strings.Split(s, " "); len(toks) == 1 { + running = append(running, s) + } else if toks[1] == "stale" { + wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock") + staleRunLock = true + } } - return strings.Split(string(stdout), "\n"), true + wkr.mtx.Lock() + defer wkr.mtx.Unlock() + if !staleRunLock { + wkr.staleRunLockSince = time.Time{} + } else if wkr.staleRunLockSince.IsZero() { + wkr.staleRunLockSince = time.Now() + } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock { + wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long") + reportsBroken = true + } + return } func (wkr *worker) probeBooted() (ok bool, stderr []byte) { @@ -350,9 +460,46 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { return false, stderr } logger.Info("boot probe succeeded") + if err = wkr.wp.loadRunnerData(); err != nil { + wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary") + return false, stderr + } else if len(wkr.wp.runnerData) == 0 { + // Assume crunch-run is already installed + } else if _, stderr2, err := wkr.copyRunnerData(); err != nil { + wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary") + return false, stderr2 + } else { + stderr = append(stderr, stderr2...) + } return true, stderr } +func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { + hash := fmt.Sprintf("%x", wkr.wp.runnerMD5) + dstdir, _ := filepath.Split(wkr.wp.runnerCmd) + logger := wkr.logger.WithFields(logrus.Fields{ + "hash": hash, + "path": wkr.wp.runnerCmd, + }) + + stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil) + if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) { + logger.Info("runner binary already exists on worker, with correct hash") + return + } + + // Note touch+chmod come before writing data, to avoid the + // possibility of md5 being correct while file mode is + // incorrect. + cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"` + if wkr.instance.RemoteUser() != "root" { + cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'` + } + logger.WithField("cmd", cmd).Info("installing runner binary on worker") + stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData)) + return +} + // caller must have lock. 
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool { if wkr.idleBehavior == IdleBehaviorHold { @@ -421,6 +568,7 @@ func (wkr *worker) shutdownIfIdle() bool { "IdleDuration": stats.Duration(time.Since(wkr.busy)), "IdleBehavior": wkr.idleBehavior, }).Info("shutdown worker") + wkr.reportBootOutcome(BootOutcomeAborted) wkr.shutdown() return true } @@ -447,8 +595,8 @@ func (wkr *worker) saveTags() { instance := wkr.instance tags := instance.Tags() update := cloud.InstanceTags{ - tagKeyInstanceType: wkr.instType.Name, - tagKeyIdleBehavior: string(wkr.idleBehavior), + wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name, + wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior), } save := false for k, v := range update {
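
Reviewer note (not part of the commit above): the new probeRunning classifies each line of "crunch-run --list" output as a running container UUID, a UUID followed by "stale" (a leftover run lock whose crunch-run process has exited), or the literal string "broken", and it skips UUIDs that do not start with the local cluster ID. The standalone sketch below mirrors that classification outside the worker type; the function name, the sample output, and the cluster ID are invented for illustration and are not part of the commit.

    package main

    import (
    	"fmt"
    	"strings"
    )

    // parseListOutput applies the same line classification that the
    // probeRunning loop in the diff uses: container UUIDs of running
    // processes, "<uuid> stale" for orphaned run locks, and "broken"
    // when the instance reports itself unusable. Lines that do not
    // start with clusterID are ignored, mirroring the HasPrefix check.
    func parseListOutput(stdout, clusterID string) (running, stale []string, broken bool) {
    	for _, s := range strings.Split(stdout, "\n") {
    		switch {
    		case s == "":
    			// empty string following final newline
    		case s == "broken":
    			broken = true
    		case !strings.HasPrefix(s, clusterID):
    			// crunch-run process belonging to a different cluster
    		default:
    			toks := strings.Split(s, " ")
    			if len(toks) == 1 {
    				running = append(running, s)
    			} else if toks[1] == "stale" {
    				stale = append(stale, toks[0])
    			}
    		}
    	}
    	return
    }

    func main() {
    	// Sample output is invented; real lines come from crunch-run
    	// processes found on the instance.
    	sample := "zzzzz-dz642-aaaaaaaaaaaaaaa\nzzzzz-dz642-bbbbbbbbbbbbbbb stale\nbroken\n"
    	running, stale, broken := parseListOutput(sample, "zzzzz")
    	fmt.Println("running:", running, "stale:", stale, "broken:", broken)
    }
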
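
Reviewer note (not part of the commit above): copyRunnerData deliberately runs mkdir/touch/chmod before streaming the binary through cat, so a destination file can never have the correct md5 while carrying the wrong mode, and it wraps the whole script in sudo sh -c, with single quotes escaped as '\'' , when the remote user is not root. The helper below reproduces only that command construction so the final quoting is easy to inspect; the function name and the destination paths are invented for illustration.

    package main

    import (
    	"fmt"
    	"strings"
    )

    // installCommand mirrors the command built in copyRunnerData: a
    // small shell script that prepares the destination, then reads the
    // binary from stdin. For non-root users the script is quoted for
    // `sudo sh -c '...'` by replacing every single quote with '\''.
    func installCommand(dstdir, dstfile string, asRoot bool) string {
    	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + dstfile + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
    	if !asRoot {
    		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", `'\''`, -1) + `'`
    	}
    	return cmd
    }

    func main() {
    	// Paths are illustrative; the real values come from the pool's
    	// runnerCmd configuration.
    	fmt.Println(installCommand("/tmp/example-bin/", "/tmp/example-bin/crunch-run", false))
    }
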
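
Reviewer note (not part of the commit above): reportBootOutcome assumes the pool owns a Prometheus counter vector, mBootOutcomes, labeled by outcome; that metric is registered in pool.go, which is not part of this diff. The sketch below is a minimal, hypothetical illustration of such a counter using the same BootOutcome label values — the metric name, namespace, and registration shown here are assumptions, not the dispatcher's actual configuration.

    package main

    import (
    	"fmt"

    	"github.com/prometheus/client_golang/prometheus"
    )

    // BootOutcome and its values mirror the type added in this commit.
    type BootOutcome string

    const (
    	BootOutcomeFailed      BootOutcome = "failure"
    	BootOutcomeSucceeded   BootOutcome = "success"
    	BootOutcomeAborted     BootOutcome = "aborted"
    	BootOutcomeDisappeared BootOutcome = "disappeared"
    )

    // mBootOutcomes is a stand-in for the counter the worker increments
    // via reportBootOutcome. The metric name here is illustrative only.
    var mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
    	Namespace: "arvados",
    	Subsystem: "dispatchcloud",
    	Name:      "boot_outcomes_example",
    	Help:      "Example counter of worker boot outcomes, by outcome label.",
    }, []string{"outcome"})

    func main() {
    	reg := prometheus.NewRegistry()
    	reg.MustRegister(mBootOutcomes)

    	// Equivalent to wkr.reportBootOutcome(BootOutcomeSucceeded),
    	// minus the "report at most once per worker" guard.
    	mBootOutcomes.WithLabelValues(string(BootOutcomeSucceeded)).Inc()

    	fmt.Println("recorded one", BootOutcomeSucceeded, "boot outcome")
    }
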