import (
"bytes"
+ "encoding/json"
+ "errors"
"fmt"
+ "io"
+ "net"
"path/filepath"
"strings"
"sync"
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/stats"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
const (
StateShutdown: "shutdown",
}
-// BootOutcome is the result of a worker boot. It is used as a label in a metric.
-type BootOutcome string
-
-const (
- BootOutcomeFailed BootOutcome = "failure"
- BootOutcomeSucceeded BootOutcome = "success"
- BootOutcomeIdleShutdown BootOutcome = "idle shutdown"
- BootOutcomeDisappeared BootOutcome = "disappeared"
-)
-
-var validBootOutcomes = map[BootOutcome]bool{
- BootOutcomeFailed: true,
- BootOutcomeSucceeded: true,
- BootOutcomeIdleShutdown: true,
- BootOutcomeDisappeared: true,
-}
-
-func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
- if wkr.bootOutcomeReported {
- return
- }
- if wkr.wp.mBootOutcomes != nil {
- wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
- }
- wkr.bootOutcomeReported = true
-}
-
// String implements fmt.Stringer.
func (s State) String() string {
	// Map lookup returns the zero value ("") for any State not present
	// in stateString, which is acceptable for logging/labels.
	return stateString[s]
}
// BootOutcome describes how a worker's boot attempt ended. The value
// is used as a Prometheus metric label, so the set of strings here
// must remain stable.
type BootOutcome string

// Recognized boot outcomes.
const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)

