X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6036c55e1239281746152e85dfabbc9ed3cb6864..HEAD:/lib/dispatchcloud/worker/worker.go

diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 6878bb0655..10a28157e4 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,16 +6,22 @@ package worker
 
 import (
 	"bytes"
+	"encoding/json"
+	"errors"
 	"fmt"
+	"io"
+	"net"
 	"path/filepath"
 	"strings"
 	"sync"
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/stats"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/crypto/ssh"
 )
 
 const (
@@ -43,33 +49,6 @@ var stateString = map[State]string{
 	StateShutdown: "shutdown",
 }
 
-// BootOutcome is the result of a worker boot. It is used as a label in a metric.
-type BootOutcome string
-
-const (
-	BootOutcomeFailed       BootOutcome = "failure"
-	BootOutcomeSucceeded    BootOutcome = "success"
-	BootOutcomeIdleShutdown BootOutcome = "idle shutdown"
-	BootOutcomeDisappeared  BootOutcome = "disappeared"
-)
-
-var validBootOutcomes = map[BootOutcome]bool{
-	BootOutcomeFailed:       true,
-	BootOutcomeSucceeded:    true,
-	BootOutcomeIdleShutdown: true,
-	BootOutcomeDisappeared:  true,
-}
-
-func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
-	if wkr.bootOutcomeReported {
-		return
-	}
-	if wkr.wp.mBootOutcomes != nil {
-		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
-	}
-	wkr.bootOutcomeReported = true
-}
-
 // String implements fmt.Stringer.
 func (s State) String() string {
 	return stateString[s]
@@ -81,6 +60,23 @@ func (s State) MarshalText() ([]byte, error) {
 	return []byte(stateString[s]), nil
 }
 
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+	BootOutcomeFailed      BootOutcome = "failure"
+	BootOutcomeSucceeded   BootOutcome = "success"
+	BootOutcomeAborted     BootOutcome = "aborted"
+	BootOutcomeDisappeared BootOutcome = "disappeared"
+)
+
+var validBootOutcomes = map[BootOutcome]bool{
+	BootOutcomeFailed:      true,
+	BootOutcomeSucceeded:   true,
+	BootOutcomeAborted:     true,
+	BootOutcomeDisappeared: true,
+}
+
 // IdleBehavior indicates the behavior desired when a node becomes idle.
 type IdleBehavior string
 
@@ -113,11 +109,14 @@ type worker struct {
 	updated             time.Time
 	busy                time.Time
 	destroyed           time.Time
+	firstSSHConnection  time.Time
 	lastUUID            string
 	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
 	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
 	probing             chan struct{}
 	bootOutcomeReported bool
+	timeToReadyReported bool
+	staleRunLockSince   time.Time
 }
 
 func (wkr *worker) onUnkillable(uuid string) {
@@ -139,6 +138,28 @@ func (wkr *worker) onKilled(uuid string) {
 	go wkr.wp.notify()
 }
 
+// caller must have lock.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+	if wkr.bootOutcomeReported {
+		return
+	}
+	if wkr.wp.mBootOutcomes != nil {
+		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+	}
+	wkr.bootOutcomeReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
+	if wkr.timeToReadyReported {
+		return
+	}
+	if wkr.wp.mTimeToSSH != nil {
+		wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
+	}
+	wkr.timeToReadyReported = true
+}
+
 // caller must have lock.
 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
 	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
@@ -162,8 +183,19 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 	}
 	go func() {
 		rr.Start()
+		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
+			wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
+		}
 		wkr.mtx.Lock()
 		defer wkr.mtx.Unlock()
+		if wkr.starting[ctr.UUID] != rr {
+			// Someone else (e.g., wkr.probeAndUpdate() ->
+			// wkr.updateRunning() or wkr.Close()) already
+			// moved our runner from wkr.starting to
+			// wkr.running or deleted it while we were in
+			// rr.Start().
+			return
+		}
 		now := time.Now()
 		wkr.updated = now
 		wkr.busy = now
@@ -174,7 +206,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 }
 
 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
-// for the worker's curent state. If a previous probe is still
+// for the worker's current state. If a previous probe is still
 // running, it does nothing.
 //
 // It should be called in a new goroutine.
