14931: Add configurable prefix for built-in tags.
[arvados.git] / lib / dispatchcloud / worker / worker.go
index 0872e594f9a8175973cc11032ca7942a8aa455d5..03ab15176f5297b85182d3689b71f5a3f0195004 100644 (file)
@@ -5,13 +5,14 @@
 package worker
 
 import (
-       "bytes"
+       "fmt"
        "strings"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
 )
 
@@ -56,8 +57,8 @@ type IdleBehavior string
 
 const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
-       IdleBehaviorHold               = "hold"  // don't shutdown or run more containers
-       IdleBehaviorDrain              = "drain" // shutdown immediately when idle
+       IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
+       IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
 )
 
 var validIdleBehavior = map[IdleBehavior]bool{
@@ -84,47 +85,60 @@ type worker struct {
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
-       running      map[string]struct{} // remember to update state idle<->running when this changes
-       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       running      map[string]*remoteRunner // remember to update state idle<->running when this changes
+       starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing      chan struct{}
 }
 
+func (wkr *worker) onUnkillable(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       logger := wkr.logger.WithField("ContainerUUID", uuid)
+       if wkr.idleBehavior == IdleBehaviorHold {
+               logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+               return
+       }
+       logger.Warn("unkillable container, draining worker")
+       wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       wkr.closeRunner(uuid)
+       go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+       wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
+}
+
 // caller must have lock.
 func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance)
        logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
-       wkr.state = StateRunning
+       rr := newRemoteRunner(ctr.UUID, wkr)
+       wkr.starting[ctr.UUID] = rr
+       if wkr.state != StateRunning {
+               wkr.state = StateRunning
+               go wkr.wp.notify()
+       }
        go func() {
-               env := map[string]string{
-                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-               }
-               stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+               rr.Start()
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               wkr.lastUUID = ctr.UUID
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
+               wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
 }
@@ -144,57 +158,99 @@ func (wkr *worker) ProbeAndUpdate() {
        }
 }
 
-// should be called in a new goroutine
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
 func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
-       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
-       needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+       initialState := wkr.state
        wkr.mtx.Unlock()
-       if !needProbeBooted && !needProbeRunning {
-               return
-       }
 
        var (
+               booted   bool
                ctrUUIDs []string
                ok       bool
-               stderr   []byte
+               stderr   []byte // from probeBooted
        )
-       if needProbeBooted {
-               ok, stderr = wkr.probeBooted()
-               wkr.mtx.Lock()
-               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
-                       wkr.logger.Info("instance booted; will try probeRunning")
-                       needProbeRunning = true
+
+       switch initialState {
+       case StateShutdown:
+               return
+       case StateIdle, StateRunning:
+               booted = true
+       case StateUnknown, StateBooting:
+       default:
+               panic(fmt.Sprintf("unknown state %s", initialState))
+       }
+
+       probeStart := time.Now()
+       logger := wkr.logger.WithField("ProbeStart", probeStart)
+
+       if !booted {
+               booted, stderr = wkr.probeBooted()
+               if !booted {
+                       // Pretend this probe succeeded if another
+                       // concurrent attempt succeeded.
+                       wkr.mtx.Lock()
+                       booted = wkr.state == StateRunning || wkr.state == StateIdle
+                       wkr.mtx.Unlock()
+               }
+               if booted {
+                       logger.Info("instance booted; will try probeRunning")
                }
-               wkr.mtx.Unlock()
        }
-       if needProbeRunning {
-               ctrUUIDs, ok, stderr = wkr.probeRunning()
+       reportedBroken := false // set when the run-probe output contains a "broken" line
+       if booted || initialState == StateUnknown { // use the locked snapshot: wkr.state must not be read without wkr.mtx
+               ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
+       }
-       logger := wkr.logger.WithField("stderr", string(stderr))
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
-       if !ok {
+       if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+               logger.Info("probe reported broken instance")
+               wkr.setIdleBehavior(IdleBehaviorDrain)
+       }
+       if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
-               dur := time.Since(wkr.probed)
-               logger := logger.WithFields(logrus.Fields{
-                       "Duration": dur,
-                       "State":    wkr.state,
-               })
-               if wkr.state == StateBooting && !needProbeRunning {
-                       // If we know the instance has never passed a
-                       // boot probe, it's not noteworthy that it
-                       // hasn't passed this probe.
-                       logger.Debug("new instance not responding")
-               } else {
-                       logger.Info("instance not responding")
+               // Using the start time of the probe as the timeout
+               // threshold ensures we always initiate at least one
+               // probe attempt after the boot/probe timeout expires
+               // (otherwise, a slow probe failure could cause us to
+               // shut down an instance even though it did in fact
+               // boot/recover before the timeout expired).
+               dur := probeStart.Sub(wkr.probed)
+               if wkr.shutdownIfBroken(dur) {
+                       // stderr from failed run-probes will have
+                       // been logged already, but boot-probe
+                       // failures are normal so they are logged only
+                       // at Debug level. This is our chance to log
+                       // some evidence about why the node never
+                       // booted, even in non-debug mode.
+                       if !booted {
+                               logger.WithFields(logrus.Fields{
+                                       "Duration": dur,
+                                       "stderr":   string(stderr),
+                               }).Info("boot failed")
+                       }
                }
-               wkr.shutdownIfBroken(dur)
                return
        }
 
@@ -219,49 +275,53 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       running := map[string]struct{}{}
-       changed := false
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wkr.wp.notifyExited(uuid, updateTime)
-                       changed = true
-               }
-       }
-       if wkr.state == StateUnknown || wkr.state == StateBooting {
+
+       changed := wkr.updateRunning(ctrUUIDs)
+
+       // Update state if this was the first successful boot-probe.
+       if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }
+
+       // If wkr.state and wkr.running aren't changing then there's
+       // no need to log anything, notify the scheduler, move state
+       // back and forth between idle/running, etc.
        if !changed {
                return
        }
 
