X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6b77f5f193b9c7e931d2e0b0712c1b27cd547fc8..c19811454b70623c0f0f92e07ffc6c4b33deb63d:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index b01a820cd6..8b4be1a3c7 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -6,7 +6,9 @@ package worker import ( "bytes" + "encoding/json" "fmt" + "io" "path/filepath" "strings" "sync" @@ -251,19 +253,30 @@ func (wkr *worker) probeAndUpdate() { if !booted { booted, stderr = wkr.probeBooted() + shouldCopy := booted || initialState == StateUnknown if !booted { // Pretend this probe succeeded if another // concurrent attempt succeeded. wkr.mtx.Lock() - booted = wkr.state == StateRunning || wkr.state == StateIdle + if wkr.state == StateRunning || wkr.state == StateIdle { + booted = true + shouldCopy = false + } wkr.mtx.Unlock() } + if shouldCopy { + _, stderrCopy, err := wkr.copyRunnerData() + if err != nil { + booted = false + wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary") + } + } if booted { logger.Info("instance booted; will try probeRunning") } } reportedBroken := false - if booted || wkr.state == StateUnknown { + if booted || initialState == StateUnknown { ctrUUIDs, reportedBroken, ok = wkr.probeRunning() } wkr.mtx.Lock() @@ -381,7 +394,12 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { cmd = "sudo " + cmd } before := time.Now() - stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) + var stdin io.Reader + if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 { + j, _ := json.Marshal(prices) + stdin = bytes.NewReader(j) + } + stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin) if err != nil { wkr.logger.WithFields(logrus.Fields{ "Command": cmd, @@ -460,21 +478,18 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { return false, stderr } logger.Info("boot probe succeeded") + return true, stderr +} + +func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { if err = wkr.wp.loadRunnerData(); err != nil { wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary") - return false, stderr + return } else if len(wkr.wp.runnerData) == 0 { // Assume crunch-run is already installed - } else if _, stderr2, err := wkr.copyRunnerData(); err != nil { - wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary") - return false, stderr2 - } else { - stderr = append(stderr, stderr2...) + return } - return true, stderr -} -func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) { hash := fmt.Sprintf("%x", wkr.wp.runnerMD5) dstdir, _ := filepath.Split(wkr.wp.runnerCmd) logger := wkr.logger.WithFields(logrus.Fields{