Merge branch '21666-provision-test-improvement'
[arvados.git] / lib / dispatchcloud / worker / worker.go
index 6878bb0655ea1e3bc1401396f0c9cbfe4ad9bba0..10a28157e43ee0e496fd82b8705542a58aa0c8ca 100644 (file)
@@ -6,16 +6,22 @@ package worker
 
 import (
        "bytes"
+       "encoding/json"
+       "errors"
        "fmt"
+       "io"
+       "net"
        "path/filepath"
        "strings"
        "sync"
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
@@ -43,33 +49,6 @@ var stateString = map[State]string{
        StateShutdown: "shutdown",
 }
 
-// BootOutcome is the result of a worker boot. It is used as a label in a metric.
-type BootOutcome string
-
-const (
-       BootOutcomeFailed       BootOutcome = "failure"
-       BootOutcomeSucceeded    BootOutcome = "success"
-       BootOutcomeIdleShutdown BootOutcome = "idle shutdown"
-       BootOutcomeDisappeared  BootOutcome = "disappeared"
-)
-
-var validBootOutcomes = map[BootOutcome]bool{
-       BootOutcomeFailed:       true,
-       BootOutcomeSucceeded:    true,
-       BootOutcomeIdleShutdown: true,
-       BootOutcomeDisappeared:  true,
-}
-
-func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
-       if wkr.bootOutcomeReported {
-               return
-       }
-       if wkr.wp.mBootOutcomes != nil {
-               wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
-       }
-       wkr.bootOutcomeReported = true
-}
-
 // String implements fmt.Stringer.
 func (s State) String() string {
        return stateString[s]
@@ -81,6 +60,23 @@ func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
 }
 
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+       BootOutcomeFailed      BootOutcome = "failure" // reported by probeAndUpdate when an unresponsive instance is shut down without having booted
+       BootOutcomeSucceeded   BootOutcome = "success"
+       BootOutcomeAborted     BootOutcome = "aborted" // reported by shutdownIfIdle when an idle worker is shut down
+       BootOutcomeDisappeared BootOutcome = "disappeared"
+)
+
+var validBootOutcomes = map[BootOutcome]bool{ // NOTE(review): not referenced in the visible hunks; presumably the set of accepted metric label values -- confirm
+       BootOutcomeFailed:      true,
+       BootOutcomeSucceeded:   true,
+       BootOutcomeAborted:     true,
+       BootOutcomeDisappeared: true,
+}
+
 // IdleBehavior indicates the behavior desired when a node becomes idle.
 type IdleBehavior string
 
@@ -113,11 +109,14 @@ type worker struct {
        updated             time.Time
        busy                time.Time
        destroyed           time.Time
+       firstSSHConnection  time.Time
        lastUUID            string
        running             map[string]*remoteRunner // remember to update state idle<->running when this changes
        starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing             chan struct{}
        bootOutcomeReported bool
+       timeToReadyReported bool
+       staleRunLockSince   time.Time
 }
 
 func (wkr *worker) onUnkillable(uuid string) {
@@ -139,6 +138,28 @@ func (wkr *worker) onKilled(uuid string) {
        go wkr.wp.notify()
 }
 
+// caller must have lock.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+       if wkr.bootOutcomeReported {
+               return
+       }
+       if wkr.wp.mBootOutcomes != nil {
+               wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+       }
+       wkr.bootOutcomeReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
+       if wkr.timeToReadyReported {
+               return
+       }
+       if wkr.wp.mTimeToSSH != nil {
+               wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
+       }
+       wkr.timeToReadyReported = true
+}
+
 // caller must have lock.
 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
@@ -162,8 +183,19 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
        }
        go func() {
                rr.Start()
+               if wkr.wp.mTimeFromQueueToCrunchRun != nil {
+                       wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
+               }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
+               if wkr.starting[ctr.UUID] != rr {
+                       // Someone else (e.g., wkr.probeAndUpdate() ->
+                       // wkr.updateRunning() or wkr.Close()) already
+                       // moved our runner from wkr.starting to
+                       // wkr.running or deleted it while we were in
+                       // rr.Start().
+                       return
+               }
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
@@ -174,7 +206,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 }
 
 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
-// for the worker's curent state. If a previous probe is still
+// for the worker's current state. If a previous probe is still
 // running, it does nothing.
 //
 // It should be called in a new goroutine.
@@ -216,6 +248,7 @@ func (wkr *worker) probeAndUpdate() {
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
+               errLast  error  // from probeBooted or copyRunnerData
        )
 
        switch initialState {
@@ -232,20 +265,33 @@ func (wkr *worker) probeAndUpdate() {
        logger := wkr.logger.WithField("ProbeStart", probeStart)
 
        if !booted {
-               booted, stderr = wkr.probeBooted()
+               stderr, errLast = wkr.probeBooted()
+               booted = errLast == nil
+               shouldCopy := booted || initialState == StateUnknown
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
-                       booted = wkr.state == StateRunning || wkr.state == StateIdle
+                       if wkr.state == StateRunning || wkr.state == StateIdle {
+                               booted = true
+                               shouldCopy = false
+                       }
                        wkr.mtx.Unlock()
                }
+               if shouldCopy {
+                       _, stderrCopy, err := wkr.copyRunnerData()
+                       if err != nil {
+                               booted = false
+                               wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
+                               errLast = err
+                       }
+               }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
-       if booted || wkr.state == StateUnknown {
+       if booted || initialState == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
@@ -270,17 +316,17 @@ func (wkr *worker) probeAndUpdate() {
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
-                       // been logged already, but boot-probe
+                       // been logged already, but some boot-probe
                        // failures are normal so they are logged only
-                       // at Debug level. This is our chance to log
-                       // some evidence about why the node never
+                       // at Debug level. This may be our chance to
+                       // log some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
-                               }).Info("boot failed")
+                               }).WithError(errLast).Info("boot failed")
                        }
                }
                return