-       wkr.running = running
+       // Log whenever a run-probe reveals crunch-run processes
+       // appearing/disappearing before boot-probe succeeds.
+       if wkr.state == StateUnknown && changed {
+               logger.WithFields(logrus.Fields{
+                       "RunningContainers": len(wkr.running),
+                       "State":             wkr.state,
+               }).Info("crunch-run probe succeeded, but boot probe is still failing")
+       }
+
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
-       if needProbeBooted {
+       if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := "crunch-run --list"
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -269,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false, stderr
+               return
        }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true, stderr
+       ok = true
+       for _, s := range strings.Split(string(stdout), "\n") {
+               if s == "broken" {
+                       reportsBroken = true
+               } else if s != "" {
+                       running = append(running, s)
+               }
        }
-       return strings.Split(string(stdout), "\n"), true, stderr
+       return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@ -298,16 +362,17 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 }
 
 // caller must have lock.
-func (wkr *worker) shutdownIfBroken(dur time.Duration) {
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               return
+               // Never shut down.
+               return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
-               return
+               return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
@@ -315,24 +380,55 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) {
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
+       return true
 }
 
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
-       if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
+       draining := wkr.idleBehavior == IdleBehaviorDrain
+       switch wkr.state {
+       case StateBooting:
+               return draining
+       case StateIdle:
+               return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+       case StateRunning:
+               if !draining {
+                       return false
+               }
+               for _, rr := range wkr.running {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               for _, rr := range wkr.starting {
+                       if !rr.givenup {
+                               return false
+                       }
+               }
+               // draining, and all remaining runners are just trying
+               // to force-kill their crunch-run procs
+               return true
+       default:
                return false
        }
-       age := time.Since(wkr.busy)
-       if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if !wkr.eligibleForShutdown() {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
-               "Age":          age,
+               "State":        wkr.state,
+               "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
-       }).Info("shutdown idle worker")
+       }).Info("shutdown worker")
        wkr.shutdown()
        return true
 }
@@ -357,22 +453,89 @@ func (wkr *worker) shutdown() {
 // match. Caller must have lock.
 func (wkr *worker) saveTags() {
        instance := wkr.instance
-       have := instance.Tags()
-       want := cloud.InstanceTags{
-               tagKeyInstanceType: wkr.instType.Name,
-               tagKeyIdleBehavior: string(wkr.idleBehavior),
+       tags := instance.Tags()
+       update := cloud.InstanceTags{
+               wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
+               wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
-       go func() {
-               for k, v := range want {
-                       if v == have[k] {
-                               continue
-                       }
-                       err := instance.SetTags(want)
+       save := false
+       for k, v := range update {
+               if tags[k] != v {
+                       tags[k] = v
+                       save = true
+               }
+       }
+       if save {
+               go func() {
+                       err := instance.SetTags(tags)
                        if err != nil {
-                               wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+                               wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
-                       break
+               }()
+       }
+}
+
+func (wkr *worker) Close() {
+       // This might take time, so do it after unlocking mtx.
+       defer wkr.executor.Close()
 
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       for uuid, rr := range wkr.running {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+       for uuid, rr := range wkr.starting {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+       alive := map[string]bool{}
+       for _, uuid := range ctrUUIDs {
+               alive[uuid] = true
+               if _, ok := wkr.running[uuid]; ok {
+                       // unchanged
+               } else if rr, ok := wkr.starting[uuid]; ok {
+                       wkr.running[uuid] = rr
+                       delete(wkr.starting, uuid)
+                       changed = true
+               } else {
+                       // We didn't start it -- it must have been
+                       // started by a previous dispatcher process.
+                       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+                       changed = true
                }
-       }()
+       }
+       for uuid := range wkr.running {
+               if !alive[uuid] {
+                       wkr.closeRunner(uuid)
+                       changed = true
+               }
+       }
+       return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+       rr := wkr.running[uuid]
+       if rr == nil {
+               return
+       }
+       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+       delete(wkr.running, uuid)
+       rr.Close()
+
+       now := time.Now()
+       wkr.updated = now
+       wkr.wp.exited[uuid] = now
+       if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+               wkr.state = StateIdle
+       }
 }