14325: Clean up unsafe concurrency in tests.
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 85104a13aa9aa745fbdfe7a6a0e32b0eb29219b8..a75d2bbb88611225c709a98f151dded697b5974c 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,13 +6,19 @@ package worker
 
 import (
        "bytes"
+       "fmt"
        "strings"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
+       "github.com/sirupsen/logrus"
+)
+
+const (
+       // TODO: configurable
+       maxPingFailTime = 10 * time.Minute
 )
 
 // State indicates whether a worker is available to do work, and (if
@@ -25,12 +31,6 @@ const (
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
-       StateHold                  // running, but not available to run new containers
-)
-
-const (
-       // TODO: configurable
-       maxPingFailTime = 10 * time.Minute
 )
 
 var stateString = map[State]string{
@@ -39,7 +39,6 @@ var stateString = map[State]string{
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
-       StateHold:     "hold",
 }
 
 // String implements fmt.Stringer.
@@ -53,25 +52,42 @@ func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
 }
 
+// IdleBehavior indicates the behavior desired when a node becomes idle.
+type IdleBehavior string
+
+const (
+       IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
+       IdleBehaviorHold               = "hold"  // don't shutdown or run more containers
+       IdleBehaviorDrain              = "drain" // shutdown immediately when idle
+)
+
+var validIdleBehavior = map[IdleBehavior]bool{
+       IdleBehaviorRun:   true,
+       IdleBehaviorHold:  true,
+       IdleBehaviorDrain: true,
+}
+
 type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool
 
-       mtx       sync.Locker // must be wp's Locker.
-       state     State
-       instance  cloud.Instance
-       instType  arvados.InstanceType
-       vcpus     int64
-       memory    int64
-       probed    time.Time
-       updated   time.Time
-       busy      time.Time
-       destroyed time.Time
-       lastUUID  string
-       running   map[string]struct{} // remember to update state idle<->running when this changes
-       starting  map[string]struct{} // remember to update state idle<->running when this changes
-       probing   chan struct{}
+       mtx          sync.Locker // must be wp's Locker.
+       state        State
+       idleBehavior IdleBehavior
+       instance     cloud.Instance
+       instType     arvados.InstanceType
+       vcpus        int64
+       memory       int64
+       appeared     time.Time
+       probed       time.Time
+       updated      time.Time
+       busy         time.Time
+       destroyed    time.Time
+       lastUUID     string
+       running      map[string]struct{} // remember to update state idle<->running when this changes
+       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       probing      chan struct{}
 }
 
 // caller must have lock.
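The validIdleBehavior map introduced in the hunk above is the package's whitelist for idle-behavior values arriving from outside (for example, an operator asking to hold or drain a node). A minimal sketch, assuming it lives in package worker alongside the code above, of how a caller might validate such a value; parseIdleBehavior is a hypothetical helper, not part of this change:

    // parseIdleBehavior is illustrative only: accept a string from an
    // outside caller and reject anything not listed in validIdleBehavior.
    func parseIdleBehavior(s string) (IdleBehavior, error) {
            ib := IdleBehavior(s)
            if !validIdleBehavior[ib] {
                    return "", fmt.Errorf("invalid idle behavior %q", s)
            }
            return ib, nil
    }
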
@@ -85,7 +101,11 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
        go func() {
-               stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+               env := map[string]string{
+                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+               }
+               stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
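The Execute call above, like the probe calls later in this diff, now takes an environment map as a new first argument. Inferred from these call sites only, not copied from the package's actual definition, the Executor contract presumably looks roughly like this (the io.Reader type for stdin is an assumption):

    // Sketch of the Executor interface implied by the new call sites;
    // the real definition lives elsewhere in this package.
    type Executor interface {
            // Execute runs cmd on the worker's instance with the given
            // environment variables and optional stdin, returning the
            // command's stdout and stderr.
            Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
    }
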
@@ -125,38 +145,65 @@ func (wkr *worker) ProbeAndUpdate() {
        }
 }
 
-// should be called in a new goroutine
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
 func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
-       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
-       needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+       initialState := wkr.state
        wkr.mtx.Unlock()
-       if !needProbeBooted && !needProbeRunning {
-               return
-       }
 
        var (
+               booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte
        )
-       if needProbeBooted {
-               ok, stderr = wkr.probeBooted()
-               wkr.mtx.Lock()
-               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+
+       switch initialState {
+       case StateShutdown:
+               return
+       case StateIdle, StateRunning:
+               booted = true
+       case StateUnknown, StateBooting:
+       default:
+               panic(fmt.Sprintf("unknown state %s", initialState))
+       }
+
+       if !booted {
+               booted, stderr = wkr.probeBooted()
+               if !booted {
+                       // Pretend this probe succeeded if another
+                       // concurrent attempt succeeded.
+                       wkr.mtx.Lock()
+                       booted = wkr.state == StateRunning || wkr.state == StateIdle
+                       wkr.mtx.Unlock()
+               } else {
                        wkr.logger.Info("instance booted; will try probeRunning")
-                       needProbeRunning = true
                }
-               wkr.mtx.Unlock()
        }
-       if needProbeRunning {
+       if booted || initialState == StateUnknown {
                ctrUUIDs, ok, stderr = wkr.probeRunning()
        }
        logger := wkr.logger.WithField("stderr", string(stderr))
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
-       if !ok {
+       if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
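The probe selection documented in the comment on probeAndUpdate can be restated as a small decision table. The helper below is illustrative only, not part of this change, and assumes package worker's State constants:

    // probesFor restates the probeAndUpdate doc comment: which probes
    // run for a given starting state.
    func probesFor(initial State) (bootProbe, crunchRunProbe string) {
            switch initial {
            case StateShutdown:
                    return "skip", "skip"
            case StateIdle, StateRunning:
                    return "skip (already booted)", "run"
            case StateBooting:
                    return "run", "run only if the boot probe succeeds"
            case StateUnknown:
                    return "run", "run even if the boot probe fails"
            default:
                    return "skip", "skip"
            }
    }
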
@@ -167,7 +214,10 @@ func (wkr *worker) probeAndUpdate() {
                        "Duration": dur,
                        "State":    wkr.state,
                })
-               if wkr.state == StateBooting {
+               if !booted {
+                       // While we're polling the VM to see if it's
+                       // finished booting, failures are not
+                       // noteworthy, so we log at Debug level.
                        logger.Debug("new instance not responding")
                } else {
                        logger.Info("instance not responding")
@@ -212,25 +262,40 @@ func (wkr *worker) probeAndUpdate() {
                        changed = true
                }
        }
-       if wkr.state == StateUnknown || wkr.state == StateBooting {
+       if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+               // Note: this will change again below if
+               // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
+       } else if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+               logger.WithFields(logrus.Fields{
+                       "RunningContainers": len(running),
+                       "State":             wkr.state,
+               }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }
-       if changed {
-               wkr.running = running
-               if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
-                       wkr.state = StateRunning
-               } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
-                       wkr.state = StateIdle
-               }
-               wkr.updated = updateTime
-               go wkr.wp.notify()
+       if !changed {
+               return
+       }
+
+       wkr.running = running
+       if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+               wkr.state = StateRunning
+       } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+               wkr.state = StateIdle
+       }
+       wkr.updated = updateTime
+       if booted && (initialState == StateUnknown || initialState == StateBooting) {
+               logger.WithFields(logrus.Fields{
+                       "RunningContainers": len(running),
+                       "State":             wkr.state,
+               }).Info("probes succeeded, instance is in service")
        }
+       go wkr.wp.notify()
 }
 
 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
        cmd := "crunch-run --list"
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
@@ -251,7 +316,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        if cmd == "" {
                cmd = "true"
        }
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
@@ -267,11 +332,11 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
-       if wkr.state == StateHold {
+       if wkr.idleBehavior == IdleBehaviorHold {
                return
        }
        label, threshold := "", wkr.wp.timeoutProbe
-       if wkr.state == StateBooting {
+       if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
@@ -287,19 +352,25 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) {
 
 // caller must have lock.
 func (wkr *worker) shutdownIfIdle() bool {
-       if wkr.state != StateIdle {
+       if wkr.idleBehavior == IdleBehaviorHold {
+               return false
+       }
+       if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
                return false
        }
        age := time.Since(wkr.busy)
-       if age < wkr.wp.timeoutIdle {
+       if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
                return false
        }
-       wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+       wkr.logger.WithFields(logrus.Fields{
+               "Age":          age,
+               "IdleBehavior": wkr.idleBehavior,
+       }).Info("shutdown idle worker")
        wkr.shutdown()
        return true
 }
 
-// caller must have lock
+// caller must have lock.
 func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
@@ -314,3 +385,27 @@ func (wkr *worker) shutdown() {
                }
        }()
 }
+
+// saveTags saves the worker's tags to cloud provider metadata if they
+// don't already match. Caller must have lock.
+func (wkr *worker) saveTags() {
+       instance := wkr.instance
+       have := instance.Tags()
+       want := cloud.InstanceTags{
+               tagKeyInstanceType: wkr.instType.Name,
+               tagKeyIdleBehavior: string(wkr.idleBehavior),
+       }
+       go func() {
+               for k, v := range want {
+                       if v == have[k] {
+                               continue
+                       }
+                       err := instance.SetTags(want)
+                       if err != nil {
+                               wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+                       }
+                       break
+
+               }
+       }()
+}
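
Tagging each instance with its instance type and idle behavior presumably lets the dispatcher recover that state when it rediscovers a running instance, for example after a restart. A rough sketch of reading the saved value back, assuming tagKeyIdleBehavior is the same tag key used in saveTags above; idleBehaviorFromTags is hypothetical, not part of this change:

    // idleBehaviorFromTags is illustrative only: recover the saved
    // IdleBehavior from instance tags, falling back to the default
    // when the tag is missing or unrecognized.
    func idleBehaviorFromTags(tags cloud.InstanceTags) IdleBehavior {
            if ib := IdleBehavior(tags[tagKeyIdleBehavior]); validIdleBehavior[ib] {
                    return ib
            }
            return IdleBehaviorRun
    }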