14807: Split instance count/size/cost metrics by idle/hold status.
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index c24efeb3c577eda3498b89cb2d5d730e45e54b15..f70e2099843fdb0fdb2f511f49b8ea58fb05c6e8 100644
@@ -5,7 +5,21 @@
 package worker
 
 import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "strings"
+       "sync"
        "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+const (
+       // TODO: configurable
+       maxPingFailTime = 10 * time.Minute
 )
 
 // State indicates whether a worker is available to do work, and (if
@@ -18,12 +32,6 @@ const (
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
-       StateHold                  // running, but not available to run new containers
-)
-
-const (
-       // TODO: configurable
-       maxPingFailTime = 10 * time.Minute
 )
 
 var stateString = map[State]string{
@@ -32,7 +40,6 @@ var stateString = map[State]string{
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
-       StateHold:     "hold",
 }
 
 // String implements fmt.Stringer.
@@ -45,3 +52,415 @@ func (s State) String() string {
 func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
 }
+
+// IdleBehavior indicates the behavior desired when a node becomes idle.
+type IdleBehavior string
+
+const (
+       IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
+       IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
+       IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
+)
+
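+// validIdleBehavior is the set of acceptable IdleBehavior values.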
+var validIdleBehavior = map[IdleBehavior]bool{
+       IdleBehaviorRun:   true,
+       IdleBehaviorHold:  true,
+       IdleBehaviorDrain: true,
+}
+
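+// worker tracks the state of a single cloud instance and the
+// crunch-run processes running on it.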
+type worker struct {
+       logger   logrus.FieldLogger
+       executor Executor
+       wp       *Pool
+
+       mtx          sync.Locker // must be wp's Locker.
+       state        State
+       idleBehavior IdleBehavior
+       instance     cloud.Instance
+       instType     arvados.InstanceType
+       vcpus        int64
+       memory       int64
+       appeared     time.Time
+       probed       time.Time
+       updated      time.Time
+       busy         time.Time
+       destroyed    time.Time
+       lastUUID     string
+       running      map[string]struct{} // remember to update state idle<->running when this changes
+       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       probing      chan struct{} // acts as a try-lock: at most one probe runs at a time (see ProbeAndUpdate)
+}
+
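+// startContainer starts a crunch-run process for the given container
+// on this worker (passing Arvados API credentials via stdin) and
+// moves the worker to StateRunning.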
+// caller must have lock.
+func (wkr *worker) startContainer(ctr arvados.Container) {
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "ContainerUUID": ctr.UUID,
+               "Priority":      ctr.Priority,
+       })
+       logger = logger.WithField("Instance", wkr.instance.ID())
+       logger.Debug("starting container")
+       wkr.starting[ctr.UUID] = struct{}{}
+       wkr.state = StateRunning
+       go func() {
+               env := map[string]string{
+                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+               }
+               if wkr.wp.arvClient.Insecure {
+                       env["ARVADOS_API_HOST_INSECURE"] = "1"
+               }
+               envJSON, err := json.Marshal(env)
+               if err != nil {
+                       panic(err)
+               }
+               stdin := bytes.NewBuffer(envJSON)
+               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+               if u := wkr.instance.RemoteUser(); u != "root" {
+                       cmd = "sudo " + cmd
+               }
+               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+               wkr.mtx.Lock()
+               defer wkr.mtx.Unlock()
+               now := time.Now()
+               wkr.updated = now
+               wkr.busy = now
+               delete(wkr.starting, ctr.UUID)
+               wkr.running[ctr.UUID] = struct{}{}
+               wkr.lastUUID = ctr.UUID
+               if err != nil {
+                       logger.WithField("stdout", string(stdout)).
+                               WithField("stderr", string(stderr)).
+                               WithError(err).
+                               Error("error starting crunch-run process")
+                       // Leave uuid in wkr.running, though: it's
+                       // possible the error was just a communication
+                       // failure and the process was in fact
+                       // started.  Wait for next probe to find out.
+                       return
+               }
+               logger.Info("crunch-run process started")
+               wkr.lastUUID = ctr.UUID
+       }()
+}
+
+// ProbeAndUpdate conducts appropriate boot/running probes (if any)
+// for the worker's current state. If a previous probe is still
+// running, it does nothing.
+//
+// It should be called in a new goroutine.
+func (wkr *worker) ProbeAndUpdate() {
+       select {
+       case wkr.probing <- struct{}{}:
+               wkr.probeAndUpdate()
+               <-wkr.probing
+       default:
+               wkr.logger.Debug("still waiting for last probe to finish")
+       }
+}
+
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
+func (wkr *worker) probeAndUpdate() {
+       wkr.mtx.Lock()
+       updated := wkr.updated
+       initialState := wkr.state
+       wkr.mtx.Unlock()
+
+       var (
+               booted   bool
+               ctrUUIDs []string
+               ok       bool
+               stderr   []byte // from probeBooted
+       )
+
+       switch initialState {
+       case StateShutdown:
+               return
+       case StateIdle, StateRunning:
+               booted = true
+       case StateUnknown, StateBooting:
+       default:
+               panic(fmt.Sprintf("unknown state %s", initialState))
+       }
+
+       probeStart := time.Now()
+       logger := wkr.logger.WithField("ProbeStart", probeStart)
+
+       if !booted {
+               booted, stderr = wkr.probeBooted()
+               if !booted {
+                       // Pretend this probe succeeded if another
+                       // concurrent attempt succeeded.
+                       wkr.mtx.Lock()
+                       booted = wkr.state == StateRunning || wkr.state == StateIdle
+                       wkr.mtx.Unlock()
+               }
+               if booted {
+                       logger.Info("instance booted; will try probeRunning")
+               }
+       }
+       if booted || wkr.state == StateUnknown {
+               ctrUUIDs, ok = wkr.probeRunning()
+       }
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
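+       // Treat the probe as a failure if the run-probe failed, or if
+       // the instance hasn't booted and there is no evidence of any
+       // running containers.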
+       if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
+               if wkr.state == StateShutdown && wkr.updated.After(updated) {
+                       // Skip the logging noise if shutdown was
+                       // initiated during probe.
+                       return
+               }
+               // Using the start time of the probe as the timeout
+               // threshold ensures we always initiate at least one
+               // probe attempt after the boot/probe timeout expires
+               // (otherwise, a slow probe failure could cause us to
+               // shutdown an instance even though it did in fact
+               // boot/recover before the timeout expired).
+               dur := probeStart.Sub(wkr.probed)
+               if wkr.shutdownIfBroken(dur) {
+                       // stderr from failed run-probes will have
+                       // been logged already, but boot-probe
+                       // failures are normal so they are logged only
+                       // at Debug level. This is our chance to log
+                       // some evidence about why the node never
+                       // booted, even in non-debug mode.
+                       if !booted {
+                               logger.WithFields(logrus.Fields{
+                                       "Duration": dur,
+                                       "stderr":   string(stderr),
+                               }).Info("boot failed")
+                       }
+               }
+               return
+       }
+
+       updateTime := time.Now()
+       wkr.probed = updateTime
+
+       if updated != wkr.updated {
+               // Worker was updated after the probe began, so
+               // wkr.running might have a container UUID that was
+               // not yet running when ctrUUIDs was generated. Leave
+               // wkr.running alone and wait for the next probe to
+               // catch up on any changes.
+               return
+       }
+
+       if len(ctrUUIDs) > 0 {
+               wkr.busy = updateTime
+               wkr.lastUUID = ctrUUIDs[0]
+       } else if len(wkr.running) > 0 {
+               // Actual last-busy time was sometime between wkr.busy
+               // and now. Now is the earliest opportunity to take
+               // advantage of the non-busy state, though.
+               wkr.busy = updateTime
+       }
+       changed := false
+
+       // Build a new "running" map. Set changed=true if it differs
+       // from the existing map (wkr.running) to ensure the scheduler
+       // gets notified below.
+       running := map[string]struct{}{}
+       for _, uuid := range ctrUUIDs {
+               running[uuid] = struct{}{}
+               if _, ok := wkr.running[uuid]; !ok {
+                       if _, ok := wkr.starting[uuid]; !ok {
+                               // We didn't start it -- it must have
+                               // been started by a previous
+                               // dispatcher process.
+                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       }
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if _, ok := running[uuid]; !ok {
+                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+                       wkr.wp.notifyExited(uuid, updateTime)
+                       changed = true
+               }
+       }
+
+       // Update state if this was the first successful boot-probe.
+       if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+               // Note: this will change again below if
+               // len(wkr.starting)+len(wkr.running) > 0.
+               wkr.state = StateIdle
+               changed = true
+       }
+
+       // If wkr.state and wkr.running aren't changing then there's
+       // no need to log anything, notify the scheduler, move state
+       // back and forth between idle/running, etc.
+       if !changed {
+               return
+       }
+
+       // Log whenever a run-probe reveals crunch-run processes
+       // appearing/disappearing before boot-probe succeeds.
+       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+               logger.WithFields(logrus.Fields{
+                       "RunningContainers": len(running),
+                       "State":             wkr.state,
+               }).Info("crunch-run probe succeeded, but boot probe is still failing")
+       }
+
+       wkr.running = running
+       if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+               wkr.state = StateRunning
+       } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+               wkr.state = StateIdle
+       }
+       wkr.updated = updateTime
+       if booted && (initialState == StateUnknown || initialState == StateBooting) {
+               logger.WithFields(logrus.Fields{
+                       "RunningContainers": len(running),
+                       "State":             wkr.state,
+               }).Info("probes succeeded, instance is in service")
+       }
+       go wkr.wp.notify()
+}
+
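+// probeRunning invokes "crunch-run --list" on the worker and returns
+// the UUIDs of containers with running crunch-run processes, along
+// with whether the probe itself succeeded.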
+func (wkr *worker) probeRunning() (running []string, ok bool) {
+       cmd := "crunch-run --list"
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+       if err != nil {
+               wkr.logger.WithFields(logrus.Fields{
+                       "Command": cmd,
+                       "stdout":  string(stdout),
+                       "stderr":  string(stderr),
+               }).WithError(err).Warn("probe failed")
+               return nil, false
+       }
+       stdout = bytes.TrimRight(stdout, "\n")
+       if len(stdout) == 0 {
+               return nil, true
+       }
+       return strings.Split(string(stdout), "\n"), true
+}
+
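+// probeBooted runs the pool's boot probe command on the worker and
+// reports whether it succeeded, along with the command's stderr.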
+func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+       cmd := wkr.wp.bootProbeCommand
+       if cmd == "" {
+               cmd = "true"
+       }
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       })
+       if err != nil {
+               logger.WithError(err).Debug("boot probe failed")
+               return false, stderr
+       }
+       logger.Info("boot probe succeeded")
+       return true, stderr
+}
+
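+// shutdownIfBroken shuts down the worker if probes have been failing
+// for longer than the applicable boot/probe timeout, and reports
+// whether it initiated a shutdown. Workers with IdleBehaviorHold are
+// never shut down.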
+// caller must have lock.
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
+       if wkr.idleBehavior == IdleBehaviorHold {
+               // Never shut down.
+               return false
+       }
+       label, threshold := "", wkr.wp.timeoutProbe
+       if wkr.state == StateUnknown || wkr.state == StateBooting {
+               label, threshold = "new ", wkr.wp.timeoutBooting
+       }
+       if dur < threshold {
+               return false
+       }
+       wkr.logger.WithFields(logrus.Fields{
+               "Duration": dur,
+               "Since":    wkr.probed,
+               "State":    wkr.state,
+       }).Warnf("%sinstance unresponsive, shutting down", label)
+       wkr.shutdown()
+       return true
+}
+
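+// shutdownIfIdle shuts down the worker if it has been idle longer
+// than the configured idle timeout, or immediately if its idle
+// behavior is "drain", and reports whether it initiated a shutdown.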
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if wkr.idleBehavior == IdleBehaviorHold {
+               // Never shut down.
+               return false
+       }
+       age := time.Since(wkr.busy)
+
+       old := age >= wkr.wp.timeoutIdle
+       draining := wkr.idleBehavior == IdleBehaviorDrain
+       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
+               (draining && wkr.state == StateBooting)
+       if !shouldShutdown {
+               return false
+       }
+
+       wkr.logger.WithFields(logrus.Fields{
+               "State":        wkr.state,
+               "Age":          age,
+               "IdleBehavior": wkr.idleBehavior,
+       }).Info("shutdown idle worker")
+       wkr.shutdown()
+       return true
+}
+
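+// shutdown marks the worker as shut down and destroys the cloud
+// instance asynchronously.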
+// caller must have lock.
+func (wkr *worker) shutdown() {
+       now := time.Now()
+       wkr.updated = now
+       wkr.destroyed = now
+       wkr.state = StateShutdown
+       go wkr.wp.notify()
+       go func() {
+               err := wkr.instance.Destroy()
+               if err != nil {
+                       wkr.logger.WithError(err).Warn("shutdown failed")
+                       return
+               }
+       }()
+}
+
+// Save worker tags to cloud provider metadata, if they don't already
+// match. Caller must have lock.
+func (wkr *worker) saveTags() {
+       instance := wkr.instance
+       tags := instance.Tags()
+       update := cloud.InstanceTags{
+               tagKeyInstanceType: wkr.instType.Name,
+               tagKeyIdleBehavior: string(wkr.idleBehavior),
+       }
+       save := false
+       for k, v := range update {
+               if tags[k] != v {
+                       tags[k] = v
+                       save = true
+               }
+       }
+       if save {
+               go func() {
+                       err := instance.SetTags(tags)
+                       if err != nil {
+                               wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warn("error updating tags")
+                       }
+               }()
+       }
+}