@@ -216,6 +248,7 @@ func (wkr *worker) probeAndUpdate() {
 		ctrUUIDs []string
 		ok       bool
 		stderr   []byte // from probeBooted
+		errLast  error  // from probeBooted or copyRunnerData
 	)
 
 	switch initialState {
@@ -232,20 +265,33 @@ func (wkr *worker) probeAndUpdate() {
 	logger := wkr.logger.WithField("ProbeStart", probeStart)
 
 	if !booted {
-		booted, stderr = wkr.probeBooted()
+		stderr, errLast = wkr.probeBooted()
+		booted = errLast == nil
+		shouldCopy := booted || initialState == StateUnknown
 		if !booted {
 			// Pretend this probe succeeded if another
 			// concurrent attempt succeeded.
 			wkr.mtx.Lock()
-			booted = wkr.state == StateRunning || wkr.state == StateIdle
+			if wkr.state == StateRunning || wkr.state == StateIdle {
+				booted = true
+				shouldCopy = false
+			}
 			wkr.mtx.Unlock()
 		}
+		if shouldCopy {
+			_, stderrCopy, err := wkr.copyRunnerData()
+			if err != nil {
+				booted = false
+				wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
+				errLast = err
+			}
+		}
 		if booted {
 			logger.Info("instance booted; will try probeRunning")
 		}
 	}
 	reportedBroken := false
-	if booted || wkr.state == StateUnknown {
+	if booted || initialState == StateUnknown {
 		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
 	}
 	wkr.mtx.Lock()
@@ -270,17 +316,17 @@ func (wkr *worker) probeAndUpdate() {
 		dur := probeStart.Sub(wkr.probed)
 		if wkr.shutdownIfBroken(dur) {
 			// stderr from failed run-probes will have
-			// been logged already, but boot-probe
+			// been logged already, but some boot-probe
 			// failures are normal so they are logged only
-			// at Debug level. This is our chance to log
-			// some evidence about why the node never
+			// at Debug level. This may be our chance to
+			// log some evidence about why the node never
 			// booted, even in non-debug mode.
 			if !booted {
 				wkr.reportBootOutcome(BootOutcomeFailed)
 				logger.WithFields(logrus.Fields{
 					"Duration": dur,
 					"stderr":   string(stderr),
-				}).Info("boot failed")
+				}).WithError(errLast).Info("boot failed")
 			}
 		}
 		return
@@ -295,6 +341,10 @@ func (wkr *worker) probeAndUpdate() {
 		// not yet running when ctrUUIDs was generated. Leave
 		// wkr.running alone and wait for the next probe to
 		// catch up on any changes.
+		logger.WithFields(logrus.Fields{
+			"updated":     updated,
+			"wkr.updated": wkr.updated,
+		}).Debug("skipping worker state update due to probe/sync race")
 		return
 	}
 
@@ -312,6 +362,9 @@ func (wkr *worker) probeAndUpdate() {
 
 	// Update state if this was the first successful boot-probe.
 	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+		if wkr.state == StateBooting {
+			wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
+		}
 		// Note: this will change again below if
 		// len(wkr.starting)+len(wkr.running) > 0.
 		wkr.state = StateIdle
@@ -355,27 +408,76 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
 	if u := wkr.instance.RemoteUser(); u != "root" {
 		cmd = "sudo " + cmd
 	}
-	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+	before := time.Now()
+	var stdin io.Reader
+	if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
+		j, _ := json.Marshal(prices)
+		stdin = bytes.NewReader(j)
+	}
+	stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
 	if err != nil {
 		wkr.logger.WithFields(logrus.Fields{
 			"Command": cmd,
 			"stdout":  string(stdout),
 			"stderr":  string(stderr),
 		}).WithError(err).Warn("probe failed")
+		wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
 		return
 	}
+	wkr.logger.WithFields(logrus.Fields{
+		"Command": cmd,
+		"stdout":  string(stdout),
+		"stderr":  string(stderr),
+	}).Debug("probe succeeded")
+	wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
 	ok = true
+
+	staleRunLock := false
 	for _, s := range strings.Split(string(stdout), "\n") {
-		if s == "broken" {
+		// Each line of the "crunch-run --list" output is one
+		// of the following:
+		//
+		// * a container UUID, indicating that processes
+		//   related to that container are currently running.
+		//   Optionally followed by " stale", indicating that
+		//   the crunch-run process itself has exited (the
+		//   remaining process is probably arv-mount).
+		//
+		// * the string "broken", indicating that the instance
+		//   appears incapable of starting containers.
+		//
		// See ListProcesses() in lib/crunchrun/background.go.
+		if s == "" {
+			// empty string following final newline
+		} else if s == "broken" {
 			reportsBroken = true
-		} else if s != "" {
+		} else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+			// Ignore crunch-run processes that belong to
+			// a different cluster (e.g., a single host
+			// running multiple clusters with the loopback
+			// driver)
+			continue
+		} else if toks := strings.Split(s, " "); len(toks) == 1 {
 			running = append(running, s)
+		} else if toks[1] == "stale" {
+			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+			staleRunLock = true
 		}
 	}
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	if !staleRunLock {
+		wkr.staleRunLockSince = time.Time{}
+	} else if wkr.staleRunLockSince.IsZero() {
+		wkr.staleRunLockSince = time.Now()
+	} else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+		reportsBroken = true
+	}
 	return
 }
 
