20649: Fix panic on race, worker shutdown vs. container startup.
[arvados.git] / lib / dispatchcloud / worker / worker.go
index b2ed6c2bff5b039435944b851a9fe3646c922001..0b406ce61de61f248b76bfa30c5ad449b8383374 100644 (file)
@@ -7,17 +7,21 @@ package worker
 import (
        "bytes"
        "encoding/json"
+       "errors"
        "fmt"
        "io"
+       "net"
        "path/filepath"
        "strings"
        "sync"
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
@@ -184,6 +188,14 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
+               if wkr.starting[ctr.UUID] != rr {
+                       // Someone else (e.g., wkr.probeAndUpdate() ->
+                       // wkr.updateRunning() or wkr.Close()) already
+                       // moved our runner from wkr.starting to
+                       // wkr.running or deleted it while we were in
+                       // rr.Start().
+                       return
+               }
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
@@ -236,6 +248,7 @@ func (wkr *worker) probeAndUpdate() {
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
+               errLast  error  // from probeBooted or copyRunnerData
        )
 
        switch initialState {
@@ -252,20 +265,33 @@ func (wkr *worker) probeAndUpdate() {
        logger := wkr.logger.WithField("ProbeStart", probeStart)
 
        if !booted {
-               booted, stderr = wkr.probeBooted()
+               stderr, errLast = wkr.probeBooted()
+               booted = errLast == nil
+               shouldCopy := booted || initialState == StateUnknown
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
-                       booted = wkr.state == StateRunning || wkr.state == StateIdle
+                       if wkr.state == StateRunning || wkr.state == StateIdle {
+                               booted = true
+                               shouldCopy = false
+                       }
                        wkr.mtx.Unlock()
                }
+               if shouldCopy {
+                       _, stderrCopy, err := wkr.copyRunnerData()
+                       if err != nil {
+                               booted = false
+                               wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
+                               errLast = err
+                       }
+               }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
-       if booted || wkr.state == StateUnknown {
+       if booted || initialState == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
@@ -290,17 +316,17 @@ func (wkr *worker) probeAndUpdate() {
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
-                       // been logged already, but boot-probe
+                       // been logged already, but some boot-probe
                        // failures are normal so they are logged only
-                       // at Debug level. This is our chance to log
-                       // some evidence about why the node never
+                       // at Debug level. This may be our chance to
+                       // log some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
-                               }).Info("boot failed")
+                               }).WithError(errLast).Info("boot failed")
                        }
                }
                return
@@ -451,7 +477,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        return
 }
 
-func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+func (wkr *worker) probeBooted() (stderr []byte, err error) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
@@ -463,25 +489,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
                "stderr":  string(stderr),
        })
        if err != nil {
-               logger.WithError(err).Debug("boot probe failed")
-               return false, stderr
+               if errors.Is(err, sshexecutor.ErrNoAddress) ||
+                       errors.As(err, new(*net.OpError)) ||
+                       errors.As(err, new(*ssh.ExitError)) {
+                       // These errors are expected while the
+                       // instance is booting, so we only log them at
+                       // debug level.
+                       logger.WithError(err).Debug("boot probe failed")
+               } else {
+                       // Other errors are more likely to indicate a
+                       // configuration problem, and it's more
+                       // sysadmin-friendly to show them right away
+                       // instead of waiting until boot timeout and
+                       // only showing the last error.
+                       //
+                       // Example: "ssh: handshake failed: ssh:
+                       // unable to authenticate, attempted methods
+                       // [none publickey], no supported methods
+                       // remain"
+                       logger.WithError(err).Warn("boot probe failed")
+               }
+               return stderr, err
        }
        logger.Info("boot probe succeeded")
+       return stderr, nil
+}
+
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
-               return false, stderr
+               return
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
-       } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
-               wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
-               return false, stderr2
-       } else {
-               stderr = append(stderr, stderr2...)
+               return
        }
-       return true, stderr
-}
 
-func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
        logger := wkr.logger.WithFields(logrus.Fields{
@@ -631,10 +673,12 @@ func (wkr *worker) Close() {
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
+               delete(wkr.running, uuid)
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
+               delete(wkr.starting, uuid)
        }
 }