// validBootOutcomes is the set of outcomes that may legitimately be
// reported via reportBootOutcome.
var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}
+
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string
updated time.Time
busy time.Time
destroyed time.Time
+ firstSSHConnection time.Time
lastUUID string
running map[string]*remoteRunner // remember to update state idle<->running when this changes
starting map[string]*remoteRunner // remember to update state idle<->running when this changes
probing chan struct{}
bootOutcomeReported bool
+ timeToReadyReported bool
+ staleRunLockSince time.Time
}
func (wkr *worker) onUnkillable(uuid string) {
go wkr.wp.notify()
}
+// caller must have lock.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+ if wkr.bootOutcomeReported {
+ return
+ }
+ if wkr.wp.mBootOutcomes != nil {
+ wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+ }
+ wkr.bootOutcomeReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
+ if wkr.timeToReadyReported {
+ return
+ }
+ if wkr.wp.mTimeToSSH != nil {
+ wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
+ }
+ wkr.timeToReadyReported = true
+}
+
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
}
go func() {
rr.Start()
+ if wkr.wp.mTimeFromQueueToCrunchRun != nil {
+ wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
+ }
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
+ if wkr.starting[ctr.UUID] != rr {
+ // Someone else (e.g., wkr.probeAndUpdate() ->
+ // wkr.updateRunning() or wkr.Close()) already
+ // moved our runner from wkr.starting to
+ // wkr.running or deleted it while we were in
+ // rr.Start().
+ return
+ }
now := time.Now()
wkr.updated = now
wkr.busy = now
}
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
-// for the worker's curent state. If a previous probe is still
+// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
ctrUUIDs []string
ok bool
stderr []byte // from probeBooted
+ errLast error // from probeBooted or copyRunnerData
)
switch initialState {
logger := wkr.logger.WithField("ProbeStart", probeStart)
if !booted {
- booted, stderr = wkr.probeBooted()
+ stderr, errLast = wkr.probeBooted()
+ booted = errLast == nil
+ shouldCopy := booted || initialState == StateUnknown
if !booted {
// Pretend this probe succeeded if another
// concurrent attempt succeeded.
wkr.mtx.Lock()
- booted = wkr.state == StateRunning || wkr.state == StateIdle
+ if wkr.state == StateRunning || wkr.state == StateIdle {
+ booted = true
+ shouldCopy = false
+ }
wkr.mtx.Unlock()
}
+ if shouldCopy {
+ _, stderrCopy, err := wkr.copyRunnerData()
+ if err != nil {
+ booted = false
+ wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
+ errLast = err
+ }
+ }
if booted {
logger.Info("instance booted; will try probeRunning")
}
}
reportedBroken := false
- if booted || wkr.state == StateUnknown {
+ if booted || initialState == StateUnknown {
ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
}
wkr.mtx.Lock()
dur := probeStart.Sub(wkr.probed)
if wkr.shutdownIfBroken(dur) {
// stderr from failed run-probes will have
- // been logged already, but boot-probe
+ // been logged already, but some boot-probe
// failures are normal so they are logged only
- // at Debug level. This is our chance to log
- // some evidence about why the node never
+ // at Debug level. This may be our chance to
+ // log some evidence about why the node never
// booted, even in non-debug mode.
if !booted {
wkr.reportBootOutcome(BootOutcomeFailed)
logger.WithFields(logrus.Fields{
"Duration": dur,
"stderr": string(stderr),
- }).Info("boot failed")
+ }).WithError(errLast).Info("boot failed")
}
}
return
// not yet running when ctrUUIDs was generated. Leave
// wkr.running alone and wait for the next probe to
// catch up on any changes.
+ logger.WithFields(logrus.Fields{
+ "updated": updated,
+ "wkr.updated": wkr.updated,
+ }).Debug("skipping worker state update due to probe/sync race")
return
}
// Update state if this was the first successful boot-probe.
if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+ if wkr.state == StateBooting {
+ wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
+ }
// Note: this will change again below if
// len(wkr.starting)+len(wkr.running) > 0.
wkr.state = StateIdle
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
}
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ before := time.Now()
+ var stdin io.Reader
+ if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
+ j, _ := json.Marshal(prices)
+ stdin = bytes.NewReader(j)
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
+ wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
return
}
+ wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ }).Debug("probe succeeded")
+ wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
ok = true
+
+ staleRunLock := false
for _, s := range strings.Split(string(stdout), "\n") {
- if s == "broken" {
+ // Each line of the "crunch-run --list" output is one
+ // of the following:
+ //
+ // * a container UUID, indicating that processes
+ // related to that container are currently running.
+ // Optionally followed by " stale", indicating that
+ // the crunch-run process itself has exited (the
+ // remaining process is probably arv-mount).
+ //
+ // * the string "broken", indicating that the instance
+ // appears incapable of starting containers.
+ //
+ // See ListProcesses() in lib/crunchrun/background.go.
+ if s == "" {
+ // empty string following final newline
+ } else if s == "broken" {
reportsBroken = true
- } else if s != "" {
+ } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+ // Ignore crunch-run processes that belong to
+ // a different cluster (e.g., a single host
+ // running multiple clusters with the loopback
+ // driver)
+ continue
+ } else if toks := strings.Split(s, " "); len(toks) == 1 {
running = append(running, s)
+ } else if toks[1] == "stale" {
+ wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+ staleRunLock = true
}
}
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ if !staleRunLock {
+ wkr.staleRunLockSince = time.Time{}
+ } else if wkr.staleRunLockSince.IsZero() {
+ wkr.staleRunLockSince = time.Now()
+ } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+ wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+ reportsBroken = true
+ }
return
}
-func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+func (wkr *worker) probeBooted() (stderr []byte, err error) {
cmd := wkr.wp.bootProbeCommand
if cmd == "" {
cmd = "true"
"stderr": string(stderr),
})
if err != nil {
- logger.WithError(err).Debug("boot probe failed")
- return false, stderr
+ if errors.Is(err, sshexecutor.ErrNoAddress) ||
+ errors.As(err, new(*net.OpError)) ||
+ errors.As(err, new(*ssh.ExitError)) {
+ // These errors are expected while the
+ // instance is booting, so we only log them at
+ // debug level.
+ logger.WithError(err).Debug("boot probe failed")
+ } else {
+ // Other errors are more likely to indicate a
+ // configuration problem, and it's more
+ // sysadmin-friendly to show them right away
+ // instead of waiting until boot timeout and
+ // only showing the last error.
+ //
+ // Example: "ssh: handshake failed: ssh:
+ // unable to authenticate, attempted methods
+ // [none publickey], no supported methods
+ // remain"
+ logger.WithError(err).Warn("boot probe failed")
+ }
+ return stderr, err
}
logger.Info("boot probe succeeded")
+ return stderr, nil
+}
+
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
if err = wkr.wp.loadRunnerData(); err != nil {
wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
- return false, stderr
+ return
} else if len(wkr.wp.runnerData) == 0 {
// Assume crunch-run is already installed
- } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
- wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
- return false, stderr2
- } else {
- stderr = append(stderr, stderr2...)
+ return
}
- return true, stderr
-}
-func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
logger := wkr.logger.WithFields(logrus.Fields{
"IdleDuration": stats.Duration(time.Since(wkr.busy)),
"IdleBehavior": wkr.idleBehavior,
}).Info("shutdown worker")
- wkr.reportBootOutcome(BootOutcomeIdleShutdown)
+ wkr.reportBootOutcome(BootOutcomeAborted)
wkr.shutdown()
return true
}
for uuid, rr := range wkr.running {
wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
rr.Close()
+ delete(wkr.running, uuid)
}
for uuid, rr := range wkr.starting {
wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
rr.Close()
+ delete(wkr.starting, uuid)
}
}