-func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+func (wkr *worker) probeBooted() (stderr []byte, err error) {
 	cmd := wkr.wp.bootProbeCommand
 	if cmd == "" {
 		cmd = "true"
@@ -387,25 +489,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 		"stderr": string(stderr),
 	})
 	if err != nil {
-		logger.WithError(err).Debug("boot probe failed")
-		return false, stderr
+		if errors.Is(err, sshexecutor.ErrNoAddress) ||
+			errors.As(err, new(*net.OpError)) ||
+			errors.As(err, new(*ssh.ExitError)) {
+			// These errors are expected while the
+			// instance is booting, so we only log them at
+			// debug level.
+			logger.WithError(err).Debug("boot probe failed")
+		} else {
+			// Other errors are more likely to indicate a
+			// configuration problem, and it's more
+			// sysadmin-friendly to show them right away
+			// instead of waiting until boot timeout and
+			// only showing the last error.
+			//
+			// Example: "ssh: handshake failed: ssh:
+			// unable to authenticate, attempted methods
+			// [none publickey], no supported methods
+			// remain"
+			logger.WithError(err).Warn("boot probe failed")
+		}
+		return stderr, err
 	}
 	logger.Info("boot probe succeeded")
+	return stderr, nil
+}
+
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
 	if err = wkr.wp.loadRunnerData(); err != nil {
 		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
-		return false, stderr
+		return
 	} else if len(wkr.wp.runnerData) == 0 {
 		// Assume crunch-run is already installed
-	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
-		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
-		return false, stderr2
-	} else {
-		stderr = append(stderr, stderr2...)
+		return
 	}
-	return true, stderr
-}
-
-func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+
 	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
 	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
 	logger := wkr.logger.WithFields(logrus.Fields{
@@ -437,9 +555,11 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 		// Never shut down.
 		return false
 	}
-	label, threshold := "", wkr.wp.timeoutProbe
+	prologue, epilogue, threshold := "", "", wkr.wp.timeoutProbe
 	if wkr.state == StateUnknown || wkr.state == StateBooting {
-		label, threshold = "new ", wkr.wp.timeoutBooting
+		prologue = "new "
+		epilogue = " -- `arvados-server cloudtest` might help troubleshoot, see https://doc.arvados.org/main/admin/cloudtest.html"
+		threshold = wkr.wp.timeoutBooting
 	}
 	if dur < threshold {
 		return false
@@ -448,7 +568,7 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 		"Duration": dur,
 		"Since":    wkr.probed,
 		"State":    wkr.state,
-	}).Warnf("%sinstance unresponsive, shutting down", label)
+	}).Warnf("%sinstance unresponsive, shutting down%s", prologue, epilogue)
 	wkr.shutdown()
 	return true
 }
@@ -499,7 +619,7 @@ func (wkr *worker) shutdownIfIdle() bool {
 		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
 		"IdleBehavior": wkr.idleBehavior,
 	}).Info("shutdown worker")
-	wkr.reportBootOutcome(BootOutcomeIdleShutdown)
+	wkr.reportBootOutcome(BootOutcomeAborted)
 	wkr.shutdown()
 	return true
 }
@@ -555,10 +675,12 @@ func (wkr *worker) Close() {
 	for uuid, rr := range wkr.running {
 		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
 		rr.Close()
+		delete(wkr.running, uuid)
 	}
 	for uuid, rr := range wkr.starting {
 		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
 		rr.Close()
+		delete(wkr.starting, uuid)
 	}
 }