X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d9fdb9630e56f3ccdaee6acd8b1ca4cdbdf11b0a..df1ebc0e3184afd3fb66414651fc1aec713928bf:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 8b4be1a3c7..10a28157e4 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -7,17 +7,21 @@ package worker import ( "bytes" "encoding/json" + "errors" "fmt" "io" + "net" "path/filepath" "strings" "sync" "time" "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/stats" "github.com/sirupsen/logrus" + "golang.org/x/crypto/ssh" ) const ( @@ -184,6 +188,14 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } wkr.mtx.Lock() defer wkr.mtx.Unlock() + if wkr.starting[ctr.UUID] != rr { + // Someone else (e.g., wkr.probeAndUpdate() -> + // wkr.updateRunning() or wkr.Close()) already + // moved our runner from wkr.starting to + // wkr.running or deleted it while we were in + // rr.Start(). + return + } now := time.Now() wkr.updated = now wkr.busy = now @@ -236,6 +248,7 @@ func (wkr *worker) probeAndUpdate() { ctrUUIDs []string ok bool stderr []byte // from probeBooted + errLast error // from probeBooted or copyRunnerData ) switch initialState { @@ -252,7 +265,8 @@ func (wkr *worker) probeAndUpdate() { logger := wkr.logger.WithField("ProbeStart", probeStart) if !booted { - booted, stderr = wkr.probeBooted() + stderr, errLast = wkr.probeBooted() + booted = errLast == nil shouldCopy := booted || initialState == StateUnknown if !booted { // Pretend this probe succeeded if another @@ -269,6 +283,7 @@ func (wkr *worker) probeAndUpdate() { if err != nil { booted = false wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary") + errLast = err } } if booted { @@ -301,17 +316,17 @@ func (wkr *worker) probeAndUpdate() { dur := probeStart.Sub(wkr.probed) if wkr.shutdownIfBroken(dur) { // stderr from failed run-probes will have - // been logged already, but boot-probe + // been logged already, but some boot-probe // failures are normal so they are logged only - // at Debug level. This is our chance to log - // some evidence about why the node never + // at Debug level. This may be our chance to + // log some evidence about why the node never // booted, even in non-debug mode. if !booted { wkr.reportBootOutcome(BootOutcomeFailed) logger.WithFields(logrus.Fields{ "Duration": dur, "stderr": string(stderr), - }).Info("boot failed") + }).WithError(errLast).Info("boot failed") } } return @@ -462,7 +477,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { return } -func (wkr *worker) probeBooted() (ok bool, stderr []byte) { +func (wkr *worker) probeBooted() (stderr []byte, err error) { cmd := wkr.wp.bootProbeCommand if cmd == "" { cmd = "true" @@ -474,11 +489,30 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { "stderr": string(stderr), }) if err != nil { - logger.WithError(err).Debug("boot probe failed") - return false, stderr + if errors.Is(err, sshexecutor.ErrNoAddress) || + errors.As(err, new(*net.OpError)) || + errors.As(err, new(*ssh.ExitError)) { + // These errors are expected while the + // instance is booting, so we only log them at + // debug level. + logger.WithError(err).Debug("boot probe failed") + } else { + // Other errors are more likely to indicate a + // configuration problem, and it's more + // sysadmin-friendly to show them right away + // instead of waiting until boot timeout and + // only showing the last error. + // + // Example: "ssh: handshake failed: ssh: + // unable to authenticate, attempted methods + // [none publickey], no supported methods + // remain" + logger.WithError(err).Warn("boot probe failed") + } + return stderr, err } logger.Info("boot probe succeeded") - return true, stderr + return stderr, nil } func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { @@ -521,9 +555,11 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool { // Never shut down. return false } - label, threshold := "", wkr.wp.timeoutProbe + prologue, epilogue, threshold := "", "", wkr.wp.timeoutProbe if wkr.state == StateUnknown || wkr.state == StateBooting { - label, threshold = "new ", wkr.wp.timeoutBooting + prologue = "new " + epilogue = " -- `arvados-server cloudtest` might help troubleshoot, see https://doc.arvados.org/main/admin/cloudtest.html" + threshold = wkr.wp.timeoutBooting } if dur < threshold { return false @@ -532,7 +568,7 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool { "Duration": dur, "Since": wkr.probed, "State": wkr.state, - }).Warnf("%sinstance unresponsive, shutting down", label) + }).Warnf("%sinstance unresponsive, shutting down%s", prologue, epilogue) wkr.shutdown() return true } @@ -639,10 +675,12 @@ func (wkr *worker) Close() { for uuid, rr := range wkr.running { wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") rr.Close() + delete(wkr.running, uuid) } for uuid, rr := range wkr.starting { wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") rr.Close() + delete(wkr.starting, uuid) } }