package worker
import (
+ "bytes"
"fmt"
+ "path/filepath"
"strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/stats"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/stats"
"github.com/sirupsen/logrus"
)
return []byte(stateString[s]), nil
}
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+ BootOutcomeFailed BootOutcome = "failure"
+ BootOutcomeSucceeded BootOutcome = "success"
+ BootOutcomeAborted BootOutcome = "aborted"
+ BootOutcomeDisappeared BootOutcome = "disappeared"
+)
+
+var validBootOutcomes = map[BootOutcome]bool{
+ BootOutcomeFailed: true,
+ BootOutcomeSucceeded: true,
+ BootOutcomeAborted: true,
+ BootOutcomeDisappeared: true,
+}
+
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string
executor Executor
wp *Pool
- mtx sync.Locker // must be wp's Locker.
- state State
- idleBehavior IdleBehavior
- instance cloud.Instance
- instType arvados.InstanceType
- vcpus int64
- memory int64
- appeared time.Time
- probed time.Time
- updated time.Time
- busy time.Time
- destroyed time.Time
- lastUUID string
- running map[string]*remoteRunner // remember to update state idle<->running when this changes
- starting map[string]*remoteRunner // remember to update state idle<->running when this changes
- probing chan struct{}
+ mtx sync.Locker // must be wp's Locker.
+ state State
+ idleBehavior IdleBehavior
+ instance cloud.Instance
+ instType arvados.InstanceType
+ vcpus int64
+ memory int64
+ appeared time.Time
+ probed time.Time
+ updated time.Time
+ busy time.Time
+ destroyed time.Time
+ firstSSHConnection time.Time
+ lastUUID string
+ running map[string]*remoteRunner // remember to update state idle<->running when this changes
+ starting map[string]*remoteRunner // remember to update state idle<->running when this changes
+ probing chan struct{}
+ bootOutcomeReported bool
+ timeToReadyReported bool
}
func (wkr *worker) onUnkillable(uuid string) {
go wkr.wp.notify()
}
+// caller must have lock.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+ if wkr.bootOutcomeReported {
+ return
+ }
+ if wkr.wp.mBootOutcomes != nil {
+ wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+ }
+ wkr.bootOutcomeReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
+ if wkr.timeToReadyReported {
+ return
+ }
+ if wkr.wp.mTimeToSSH != nil {
+ wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
+ }
+ wkr.timeToReadyReported = true
+}
+
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
defer wkr.mtx.Unlock()
if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
logger.Info("probe reported broken instance")
+ wkr.reportBootOutcome(BootOutcomeFailed)
wkr.setIdleBehavior(IdleBehaviorDrain)
}
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
// some evidence about why the node never
// booted, even in non-debug mode.
if !booted {
+ wkr.reportBootOutcome(BootOutcomeFailed)
logger.WithFields(logrus.Fields{
"Duration": dur,
"stderr": string(stderr),
// Update state if this was the first successful boot-probe.
if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+ if wkr.state == StateBooting {
+ wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
+ }
// Note: this will change again below if
// len(wkr.starting)+len(wkr.running) > 0.
wkr.state = StateIdle
}
wkr.updated = updateTime
if booted && (initialState == StateUnknown || initialState == StateBooting) {
+ wkr.reportBootOutcome(BootOutcomeSucceeded)
logger.WithFields(logrus.Fields{
"RunningContainers": len(wkr.running),
"State": wkr.state,
}
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
- cmd := "crunch-run --list"
+ cmd := wkr.wp.runnerCmd + " --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
}
return false, stderr
}
logger.Info("boot probe succeeded")
+ if err = wkr.wp.loadRunnerData(); err != nil {
+ wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+ return false, stderr
+ } else if len(wkr.wp.runnerData) == 0 {
+ // Assume crunch-run is already installed
+ } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+ wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+ return false, stderr2
+ } else {
+ stderr = append(stderr, stderr2...)
+ }
return true, stderr
}
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+ hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+ dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "hash": hash,
+ "path": wkr.wp.runnerCmd,
+ })
+
+ stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+ if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
+ logger.Info("runner binary already exists on worker, with correct hash")
+ return
+ }
+
+ // Note touch+chmod come before writing data, to avoid the
+ // possibility of md5 being correct while file mode is
+ // incorrect.
+ cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
+ if wkr.instance.RemoteUser() != "root" {
+ cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+ }
+ logger.WithField("cmd", cmd).Info("installing runner binary on worker")
+ stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+ return
+}
+
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
if wkr.idleBehavior == IdleBehaviorHold {
"IdleDuration": stats.Duration(time.Since(wkr.busy)),
"IdleBehavior": wkr.idleBehavior,
}).Info("shutdown worker")
+ wkr.reportBootOutcome(BootOutcomeAborted)
wkr.shutdown()
return true
}