X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e2d623bd4c686100772924b2b15ab808bbb147d0..48697cdb28fca37f1e420855a1bf1af2446184c7:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 517a5d193e..b2ed6c2bff 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -6,7 +6,9 @@ package worker import ( "bytes" + "encoding/json" "fmt" + "io" "path/filepath" "strings" "sync" @@ -177,6 +179,9 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } go func() { rr.Start() + if wkr.wp.mTimeFromQueueToCrunchRun != nil { + wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds()) + } wkr.mtx.Lock() defer wkr.mtx.Unlock() now := time.Now() @@ -189,7 +194,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } // ProbeAndUpdate conducts appropriate boot/running probes (if any) -// for the worker's curent state. If a previous probe is still +// for the worker's current state. If a previous probe is still // running, it does nothing. // // It should be called in a new goroutine. @@ -310,6 +315,10 @@ func (wkr *worker) probeAndUpdate() { // not yet running when ctrUUIDs was generated. Leave // wkr.running alone and wait for the next probe to // catch up on any changes. + logger.WithFields(logrus.Fields{ + "updated": updated, + "wkr.updated": wkr.updated, + }).Debug("skipping worker state update due to probe/sync race") return } @@ -373,15 +382,28 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { if u := wkr.instance.RemoteUser(); u != "root" { cmd = "sudo " + cmd } - stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) + before := time.Now() + var stdin io.Reader + if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 { + j, _ := json.Marshal(prices) + stdin = bytes.NewReader(j) + } + stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin) if err != nil { wkr.logger.WithFields(logrus.Fields{ "Command": cmd, "stdout": string(stdout), "stderr": string(stderr), }).WithError(err).Warn("probe failed") + wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds()) return } + wkr.logger.WithFields(logrus.Fields{ + "Command": cmd, + "stdout": string(stdout), + "stderr": string(stderr), + }).Debug("probe succeeded") + wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds()) ok = true staleRunLock := false @@ -403,6 +425,12 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { // empty string following final newline } else if s == "broken" { reportsBroken = true + } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) { + // Ignore crunch-run processes that belong to + // a different cluster (e.g., a single host + // running multiple clusters with the loopback + // driver) + continue } else if toks := strings.Split(s, " "); len(toks) == 1 { running = append(running, s) } else if toks[1] == "stale" {