X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/458436270ce8fb80d421d55e192236c5ac4a225e..e44725a3792df227f189f88ffb2cd1dbf0e93489:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 397a462929..0b406ce61d 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -7,17 +7,21 @@ package worker import ( "bytes" "encoding/json" + "errors" "fmt" "io" + "net" "path/filepath" "strings" "sync" "time" "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/stats" "github.com/sirupsen/logrus" + "golang.org/x/crypto/ssh" ) const ( @@ -184,6 +188,14 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } wkr.mtx.Lock() defer wkr.mtx.Unlock() + if wkr.starting[ctr.UUID] != rr { + // Someone else (e.g., wkr.probeAndUpdate() -> + // wkr.updateRunning() or wkr.Close()) already + // moved our runner from wkr.starting to + // wkr.running or deleted it while we were in + // rr.Start(). + return + } now := time.Now() wkr.updated = now wkr.busy = now @@ -236,6 +248,7 @@ func (wkr *worker) probeAndUpdate() { ctrUUIDs []string ok bool stderr []byte // from probeBooted + errLast error // from probeBooted or copyRunnerData ) switch initialState { @@ -252,20 +265,33 @@ func (wkr *worker) probeAndUpdate() { logger := wkr.logger.WithField("ProbeStart", probeStart) if !booted { - booted, stderr = wkr.probeBooted() + stderr, errLast = wkr.probeBooted() + booted = errLast == nil + shouldCopy := booted || initialState == StateUnknown if !booted { // Pretend this probe succeeded if another // concurrent attempt succeeded. wkr.mtx.Lock() - booted = wkr.state == StateRunning || wkr.state == StateIdle + if wkr.state == StateRunning || wkr.state == StateIdle { + booted = true + shouldCopy = false + } wkr.mtx.Unlock() } + if shouldCopy { + _, stderrCopy, err := wkr.copyRunnerData() + if err != nil { + booted = false + wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary") + errLast = err + } + } if booted { logger.Info("instance booted; will try probeRunning") } } reportedBroken := false - if booted || wkr.state == StateUnknown { + if booted || initialState == StateUnknown { ctrUUIDs, reportedBroken, ok = wkr.probeRunning() } wkr.mtx.Lock() @@ -290,17 +316,17 @@ func (wkr *worker) probeAndUpdate() { dur := probeStart.Sub(wkr.probed) if wkr.shutdownIfBroken(dur) { // stderr from failed run-probes will have - // been logged already, but boot-probe + // been logged already, but some boot-probe // failures are normal so they are logged only - // at Debug level. This is our chance to log - // some evidence about why the node never + // at Debug level. This may be our chance to + // log some evidence about why the node never // booted, even in non-debug mode. if !booted { wkr.reportBootOutcome(BootOutcomeFailed) logger.WithFields(logrus.Fields{ "Duration": dur, "stderr": string(stderr), - }).Info("boot failed") + }).WithError(errLast).Info("boot failed") } } return @@ -384,7 +410,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { } before := time.Now() var stdin io.Reader - if prices := wkr.instance.PriceHistory(); len(prices) > 0 { + if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 { j, _ := json.Marshal(prices) stdin = bytes.NewReader(j) } @@ -451,7 +477,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { return } -func (wkr *worker) probeBooted() (ok bool, stderr []byte) { +func (wkr *worker) probeBooted() (stderr []byte, err error) { cmd := wkr.wp.bootProbeCommand if cmd == "" { cmd = "true" @@ -463,25 +489,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { "stderr": string(stderr), }) if err != nil { - logger.WithError(err).Debug("boot probe failed") - return false, stderr + if errors.Is(err, sshexecutor.ErrNoAddress) || + errors.As(err, new(*net.OpError)) || + errors.As(err, new(*ssh.ExitError)) { + // These errors are expected while the + // instance is booting, so we only log them at + // debug level. + logger.WithError(err).Debug("boot probe failed") + } else { + // Other errors are more likely to indicate a + // configuration problem, and it's more + // sysadmin-friendly to show them right away + // instead of waiting until boot timeout and + // only showing the last error. + // + // Example: "ssh: handshake failed: ssh: + // unable to authenticate, attempted methods + // [none publickey], no supported methods + // remain" + logger.WithError(err).Warn("boot probe failed") + } + return stderr, err } logger.Info("boot probe succeeded") + return stderr, nil +} + +func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { if err = wkr.wp.loadRunnerData(); err != nil { wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary") - return false, stderr + return } else if len(wkr.wp.runnerData) == 0 { // Assume crunch-run is already installed - } else if _, stderr2, err := wkr.copyRunnerData(); err != nil { - wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary") - return false, stderr2 - } else { - stderr = append(stderr, stderr2...) + return } - return true, stderr -} -func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { hash := fmt.Sprintf("%x", wkr.wp.runnerMD5) dstdir, _ := filepath.Split(wkr.wp.runnerCmd) logger := wkr.logger.WithFields(logrus.Fields{ @@ -631,10 +673,12 @@ func (wkr *worker) Close() { for uuid, rr := range wkr.running { wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") rr.Close() + delete(wkr.running, uuid) } for uuid, rr := range wkr.starting { wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") rr.Close() + delete(wkr.starting, uuid) } }