14931: Add configurable prefix for built-in tags.
[arvados.git] / lib / dispatchcloud / worker / worker.go
index 41117c1d4eafb5aa2a92c163d3f79d72ace443d3..03ab15176f5297b85182d3689b71f5a3f0195004 100644 (file)
@@ -5,7 +5,6 @@
 package worker
 
 import (
-       "bytes"
        "fmt"
        "strings"
        "sync"
@@ -215,11 +214,16 @@ func (wkr *worker) probeAndUpdate() {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
+       reportedBroken := false
        if booted || wkr.state == StateUnknown {
-               ctrUUIDs, ok = wkr.probeRunning()
+               ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
+       if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+               logger.Info("probe reported broken instance")
+               wkr.setIdleBehavior(IdleBehaviorDrain)
+       }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
@@ -313,7 +317,7 @@ func (wkr *worker) probeAndUpdate() {
        go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := "crunch-run --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
@@ -325,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false
+               return
        }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true
+       ok = true
+       for _, s := range strings.Split(string(stdout), "\n") {
+               if s == "broken" {
+                       reportsBroken = true
+               } else if s != "" {
+                       running = append(running, s)
+               }
        }
-       return strings.Split(string(stdout), "\n"), true
+       return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@ -447,8 +455,8 @@ func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
-               tagKeyInstanceType: wkr.instType.Name,
-               tagKeyIdleBehavior: string(wkr.idleBehavior),
+               wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
+               wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {