import (
"bytes"
+ "encoding/json"
"fmt"
+ "io"
"path/filepath"
"strings"
"sync"
// not yet running when ctrUUIDs was generated. Leave
// wkr.running alone and wait for the next probe to
// catch up on any changes.
+ logger.WithFields(logrus.Fields{
+ "updated": updated,
+ "wkr.updated": wkr.updated,
+ }).Debug("skipping worker state update due to probe/sync race")
return
}
cmd = "sudo " + cmd
}
before := time.Now()
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ var stdin io.Reader
+ if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
+ j, _ := json.Marshal(prices)
+ stdin = bytes.NewReader(j)
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
return
}
+ wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ }).Debug("probe succeeded")
wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
ok = true
// empty string following final newline
} else if s == "broken" {
reportsBroken = true
+ } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+ // Ignore crunch-run processes that belong to
+ // a different cluster (e.g., a single host
+ // running multiple clusters with the loopback
+ // driver)
+ continue
} else if toks := strings.Split(s, " "); len(toks) == 1 {
running = append(running, s)
} else if toks[1] == "stale" {