import (
"bytes"
+ "encoding/json"
+ "fmt"
"strings"
"sync"
"time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
"github.com/sirupsen/logrus"
)
+const (
+ // TODO: make this configurable. NOTE(review): maxPingFailTime is not referenced anywhere in the visible code — confirm where it is used.
+ maxPingFailTime = 10 * time.Minute
+)
+
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int
StateIdle // instance booted, no containers are running
StateRunning // instance is running one or more containers
StateShutdown // worker has stopped monitoring the instance
- StateHold // running, but not available to run new containers
-)
-
-const (
- // TODO: configurable
- maxPingFailTime = 10 * time.Minute
)
var stateString = map[State]string{
StateIdle: "idle",
StateRunning: "running",
StateShutdown: "shutdown",
- StateHold: "hold",
}
// String implements fmt.Stringer.
return []byte(stateString[s]), nil
}
+// IdleBehavior indicates the behavior desired when a node becomes idle. The value is a plain string; saveTags stores it in an instance tag.
+type IdleBehavior string
+
+const (
+ IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
+ IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers; shutdownIfBroken and shutdownIfIdle return early for this value
+ IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle; shutdownIfIdle also shuts down a draining worker that is still booting
+)
+
+var validIdleBehavior = map[IdleBehavior]bool{ // set of the IdleBehavior values declared above
+ IdleBehaviorRun: true,
+ IdleBehaviorHold: true,
+ IdleBehaviorDrain: true,
+}
+
// worker is the pool's bookkeeping record for one cloud instance: its
// state, desired idle behavior, timestamps, and the containers
// starting/running on it. Methods documented as "caller must have
// lock" expect mtx to be held.
type worker struct {
logger logrus.FieldLogger
executor Executor
wp *Pool
- mtx sync.Locker // must be wp's Locker.
- state State
- instance cloud.Instance
- instType arvados.InstanceType
- vcpus int64
- memory int64
- appeared time.Time
- probed time.Time
- updated time.Time
- busy time.Time
- destroyed time.Time
- lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
- probing chan struct{}
+ mtx sync.Locker // must be wp's Locker.
+ state State
+ idleBehavior IdleBehavior // consulted by shutdownIfBroken and shutdownIfIdle; saved as an instance tag by saveTags
+ instance cloud.Instance // its ID, RemoteUser, Tags and SetTags methods are used by the worker methods
+ instType arvados.InstanceType // instType.Name is saved as an instance tag by saveTags
+ vcpus int64
+ memory int64
+ appeared time.Time
+ probed time.Time // probeAndUpdate measures unresponsiveness as (probe start time - probed); presumably the last successful probe — confirm
+ updated time.Time // set by probeAndUpdate and shutdown when they change the worker
+ busy time.Time // shutdownIfIdle measures idle time as time.Since(busy)
+ destroyed time.Time
+ lastUUID string
+ running map[string]struct{} // container UUIDs from the latest successful run-probe; remember to update state idle<->running when this changes
+ starting map[string]struct{} // container UUIDs this process is in the middle of starting; remember to update state idle<->running when this changes
+ probing chan struct{}
}
// caller must have lock.
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance)
+ logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
wkr.starting[ctr.UUID] = struct{}{}
wkr.state = StateRunning
"ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
}
- stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+ if wkr.wp.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
}
}
-// should be called in a new goroutine
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
wkr.mtx.Lock()
updated := wkr.updated
- needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
- needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+ initialState := wkr.state
wkr.mtx.Unlock()
- if !needProbeBooted && !needProbeRunning {
- return
- }
var (
+ booted bool
ctrUUIDs []string
ok bool
- stderr []byte
+ stderr []byte // from probeBooted
)
- if needProbeBooted {
- ok, stderr = wkr.probeBooted()
- wkr.mtx.Lock()
- if ok || wkr.state == StateRunning || wkr.state == StateIdle {
- wkr.logger.Info("instance booted; will try probeRunning")
- needProbeRunning = true
+
+ switch initialState {
+ case StateShutdown:
+ return
+ case StateIdle, StateRunning:
+ booted = true
+ case StateUnknown, StateBooting:
+ default:
+ panic(fmt.Sprintf("unknown state %s", initialState))
+ }
+
+ probeStart := time.Now()
+ logger := wkr.logger.WithField("ProbeStart", probeStart)
+
+ if !booted {
+ booted, stderr = wkr.probeBooted()
+ if !booted {
+ // Pretend this probe succeeded if another
+ // concurrent attempt succeeded.
+ wkr.mtx.Lock()
+ booted = wkr.state == StateRunning || wkr.state == StateIdle
+ wkr.mtx.Unlock()
+ }
+ if booted {
+ logger.Info("instance booted; will try probeRunning")
}
- wkr.mtx.Unlock()
}
- if needProbeRunning {
- ctrUUIDs, ok, stderr = wkr.probeRunning()
+ if booted || wkr.state == StateUnknown { // NOTE(review): wkr.state is read here without holding wkr.mtx, while other code writes it under the lock; initialState was captured under the lock above — confirm which is intended
+ ctrUUIDs, ok = wkr.probeRunning()
}
- logger := wkr.logger.WithField("stderr", string(stderr))
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
- if !ok {
+ if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) { // not responding: run-probe failed or was skipped, or boot probe is failing and no containers are reported or remembered
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
// initiated during probe.
return
}
- dur := time.Since(wkr.probed)
- logger := logger.WithFields(logrus.Fields{
- "Duration": dur,
- "State": wkr.state,
- })
- if wkr.state == StateBooting && !needProbeRunning {
- // If we know the instance has never passed a
- // boot probe, it's not noteworthy that it
- // hasn't passed this probe.
- logger.Debug("new instance not responding")
- } else {
- logger.Info("instance not responding")
+ // Using the start time of the probe as the timeout
+ // threshold ensures we always initiate at least one
+ // probe attempt after the boot/probe timeout expires
+ // (otherwise, a slow probe failure could cause us to
+ // shutdown an instance even though it did in fact
+ // boot/recover before the timeout expired).
+ dur := probeStart.Sub(wkr.probed)
+ if wkr.shutdownIfBroken(dur) {
+ // stderr from failed run-probes will have
+ // been logged already, but boot-probe
+ // failures are normal so they are logged only
+ // at Debug level. This is our chance to log
+ // some evidence about why the node never
+ // booted, even in non-debug mode.
+ if !booted {
+ logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "stderr": string(stderr),
+ }).Info("boot failed")
+ }
}
- wkr.shutdownIfBroken(dur)
return
}
// advantage of the non-busy state, though.
wkr.busy = updateTime
}
- running := map[string]struct{}{}
changed := false
+
+ // Build a new "running" map. Set changed=true if it differs
+ // from the existing map (wkr.running) to ensure the scheduler
+ // gets notified below.
+ running := map[string]struct{}{}
for _, uuid := range ctrUUIDs {
running[uuid] = struct{}{}
if _, ok := wkr.running[uuid]; !ok {
+ if _, ok := wkr.starting[uuid]; !ok {
+ // We didn't start it -- it must have
+ // been started by a previous
+ // dispatcher process.
+ logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ }
changed = true
}
}
changed = true
}
}
- if wkr.state == StateUnknown || wkr.state == StateBooting {
+
+ // Update state if this was the first successful boot-probe.
+ if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+ // Note: this will change again below if
+ // len(wkr.starting)+len(wkr.running) > 0.
wkr.state = StateIdle
changed = true
}
- if changed {
- wkr.running = running
- if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
- wkr.state = StateRunning
- } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
- wkr.state = StateIdle
- }
- wkr.updated = updateTime
- go wkr.wp.notify()
+
+ // If wkr.state and wkr.running aren't changing then there's
+ // no need to log anything, notify the scheduler, move state
+ // back and forth between idle/running, etc.
+ if !changed {
+ return
}
+
+ // Log whenever a run-probe reveals crunch-run processes
+ // appearing/disappearing before boot-probe succeeds.
+ if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(running),
+ "State": wkr.state,
+ }).Info("crunch-run probe succeeded, but boot probe is still failing")
+ }
+
+ wkr.running = running
+ if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+ wkr.state = StateRunning
+ } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+ wkr.state = StateIdle
+ }
+ wkr.updated = updateTime
+ if booted && (initialState == StateUnknown || initialState == StateBooting) {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(running),
+ "State": wkr.state,
+ }).Info("probes succeeded, instance is in service")
+ }
+ go wkr.wp.notify()
}
// probeRunning runs "crunch-run --list" on the instance through
// wkr.executor (prefixed with "sudo " unless the instance's remote
// user is root) and returns its stdout split into lines, with trailing
// newlines trimmed first. The caller treats each line as a container
// UUID.
//
// ok is false if the command returned an error; in that case stdout
// and stderr are logged at Warn level and running is nil. Empty output
// yields (nil, true).
-func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+func (wkr *worker) probeRunning() (running []string, ok bool) {
cmd := "crunch-run --list"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return nil, false, stderr
+ return nil, false
}
stdout = bytes.TrimRight(stdout, "\n")
if len(stdout) == 0 {
- return nil, true, stderr
+ return nil, true
}
- return strings.Split(string(stdout), "\n"), true, stderr
+ return strings.Split(string(stdout), "\n"), true
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
}
// shutdownIfBroken shuts down the worker if dur (how long it has been
// unresponsive, as computed by the caller) has reached the applicable
// threshold: wp.timeoutBooting while in StateUnknown or StateBooting,
// wp.timeoutProbe otherwise. A worker whose idleBehavior is
// IdleBehaviorHold is never shut down. The return value reports
// whether shutdown was initiated.
//
// caller must have lock.
-func (wkr *worker) shutdownIfBroken(dur time.Duration) {
- if wkr.state == StateHold {
- return
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ // Never shut down.
+ return false
}
label, threshold := "", wkr.wp.timeoutProbe
- if wkr.state == StateBooting {
+ if wkr.state == StateUnknown || wkr.state == StateBooting {
label, threshold = "new ", wkr.wp.timeoutBooting
}
if dur < threshold {
- return
+ return false
}
wkr.logger.WithFields(logrus.Fields{
"Duration": dur,
"State": wkr.state,
}).Warnf("%sinstance unresponsive, shutting down", label)
wkr.shutdown()
+ return true
}
// shutdownIfIdle shuts down the worker and returns true if either
// (a) it is in StateIdle and has been idle for at least
// wp.timeoutIdle or its idleBehavior is IdleBehaviorDrain, or
// (b) it is in StateBooting and its idleBehavior is
// IdleBehaviorDrain. A worker whose idleBehavior is IdleBehaviorHold
// is never shut down. Idle time is measured from wkr.busy.
//
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
- if wkr.state != StateIdle {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ // Never shut down.
return false
}
age := time.Since(wkr.busy)
- if age < wkr.wp.timeoutIdle {
+
+ old := age >= wkr.wp.timeoutIdle
+ draining := wkr.idleBehavior == IdleBehaviorDrain
+ shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
+ (draining && wkr.state == StateBooting)
+ if !shouldShutdown {
return false
}
- wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+
+ wkr.logger.WithFields(logrus.Fields{
+ "State": wkr.state,
+ "IdleDuration": stats.Duration(age),
+ "IdleBehavior": wkr.idleBehavior,
+ }).Info("shutdown idle worker")
wkr.shutdown()
return true
}
-// caller must have lock
+// caller must have lock.
func (wkr *worker) shutdown() {
now := time.Now()
wkr.updated = now
}
}()
}
+
+// saveTags saves the worker's instance type name and idle behavior as instance tags (via instance.SetTags, in a new goroutine), if the
+// instance's current tags don't already match. Caller must have lock.
+func (wkr *worker) saveTags() {
+ instance := wkr.instance
+ tags := instance.Tags() // NOTE(review): this map is written to below; if Tags() can return nil the assignment panics, and if it returns the instance's own map (not a copy) that map is mutated in place — confirm the Tags() contract
+ update := cloud.InstanceTags{
+ tagKeyInstanceType: wkr.instType.Name,
+ tagKeyIdleBehavior: string(wkr.idleBehavior),
+ }
+ save := false
+ for k, v := range update {
+ if tags[k] != v {
+ tags[k] = v
+ save = true
+ }
+ }
+ if save {
+ go func() { // SetTags runs in its own goroutine, so the caller does not wait for it
+ err := instance.SetTags(tags)
+ if err != nil {
+ wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags") // a failed update is only logged; nothing retries it here
+ }
+ }()
+ }
+}