package worker
import (
- "bytes"
"fmt"
"strings"
"sync"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
"github.com/sirupsen/logrus"
)
const (
IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
- IdleBehaviorHold = "hold" // don't shutdown or run more containers
- IdleBehaviorDrain = "drain" // shutdown immediately when idle
+ IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers
+ IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)
var validIdleBehavior = map[IdleBehavior]bool{
busy time.Time
destroyed time.Time
lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
+ running map[string]*remoteRunner // remember to update state idle<->running when this changes
+ starting map[string]*remoteRunner // remember to update state idle<->running when this changes
probing chan struct{}
}
+func (wkr *worker) onUnkillable(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ logger := wkr.logger.WithField("ContainerUUID", uuid)
+ if wkr.idleBehavior == IdleBehaviorHold {
+ logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+ return
+ }
+ logger.Warn("unkillable container, draining worker")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ wkr.closeRunner(uuid)
+ go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+ wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+}
+
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
logger := wkr.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance)
logger.Debug("starting container")
- wkr.starting[ctr.UUID] = struct{}{}
- wkr.state = StateRunning
+ rr := newRemoteRunner(ctr.UUID, wkr)
+ wkr.starting[ctr.UUID] = rr
+ if wkr.state != StateRunning {
+ wkr.state = StateRunning
+ go wkr.wp.notify()
+ }
go func() {
- env := map[string]string{
- "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
- "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
- }
- stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+ rr.Start()
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
wkr.updated = now
wkr.busy = now
delete(wkr.starting, ctr.UUID)
- wkr.running[ctr.UUID] = struct{}{}
- wkr.lastUUID = ctr.UUID
- if err != nil {
- logger.WithField("stdout", string(stdout)).
- WithField("stderr", string(stderr)).
- WithError(err).
- Error("error starting crunch-run process")
- // Leave uuid in wkr.running, though: it's
- // possible the error was just a communication
- // failure and the process was in fact
- // started. Wait for next probe to find out.
- return
- }
- logger.Info("crunch-run process started")
+ wkr.running[ctr.UUID] = rr
wkr.lastUUID = ctr.UUID
}()
}
booted bool
ctrUUIDs []string
ok bool
- stderr []byte
+ stderr []byte // from probeBooted
)
switch initialState {
panic(fmt.Sprintf("unknown state %s", initialState))
}
+ probeStart := time.Now()
+ logger := wkr.logger.WithField("ProbeStart", probeStart)
+
if !booted {
booted, stderr = wkr.probeBooted()
if !booted {
wkr.mtx.Lock()
booted = wkr.state == StateRunning || wkr.state == StateIdle
wkr.mtx.Unlock()
- } else {
- wkr.logger.Info("instance booted; will try probeRunning")
+ }
+ if booted {
+ logger.Info("instance booted; will try probeRunning")
}
}
+ reportedBroken := false
if booted || wkr.state == StateUnknown {
- ctrUUIDs, ok, stderr = wkr.probeRunning()
+ ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
}
- logger := wkr.logger.WithField("stderr", string(stderr))
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
+ if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+ logger.Info("probe reported broken instance")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+ }
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
// initiated during probe.
return
}
- dur := time.Since(wkr.probed)
- logger := logger.WithFields(logrus.Fields{
- "Duration": dur,
- "State": wkr.state,
- })
- if !booted {
- // While we're polling the VM to see if it's
- // finished booting, failures are not
- // noteworthy, so we log at Debug level.
- logger.Debug("new instance not responding")
- } else {
- logger.Info("instance not responding")
+ // Using the start time of the probe as the timeout
+ // threshold ensures we always initiate at least one
+ // probe attempt after the boot/probe timeout expires
+ // (otherwise, a slow probe failure could cause us to
+ // shutdown an instance even though it did in fact
+ // boot/recover before the timeout expired).
+ dur := probeStart.Sub(wkr.probed)
+ if wkr.shutdownIfBroken(dur) {
+ // stderr from failed run-probes will have
+ // been logged already, but boot-probe
+ // failures are normal so they are logged only
+ // at Debug level. This is our chance to log
+ // some evidence about why the node never
+ // booted, even in non-debug mode.
+ if !booted {
+ logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "stderr": string(stderr),
+ }).Info("boot failed")
+ }
}
- wkr.shutdownIfBroken(dur)
return
}
// advantage of the non-busy state, though.
wkr.busy = updateTime
}
- running := map[string]struct{}{}
- changed := false
- for _, uuid := range ctrUUIDs {
- running[uuid] = struct{}{}
- if _, ok := wkr.running[uuid]; !ok {
- changed = true
- }
- }
- for uuid := range wkr.running {
- if _, ok := running[uuid]; !ok {
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
- wkr.wp.notifyExited(uuid, updateTime)
- changed = true
- }
- }
+
+ changed := wkr.updateRunning(ctrUUIDs)
+
+ // Update state if this was the first successful boot-probe.
if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
// Note: this will change again below if
// len(wkr.starting)+len(wkr.running) > 0.
wkr.state = StateIdle
changed = true
- } else if wkr.state == StateUnknown && len(running) != len(wkr.running) {
- logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
- "State": wkr.state,
- }).Info("crunch-run probe succeeded, but boot probe is still failing")
}
+
+ // If wkr.state and wkr.running aren't changing then there's
+ // no need to log anything, notify the scheduler, move state
+ // back and forth between idle/running, etc.
if !changed {
return
}
- wkr.running = running
+ // Log whenever a run-probe reveals crunch-run processes
+ // appearing/disappearing before boot-probe succeeds.
+ if wkr.state == StateUnknown && changed {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(wkr.running),
+ "State": wkr.state,
+ }).Info("crunch-run probe succeeded, but boot probe is still failing")
+ }
+
if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
wkr.state = StateRunning
} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
wkr.updated = updateTime
if booted && (initialState == StateUnknown || initialState == StateBooting) {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("probes succeeded, instance is in service")
}
go wkr.wp.notify()
}
-func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
cmd := "crunch-run --list"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return nil, false, stderr
+ return
}
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true, stderr
+ ok = true
+ for _, s := range strings.Split(string(stdout), "\n") {
+ if s == "broken" {
+ reportsBroken = true
+ } else if s != "" {
+ running = append(running, s)
+ }
}
- return strings.Split(string(stdout), "\n"), true, stderr
+ return
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
}
// caller must have lock.
-func (wkr *worker) shutdownIfBroken(dur time.Duration) {
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
if wkr.idleBehavior == IdleBehaviorHold {
- return
+ // Never shut down.
+ return false
}
label, threshold := "", wkr.wp.timeoutProbe
if wkr.state == StateUnknown || wkr.state == StateBooting {
label, threshold = "new ", wkr.wp.timeoutBooting
}
if dur < threshold {
- return
+ return false
}
wkr.logger.WithFields(logrus.Fields{
"Duration": dur,
"State": wkr.state,
}).Warnf("%sinstance unresponsive, shutting down", label)
wkr.shutdown()
+ return true
}
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
// caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
if wkr.idleBehavior == IdleBehaviorHold {
return false
}
- if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
+ draining := wkr.idleBehavior == IdleBehaviorDrain
+ switch wkr.state {
+ case StateBooting:
+ return draining
+ case StateIdle:
+ return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+ case StateRunning:
+ if !draining {
+ return false
+ }
+ for _, rr := range wkr.running {
+ if !rr.givenup {
+ return false
+ }
+ }
+ for _, rr := range wkr.starting {
+ if !rr.givenup {
+ return false
+ }
+ }
+ // draining, and all remaining runners are just trying
+ // to force-kill their crunch-run procs
+ return true
+ default:
return false
}
- age := time.Since(wkr.busy)
- if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+ if !wkr.eligibleForShutdown() {
return false
}
wkr.logger.WithFields(logrus.Fields{
- "Age": age,
+ "State": wkr.state,
+ "IdleDuration": stats.Duration(time.Since(wkr.busy)),
"IdleBehavior": wkr.idleBehavior,
- }).Info("shutdown idle worker")
+ }).Info("shutdown worker")
wkr.shutdown()
return true
}
// match. Caller must have lock.
func (wkr *worker) saveTags() {
instance := wkr.instance
- have := instance.Tags()
- want := cloud.InstanceTags{
- tagKeyInstanceType: wkr.instType.Name,
- tagKeyIdleBehavior: string(wkr.idleBehavior),
+ tags := instance.Tags()
+ update := cloud.InstanceTags{
+ wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
+ wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
}
- go func() {
- for k, v := range want {
- if v == have[k] {
- continue
- }
- err := instance.SetTags(want)
+ save := false
+ for k, v := range update {
+ if tags[k] != v {
+ tags[k] = v
+ save = true
+ }
+ }
+ if save {
+ go func() {
+ err := instance.SetTags(tags)
if err != nil {
- wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+ wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
}
- break
+ }()
+ }
+}
+func (wkr *worker) Close() {
+ // This might take time, so do it after unlocking mtx.
+ defer wkr.executor.Close()
+
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ for uuid, rr := range wkr.running {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+ for uuid, rr := range wkr.starting {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+ alive := map[string]bool{}
+ for _, uuid := range ctrUUIDs {
+ alive[uuid] = true
+ if _, ok := wkr.running[uuid]; ok {
+ // unchanged
+ } else if rr, ok := wkr.starting[uuid]; ok {
+ wkr.running[uuid] = rr
+ delete(wkr.starting, uuid)
+ changed = true
+ } else {
+ // We didn't start it -- it must have been
+ // started by a previous dispatcher process.
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+ changed = true
}
- }()
+ }
+ for uuid := range wkr.running {
+ if !alive[uuid] {
+ wkr.closeRunner(uuid)
+ changed = true
+ }
+ }
+ return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+ rr := wkr.running[uuid]
+ if rr == nil {
+ return
+ }
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ delete(wkr.running, uuid)
+ rr.Close()
+
+ now := time.Now()
+ wkr.updated = now
+ wkr.wp.exited[uuid] = now
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
}