ProbeInterval: arvados.Duration(5 * time.Millisecond),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
+ TimeoutKILL: arvados.Duration(20 * time.Millisecond),
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
Create(arvados.InstanceType) bool
Shutdown(arvados.InstanceType) bool
StartContainer(arvados.InstanceType, arvados.Container) bool
- KillContainer(uuid string)
+ KillContainer(uuid, reason string)
Subscribe() <-chan struct{}
Unsubscribe(<-chan struct{})
}
if !running {
go sch.cancel(uuid, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(uuid, "state=Running after crunch-run exited")
} else if ent.Container.Priority == 0 {
go sch.kill(uuid, "priority=0")
}
// of kill() will be to make the
// worker available for the next
// container.
- go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
}
func (sch *Scheduler) kill(uuid string, reason string) {
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because %s", reason)
- sch.pool.KillContainer(uuid)
+ sch.pool.KillContainer(uuid, reason)
}
func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
return 1
} else {
fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+ return 0
}
- return 0
}
if command == "true" {
return 0
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutKILL = time.Second * 20
+ defaultTimeoutSignal = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shut down.
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+ timeoutKILL: duration(cluster.Dispatch.TimeoutKILL, defaultTimeoutKILL),
+ timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
stop: make(chan bool),
}
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutKILL time.Duration
+ timeoutSignal time.Duration
installPublicKey ssh.PublicKey
// private state
if !ok {
return errors.New("requested instance does not exist")
}
- wkr.idleBehavior = idleBehavior
- wkr.saveTags()
- wkr.shutdownIfIdle()
+ wkr.setIdleBehavior(idleBehavior)
return nil
}
probed: now,
busy: now,
updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
- wp.exited[uuid] = t
-}
-
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ logger := wp.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ logger.Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
return
}
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
- return
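+ // Check both running and starting: the container might not have finished starting yet.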
+ rr := wkr.running[uuid]
+ if rr == nil {
+ rr = wkr.starting[uuid]
}
- }
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance.ID(),
- })
- logger.Debug("killing process")
- cmd := "crunch-run --kill 15 " + uuid
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
- wkr.state = StateIdle
+ if rr != nil {
+ rr.Kill(reason)
+ return
}
- wkr.updated = time.Now()
- go wp.notify()
}
+ logger.Debug("cannot kill: already disappeared")
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
})
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}
--- /dev/null
+package worker
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "syscall"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+ uuid string
+ executor Executor
+ arvClient *arvados.Client
+ remoteUser string
+ timeoutTERM time.Duration
+ timeoutKILL time.Duration
+ timeoutSignal time.Duration
+ onUnkillable func(uuid string) // callback invoked when giving up on SIGKILL
+ onKilled func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL
+ logger logrus.FieldLogger
+
+ stopping bool // true if Kill() has been called
+ sentKILL bool // true if SIGKILL has been sent
+ closed chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+ rr := &remoteRunner{
+ uuid: uuid,
+ executor: wkr.executor,
+ arvClient: wkr.wp.arvClient,
+ remoteUser: wkr.instance.RemoteUser(),
+ timeoutTERM: wkr.wp.timeoutTERM,
+ timeoutKILL: wkr.wp.timeoutKILL,
+ timeoutSignal: wkr.wp.timeoutSignal,
+ onUnkillable: wkr.onUnkillable,
+ onKilled: wkr.onKilled,
+ logger: wkr.logger.WithField("ContainerUUID", uuid),
+ closed: make(chan struct{}),
+ }
+ return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+ env := map[string]string{
+ "ARVADOS_API_HOST": rr.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+ }
+ if rr.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
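+ // Pass the environment (including the API token) to crunch-run on stdin so it doesn't appear on the remote command line.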
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+ if err != nil {
+ rr.logger.WithField("stdout", string(stdout)).
+ WithField("stderr", string(stderr)).
+ WithError(err).
+ Error("error starting crunch-run process")
+ return
+ }
+ rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+ close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process,
+// escalating from SIGTERM to SIGKILL to onUnkillable() according to
+// the configured timeouts.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+ if rr.stopping {
+ return
+ }
+ rr.stopping = true
+ rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+ go func() {
+ termDeadline := time.Now().Add(rr.timeoutTERM)
+ killDeadline := termDeadline.Add(rr.timeoutKILL)
+ t := time.NewTicker(rr.timeoutSignal)
+ defer t.Stop()
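+ // On each tick, resend the appropriate signal until the process exits, the runner is closed, or the deadlines expire.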
+ for range t.C {
+ switch {
+ case rr.isClosed():
+ return
+ case time.Now().After(killDeadline):
+ rr.onUnkillable(rr.uuid)
+ return
+ case time.Now().After(termDeadline):
+ rr.sentKILL = true
+ rr.kill(syscall.SIGKILL)
+ default:
+ rr.kill(syscall.SIGTERM)
+ }
+ }
+ }()
+}
+
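+// kill invokes "crunch-run --kill" on the remote host to send the given
+// signal to this container's crunch-run process.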
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+ logger := rr.logger.WithField("Signal", int(sig))
+ logger.Info("sending signal")
+ cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ if rr.remoteUser != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+ if err != nil {
+ logger.WithFields(logrus.Fields{
+ "stderr": string(stderr),
+ "stdout": string(stdout),
+ "error": err,
+ }).Info("kill failed")
+ return
+ }
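+ // The kill command succeeded; notify the worker so it can stop tracking this container.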
+ rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+ select {
+ case <-rr.closed:
+ return true
+ default:
+ return false
+ }
+}
import (
"bytes"
- "encoding/json"
"fmt"
"strings"
"sync"
busy time.Time
destroyed time.Time
lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
+ running map[string]*remoteRunner // remember to update state idle<->running when this changes
+ starting map[string]*remoteRunner // remember to update state idle<->running when this changes
probing chan struct{}
}
+func (wkr *worker) onUnkillable(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ logger := wkr.logger.WithField("ContainerUUID", uuid)
+ if wkr.idleBehavior == IdleBehaviorHold {
+ logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+ return
+ }
+ logger.Warn("unkillable container, draining worker")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ wkr.closeRunner(uuid)
+ go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+ wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+}
+
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
logger := wkr.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
- wkr.starting[ctr.UUID] = struct{}{}
+ rr := newRemoteRunner(ctr.UUID, wkr)
+ wkr.starting[ctr.UUID] = rr
if wkr.state != StateRunning {
wkr.state = StateRunning
go wkr.wp.notify()
}
go func() {
- env := map[string]string{
- "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
- "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
- }
- if wkr.wp.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
- }
- envJSON, err := json.Marshal(env)
- if err != nil {
- panic(err)
- }
- stdin := bytes.NewBuffer(envJSON)
- cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+ rr.Start()
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
wkr.updated = now
wkr.busy = now
delete(wkr.starting, ctr.UUID)
- wkr.running[ctr.UUID] = struct{}{}
- wkr.lastUUID = ctr.UUID
- if err != nil {
- logger.WithField("stdout", string(stdout)).
- WithField("stderr", string(stderr)).
- WithError(err).
- Error("error starting crunch-run process")
- // Leave uuid in wkr.running, though: it's
- // possible the error was just a communication
- // failure and the process was in fact
- // started. Wait for next probe to find out.
- return
- }
- logger.Info("crunch-run process started")
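+ // Move the runner to running even if Start hit an error: the error
+ // might have been a mere communication failure and the process might
+ // in fact be running. The next probe will find out.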
+ wkr.running[ctr.UUID] = rr
wkr.lastUUID = ctr.UUID
}()
}
// advantage of the non-busy state, though.
wkr.busy = updateTime
}
- changed := false
- // Build a new "running" map. Set changed=true if it differs
- // from the existing map (wkr.running) to ensure the scheduler
- // gets notified below.
- running := map[string]struct{}{}
- for _, uuid := range ctrUUIDs {
- running[uuid] = struct{}{}
- if _, ok := wkr.running[uuid]; !ok {
- if _, ok := wkr.starting[uuid]; !ok {
- // We didn't start it -- it must have
- // been started by a previous
- // dispatcher process.
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
- }
- changed = true
- }
- }
- for uuid := range wkr.running {
- if _, ok := running[uuid]; !ok {
- logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
- wkr.wp.notifyExited(uuid, updateTime)
- changed = true
- }
- }
+ changed := wkr.updateRunning(ctrUUIDs)
// Update state if this was the first successful boot-probe.
if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
// Log whenever a run-probe reveals crunch-run processes
// appearing/disappearing before boot-probe succeeds.
- if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+ if wkr.state == StateUnknown && changed {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("crunch-run probe succeeded, but boot probe is still failing")
}
- wkr.running = running
if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
wkr.state = StateRunning
} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
wkr.updated = updateTime
if booted && (initialState == StateUnknown || initialState == StateBooting) {
logger.WithFields(logrus.Fields{
- "RunningContainers": len(running),
+ "RunningContainers": len(wkr.running),
"State": wkr.state,
}).Info("probes succeeded, instance is in service")
}
return true
}
+// Returns true if the instance is eligible for shutdown: it has been
+// idle too long, or idleBehavior=Drain and nothing is left running
+// except crunch-run processes that have already been sent SIGKILL.
+//
// caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
if wkr.idleBehavior == IdleBehaviorHold {
- // Never shut down.
return false
}
- age := time.Since(wkr.busy)
-
- old := age >= wkr.wp.timeoutIdle
draining := wkr.idleBehavior == IdleBehaviorDrain
- shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
- (draining && wkr.state == StateBooting)
- if !shouldShutdown {
+ switch wkr.state {
+ case StateBooting:
+ return draining
+ case StateIdle:
+ return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+ case StateRunning:
+ if !draining {
+ return false
+ }
+ for _, rr := range wkr.running {
+ if !rr.sentKILL {
+ return false
+ }
+ }
+ for _, rr := range wkr.starting {
+ if !rr.sentKILL {
+ return false
+ }
+ }
+ // draining, and all remaining runners are just trying
+ // to force-kill their crunch-run procs
+ return true
+ default:
return false
}
+}
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+ if !wkr.eligibleForShutdown() {
+ return false
+ }
wkr.logger.WithFields(logrus.Fields{
"State": wkr.state,
- "IdleDuration": stats.Duration(age),
+ "IdleDuration": stats.Duration(time.Since(wkr.busy)),
"IdleBehavior": wkr.idleBehavior,
- }).Info("shutdown idle worker")
+ }).Info("shutdown worker")
wkr.shutdown()
return true
}
}()
}
}
+
+func (wkr *worker) Close() {
+ // This might take time, so do it after unlocking mtx.
+ defer wkr.executor.Close()
+
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ for uuid, rr := range wkr.running {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+ for uuid, rr := range wkr.starting {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+ alive := map[string]bool{}
+ for _, uuid := range ctrUUIDs {
+ alive[uuid] = true
+ if _, ok := wkr.running[uuid]; ok {
+ // unchanged
+ } else if rr, ok := wkr.starting[uuid]; ok {
+ wkr.running[uuid] = rr
+ delete(wkr.starting, uuid)
+ changed = true
+ } else {
+ // We didn't start it -- it must have been
+ // started by a previous dispatcher process.
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+ changed = true
+ }
+ }
+ for uuid := range wkr.running {
+ if !alive[uuid] {
+ wkr.closeRunner(uuid)
+ changed = true
+ }
+ }
+ return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+ rr := wkr.running[uuid]
+ if rr == nil {
+ return
+ }
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ delete(wkr.running, uuid)
+ rr.Close()
+
+ now := time.Now()
+ wkr.updated = now
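+ // Leave a placeholder in wp.exited so a later KillContainer call knows the process already ended.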
+ wkr.wp.exited[uuid] = now
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
+}
// Maximum total worker probes per second
MaxProbesPerSecond int
+
+ // Interval between repeated TERM/KILL signals sent to a container's crunch-run process
+ TimeoutSignal Duration
+
+ // Time to wait for a container to die after sending TERM, before escalating to KILL
+ TimeoutTERM Duration
+
+ // Time to wait for a container to die after sending KILL, before giving up and draining the worker
+ TimeoutKILL Duration
}
type CloudVMs struct {