X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a07891c7ac7e90ebdf35ae1812ec03c818fe2a67..233a2b6bd23a3e2054cfc0690f2bc06c0f9f7323:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 41117c1d4e..03ab15176f 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -5,7 +5,6 @@ package worker import ( - "bytes" "fmt" "strings" "sync" @@ -215,11 +214,16 @@ func (wkr *worker) probeAndUpdate() { logger.Info("instance booted; will try probeRunning") } } + reportedBroken := false if booted || wkr.state == StateUnknown { - ctrUUIDs, ok = wkr.probeRunning() + ctrUUIDs, reportedBroken, ok = wkr.probeRunning() } wkr.mtx.Lock() defer wkr.mtx.Unlock() + if reportedBroken && wkr.idleBehavior == IdleBehaviorRun { + logger.Info("probe reported broken instance") + wkr.setIdleBehavior(IdleBehaviorDrain) + } if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) { if wkr.state == StateShutdown && wkr.updated.After(updated) { // Skip the logging noise if shutdown was @@ -313,7 +317,7 @@ func (wkr *worker) probeAndUpdate() { go wkr.wp.notify() } -func (wkr *worker) probeRunning() (running []string, ok bool) { +func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { cmd := "crunch-run --list" if u := wkr.instance.RemoteUser(); u != "root" { cmd = "sudo " + cmd @@ -325,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) { "stdout": string(stdout), "stderr": string(stderr), }).WithError(err).Warn("probe failed") - return nil, false + return } - stdout = bytes.TrimRight(stdout, "\n") - if len(stdout) == 0 { - return nil, true + ok = true + for _, s := range strings.Split(string(stdout), "\n") { + if s == "broken" { + reportsBroken = true + } else if s != "" { + running = append(running, s) + } } - return strings.Split(string(stdout), "\n"), true + return } func (wkr *worker) probeBooted() (ok bool, stderr []byte) { @@ -447,8 +455,8 @@ func (wkr *worker) saveTags() { instance := wkr.instance tags := instance.Tags() update := cloud.InstanceTags{ - tagKeyInstanceType: wkr.instType.Name, - tagKeyIdleBehavior: string(wkr.idleBehavior), + wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name, + wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior), } save := false for k, v := range update {