X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b517f68ab03879edb3cec475bd1988c2e5fe96bd..e44725a3792df227f189f88ffb2cd1dbf0e93489:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 8b4be1a3c7..0b406ce61d 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -7,17 +7,21 @@ package worker import ( "bytes" "encoding/json" + "errors" "fmt" "io" + "net" "path/filepath" "strings" "sync" "time" "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/stats" "github.com/sirupsen/logrus" + "golang.org/x/crypto/ssh" ) const ( @@ -184,6 +188,14 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } wkr.mtx.Lock() defer wkr.mtx.Unlock() + if wkr.starting[ctr.UUID] != rr { + // Someone else (e.g., wkr.probeAndUpdate() -> + // wkr.updateRunning() or wkr.Close()) already + // moved our runner from wkr.starting to + // wkr.running or deleted it while we were in + // rr.Start(). + return + } now := time.Now() wkr.updated = now wkr.busy = now @@ -236,6 +248,7 @@ func (wkr *worker) probeAndUpdate() { ctrUUIDs []string ok bool stderr []byte // from probeBooted + errLast error // from probeBooted or copyRunnerData ) switch initialState { @@ -252,7 +265,8 @@ func (wkr *worker) probeAndUpdate() { logger := wkr.logger.WithField("ProbeStart", probeStart) if !booted { - booted, stderr = wkr.probeBooted() + stderr, errLast = wkr.probeBooted() + booted = errLast == nil shouldCopy := booted || initialState == StateUnknown if !booted { // Pretend this probe succeeded if another @@ -269,6 +283,7 @@ func (wkr *worker) probeAndUpdate() { if err != nil { booted = false wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary") + errLast = err } } if booted { @@ -301,17 +316,17 @@ func (wkr *worker) probeAndUpdate() { dur := probeStart.Sub(wkr.probed) if wkr.shutdownIfBroken(dur) { // stderr from failed run-probes will have - // been logged already, but boot-probe + // been logged already, but some boot-probe // failures are normal so they are logged only - // at Debug level. This is our chance to log - // some evidence about why the node never + // at Debug level. This may be our chance to + // log some evidence about why the node never // booted, even in non-debug mode. if !booted { wkr.reportBootOutcome(BootOutcomeFailed) logger.WithFields(logrus.Fields{ "Duration": dur, "stderr": string(stderr), - }).Info("boot failed") + }).WithError(errLast).Info("boot failed") } } return @@ -462,7 +477,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { return } -func (wkr *worker) probeBooted() (ok bool, stderr []byte) { +func (wkr *worker) probeBooted() (stderr []byte, err error) { cmd := wkr.wp.bootProbeCommand if cmd == "" { cmd = "true" @@ -474,11 +489,30 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { "stderr": string(stderr), }) if err != nil { - logger.WithError(err).Debug("boot probe failed") - return false, stderr + if errors.Is(err, sshexecutor.ErrNoAddress) || + errors.As(err, new(*net.OpError)) || + errors.As(err, new(*ssh.ExitError)) { + // These errors are expected while the + // instance is booting, so we only log them at + // debug level. + logger.WithError(err).Debug("boot probe failed") + } else { + // Other errors are more likely to indicate a + // configuration problem, and it's more + // sysadmin-friendly to show them right away + // instead of waiting until boot timeout and + // only showing the last error. + // + // Example: "ssh: handshake failed: ssh: + // unable to authenticate, attempted methods + // [none publickey], no supported methods + // remain" + logger.WithError(err).Warn("boot probe failed") + } + return stderr, err } logger.Info("boot probe succeeded") - return true, stderr + return stderr, nil } func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { @@ -639,10 +673,12 @@ func (wkr *worker) Close() { for uuid, rr := range wkr.running { wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") rr.Close() + delete(wkr.running, uuid) } for uuid, rr := range wkr.starting { wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") rr.Close() + delete(wkr.starting, uuid) } }