import (
"bytes"
+ "encoding/json"
"fmt"
+ "io"
"path/filepath"
"strings"
"sync"
if !booted {
booted, stderr = wkr.probeBooted()
+ shouldCopy := booted || initialState == StateUnknown
if !booted {
// Pretend this probe succeeded if another
// concurrent attempt succeeded.
wkr.mtx.Lock()
- booted = wkr.state == StateRunning || wkr.state == StateIdle
+ if wkr.state == StateRunning || wkr.state == StateIdle {
+ booted = true
+ shouldCopy = false
+ }
wkr.mtx.Unlock()
}
+ if shouldCopy {
+ _, stderrCopy, err := wkr.copyRunnerData()
+ if err != nil {
+ booted = false
+ wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
+ }
+ }
if booted {
logger.Info("instance booted; will try probeRunning")
}
}
reportedBroken := false
- if booted || wkr.state == StateUnknown {
+ if booted || initialState == StateUnknown {
ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
}
wkr.mtx.Lock()
cmd = "sudo " + cmd
}
before := time.Now()
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ var stdin io.Reader
+ if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
+ j, _ := json.Marshal(prices)
+ stdin = bytes.NewReader(j)
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
// empty string following final newline
} else if s == "broken" {
reportsBroken = true
+ } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+ // Ignore crunch-run processes that belong to
+ // a different cluster (e.g., a single host
+ // running multiple clusters with the loopback
+ // driver)
+ continue
} else if toks := strings.Split(s, " "); len(toks) == 1 {
running = append(running, s)
} else if toks[1] == "stale" {
return false, stderr
}
logger.Info("boot probe succeeded")
+ return true, stderr
+}
+
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
if err = wkr.wp.loadRunnerData(); err != nil {
wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
- return false, stderr
+ return
} else if len(wkr.wp.runnerData) == 0 {
// Assume crunch-run is already installed
- } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
- wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
- return false, stderr2
- } else {
- stderr = append(stderr, stderr2...)
+ return
}
- return true, stderr
-}
-func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
logger := wkr.logger.WithFields(logrus.Fields{