X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d6598fd6339e6219a7103781433356dfde546527..8288bba27e9beff7273aeb65c5200248e52bab02:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 517a5d193e..b01a820cd6 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -177,6 +177,9 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } go func() { rr.Start() + if wkr.wp.mTimeFromQueueToCrunchRun != nil { + wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds()) + } wkr.mtx.Lock() defer wkr.mtx.Unlock() now := time.Now() @@ -189,7 +192,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) { } // ProbeAndUpdate conducts appropriate boot/running probes (if any) -// for the worker's curent state. If a previous probe is still +// for the worker's current state. If a previous probe is still // running, it does nothing. // // It should be called in a new goroutine. @@ -310,6 +313,10 @@ func (wkr *worker) probeAndUpdate() { // not yet running when ctrUUIDs was generated. Leave // wkr.running alone and wait for the next probe to // catch up on any changes. + logger.WithFields(logrus.Fields{ + "updated": updated, + "wkr.updated": wkr.updated, + }).Debug("skipping worker state update due to probe/sync race") return } @@ -373,6 +380,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { if u := wkr.instance.RemoteUser(); u != "root" { cmd = "sudo " + cmd } + before := time.Now() stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) if err != nil { wkr.logger.WithFields(logrus.Fields{ @@ -380,8 +388,15 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { "stdout": string(stdout), "stderr": string(stderr), }).WithError(err).Warn("probe failed") + wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds()) return } + wkr.logger.WithFields(logrus.Fields{ + "Command": cmd, + "stdout": string(stdout), + "stderr": string(stderr), + }).Debug("probe succeeded") + wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds()) ok = true staleRunLock := false @@ -403,6 +418,12 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { // empty string following final newline } else if s == "broken" { reportsBroken = true + } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) { + // Ignore crunch-run processes that belong to + // a different cluster (e.g., a single host + // running multiple clusters with the loopback + // driver) + continue } else if toks := strings.Split(s, " "); len(toks) == 1 { running = append(running, s) } else if toks[1] == "stale" {