projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Allow multiple clusters to use loopback driver on same host.
[arvados.git]
/
lib
/
dispatchcloud
/
worker
/
worker.go
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 5b145d7c6599b75bb6e8b30f6c65e65d82186d84..b01a820cd619b172538b725d689d0323897611d5 100644
(file)
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -192,7 +192,7 @@
func (wkr *worker) startContainer(ctr arvados.Container) {
}
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
}
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
-// for the worker's curent state. If a previous probe is still
+// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
// running, it does nothing.
//
// It should be called in a new goroutine.
@@ -313,6 +313,10 @@
func (wkr *worker) probeAndUpdate() {
// not yet running when ctrUUIDs was generated. Leave
// wkr.running alone and wait for the next probe to
// catch up on any changes.
// not yet running when ctrUUIDs was generated. Leave
// wkr.running alone and wait for the next probe to
// catch up on any changes.
+ logger.WithFields(logrus.Fields{
+ "updated": updated,
+ "wkr.updated": wkr.updated,
+ }).Debug("skipping worker state update due to probe/sync race")
return
}
return
}
@@ -376,6 +380,7 @@
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
}
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
}
+ before := time.Now()
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
@@ -383,8 +388,15 @@
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
+ wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
return
}
return
}
+ wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ }).Debug("probe succeeded")
+ wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
ok = true
staleRunLock := false
ok = true
staleRunLock := false
@@ -406,6 +418,12 @@
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
// empty string following final newline
} else if s == "broken" {
reportsBroken = true
// empty string following final newline
} else if s == "broken" {
reportsBroken = true
+ } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+ // Ignore crunch-run processes that belong to
+ // a different cluster (e.g., a single host
+ // running multiple clusters with the loopback
+ // driver)
+ continue
} else if toks := strings.Split(s, " "); len(toks) == 1 {
running = append(running, s)
} else if toks[1] == "stale" {
} else if toks := strings.Split(s, " "); len(toks) == 1 {
running = append(running, s)
} else if toks[1] == "stale" {