@@ -295,6 +341,10 @@ func (wkr *worker) probeAndUpdate() {
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
+               logger.WithFields(logrus.Fields{
+                       "updated":     updated,
+                       "wkr.updated": wkr.updated,
+               }).Debug("skipping worker state update due to probe/sync race")
                return
        }
 
@@ -312,6 +362,9 @@ func (wkr *worker) probeAndUpdate() {
 
        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+               if wkr.state == StateBooting {
+                       wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
+               }
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
@@ -355,27 +408,76 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+       before := time.Now()
+       var stdin io.Reader
+       if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
+               j, _ := json.Marshal(prices)
+               stdin = bytes.NewReader(j)
+       }
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
+               wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
                return
        }
+       wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       }).Debug("probe succeeded")
+       wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
        ok = true
+
+       staleRunLock := false
        for _, s := range strings.Split(string(stdout), "\n") {
-               if s == "broken" {
+               // Each line of the "crunch-run --list" output is one
+               // of the following:
+               //
+               // * a container UUID, indicating that processes
+               //   related to that container are currently running.
+               //   Optionally followed by " stale", indicating that
+               //   the crunch-run process itself has exited (the
+               //   remaining process is probably arv-mount).
+               //
+               // * the string "broken", indicating that the instance
+               //   appears incapable of starting containers.
+               //
+               // See ListProcesses() in lib/crunchrun/background.go.
+               if s == "" {
+                       // empty string following final newline
+               } else if s == "broken" {
                        reportsBroken = true
-               } else if s != "" {
+               } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+                       // Ignore crunch-run processes that belong to
+                       // a different cluster (e.g., a single host
+                       // running multiple clusters with the loopback
+                       // driver)
+                       continue
+               } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
+               } else if toks[1] == "stale" {
+                       wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+                       staleRunLock = true
                }
        }
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       if !staleRunLock {
+               wkr.staleRunLockSince = time.Time{}
+       } else if wkr.staleRunLockSince.IsZero() {
+               wkr.staleRunLockSince = time.Now()
+       } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+               wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+               reportsBroken = true
+       }
        return
 }
 
-func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+func (wkr *worker) probeBooted() (stderr []byte, err error) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
@@ -387,25 +489,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
                "stderr":  string(stderr),
        })
        if err != nil {
-               logger.WithError(err).Debug("boot probe failed")
-               return false, stderr
+               if errors.Is(err, sshexecutor.ErrNoAddress) ||
+                       errors.As(err, new(*net.OpError)) ||
+                       errors.As(err, new(*ssh.ExitError)) {
+                       // These errors are expected while the
+                       // instance is booting, so we only log them at
+                       // debug level.
+                       logger.WithError(err).Debug("boot probe failed")
+               } else {
+                       // Other errors are more likely to indicate a
+                       // configuration problem, and it's more
+                       // sysadmin-friendly to show them right away
+                       // instead of waiting until boot timeout and
+                       // only showing the last error.
+                       //
+                       // Example: "ssh: handshake failed: ssh:
+                       // unable to authenticate, attempted methods
+                       // [none publickey], no supported methods
+                       // remain"
+                       logger.WithError(err).Warn("boot probe failed")
+               }
+               return stderr, err
        }
        logger.Info("boot probe succeeded")
+       return stderr, nil
+}
+
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
-               return false, stderr
+               return
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
-       } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
-               wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
-               return false, stderr2
-       } else {
-               stderr = append(stderr, stderr2...)
+               return
        }
-       return true, stderr
-}
 
-func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
        logger := wkr.logger.WithFields(logrus.Fields{
@@ -437,9 +555,11 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
                // Never shut down.
                return false
        }
-       label, threshold := "", wkr.wp.timeoutProbe
+       prologue, epilogue, threshold := "", "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
-               label, threshold = "new ", wkr.wp.timeoutBooting
+               prologue = "new "
+               epilogue = " -- `arvados-server cloudtest` might help troubleshoot, see https://doc.arvados.org/main/admin/cloudtest.html"
+               threshold = wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
@@ -448,7 +568,7 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
-       }).Warnf("%sinstance unresponsive, shutting down", label)
+       }).Warnf("%sinstance unresponsive, shutting down%s", prologue, epilogue)
        wkr.shutdown()
        return true
 }
@@ -499,7 +619,7 @@ func (wkr *worker) shutdownIfIdle() bool {
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
-       wkr.reportBootOutcome(BootOutcomeIdleShutdown)
+       wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return true
 }
@@ -555,10 +675,12 @@ func (wkr *worker) Close() {
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
+               delete(wkr.running, uuid)
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
+               delete(wkr.starting, uuid)
        }
 }