From cd020c016106fbe844501c5f434c16f4def4e08d Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Mon, 18 Mar 2019 11:09:09 -0400
Subject: [PATCH] 14807: Escalate sigterm->sigkill->drain instance.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 lib/dispatchcloud/dispatcher_test.go      |   3 +
 lib/dispatchcloud/scheduler/interfaces.go |   2 +-
 lib/dispatchcloud/scheduler/sync.go       |  10 +-
 lib/dispatchcloud/test/stub_driver.go     |   2 +-
 lib/dispatchcloud/worker/pool.go          |  74 +++-----
 lib/dispatchcloud/worker/runner.go        | 150 +++++++++++++++
 lib/dispatchcloud/worker/worker.go        | 211 ++++++++++++++--------
 sdk/go/arvados/config.go                  |   9 +
 8 files changed, 331 insertions(+), 130 deletions(-)
 create mode 100644 lib/dispatchcloud/worker/runner.go

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index fb11b22352..d0b4efefa3 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -62,6 +62,9 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 			ProbeInterval:      arvados.Duration(5 * time.Millisecond),
 			StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
 			MaxProbesPerSecond: 1000,
+			TimeoutSignal:      arvados.Duration(3 * time.Millisecond),
+			TimeoutTERM:        arvados.Duration(20 * time.Millisecond),
+			TimeoutKILL:        arvados.Duration(20 * time.Millisecond),
 		},
 		InstanceTypes: arvados.InstanceTypeMap{
 			test.InstanceType(1).Name: test.InstanceType(1),
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 18cdc94fa5..307807e323 100644
--- a/lib/dispatchcloud/scheduler/interfaces.go
+++ b/lib/dispatchcloud/scheduler/interfaces.go
@@ -38,7 +38,7 @@ type WorkerPool interface {
 	Create(arvados.InstanceType) bool
 	Shutdown(arvados.InstanceType) bool
 	StartContainer(arvados.InstanceType, arvados.Container) bool
-	KillContainer(uuid string)
+	KillContainer(uuid, reason string)
 	Subscribe() <-chan struct{}
 	Unsubscribe(<-chan struct{})
 }
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index 28b9fd3385..99bee484c6 100644
--- a/lib/dispatchcloud/scheduler/sync.go
+++ b/lib/dispatchcloud/scheduler/sync.go
@@ -32,7 +32,7 @@ func (sch *Scheduler) sync() {
 			if !running {
 				go sch.cancel(uuid, "not running on any worker")
 			} else if !exited.IsZero() && qUpdated.After(exited) {
-				go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
+				go sch.cancel(uuid, "state=Running after crunch-run exited")
 			} else if ent.Container.Priority == 0 {
 				go sch.kill(uuid, "priority=0")
 			}
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
 				// of kill() will be to make the
 				// worker available for the next
 				// container.
-				go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+				go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
 			} else {
 				sch.logger.WithFields(logrus.Fields{
 					"ContainerUUID": uuid,
@@ -60,7 +60,7 @@ func (sch *Scheduler) sync() {
 				// a network outage and is still
 				// preparing to run a container that
 				// has already been unlocked/requeued.
-				go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+				go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
 			}
 		case arvados.ContainerStateLocked:
 			if running && !exited.IsZero() && qUpdated.After(exited) {
@@ -98,9 +98,7 @@ func (sch *Scheduler) cancel(uuid string, reason string) {
 }
 
 func (sch *Scheduler) kill(uuid string, reason string) {
-	logger := sch.logger.WithField("ContainerUUID", uuid)
-	logger.Debugf("killing crunch-run process because %s", reason)
-	sch.pool.KillContainer(uuid)
+	sch.pool.KillContainer(uuid, reason)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index cd1ed5f5a4..28fee6d927 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -340,8 +340,8 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 			return 1
 		} else {
 			fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+			return 0
 		}
-		return 0
 	}
 	if command == "true" {
 		return 0
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index e90935e2aa..d13e29c1e8 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -68,6 +68,9 @@ const (
 	defaultTimeoutBooting  = time.Minute * 10
 	defaultTimeoutProbe    = time.Minute * 10
 	defaultTimeoutShutdown = time.Second * 10
+	defaultTimeoutTERM     = time.Minute * 2
+	defaultTimeoutKILL     = time.Second * 20
+	defaultTimeoutSignal   = time.Second * 5
 
 	// Time after a quota error to try again anyway, even if no
 	// instances have been shutdown.
@@ -105,6 +108,9 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
 		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
 		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+		timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+		timeoutKILL:        duration(cluster.Dispatch.TimeoutKILL, defaultTimeoutKILL),
+		timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
 		installPublicKey:   installPublicKey,
 		stop:               make(chan bool),
 	}
@@ -136,6 +142,9 @@ type Pool struct {
 	timeoutBooting     time.Duration
 	timeoutProbe       time.Duration
 	timeoutShutdown    time.Duration
+	timeoutTERM        time.Duration
+	timeoutKILL        time.Duration
+	timeoutSignal      time.Duration
 	installPublicKey   ssh.PublicKey
 
 	// private state
@@ -319,9 +328,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 	if !ok {
 		return errors.New("requested instance does not exist")
 	}
-	wkr.idleBehavior = idleBehavior
-	wkr.saveTags()
-	wkr.shutdownIfIdle()
+	wkr.setIdleBehavior(idleBehavior)
 	return nil
 }
 
@@ -383,19 +390,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
 		probed:   now,
 		busy:     now,
 		updated:  now,
-		running:  make(map[string]struct{}),
-		starting: make(map[string]struct{}),
+		running:  make(map[string]*remoteRunner),
+		starting: make(map[string]*remoteRunner),
 		probing:  make(chan struct{}, 1),
 	}
 	wp.workers[id] = wkr
 	return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-	wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -486,53 +488,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
+	logger := wp.logger.WithFields(logrus.Fields{
+		"ContainerUUID": uuid,
+		"Reason":        reason,
+	})
 	if _, ok := wp.exited[uuid]; ok {
-		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+		logger.Debug("clearing placeholder for exited crunch-run process")
 		delete(wp.exited, uuid)
 		return
 	}
 	for _, wkr := range wp.workers {
-		if _, ok := wkr.running[uuid]; ok {
-			go wp.kill(wkr, uuid)
-			return
+		rr := wkr.running[uuid]
+		if rr == nil {
+			rr = wkr.starting[uuid]
 		}
-	}
-	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-	logger := wp.logger.WithFields(logrus.Fields{
-		"ContainerUUID": uuid,
-		"Instance":      wkr.instance.ID(),
-	})
-	logger.Debug("killing process")
-	cmd := "crunch-run --kill 15 " + uuid
-	if u := wkr.instance.RemoteUser(); u != "root" {
-		cmd = "sudo " + cmd
-	}
-	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-	if err != nil {
-		logger.WithFields(logrus.Fields{
-			"stderr": string(stderr),
-			"stdout": string(stdout),
-			"error":  err,
-		}).Warn("kill failed")
-		return
-	}
-	logger.Debug("killing process succeeded")
-	wp.mtx.Lock()
-	defer wp.mtx.Unlock()
-	if _, ok := wkr.running[uuid]; ok {
-		delete(wkr.running, uuid)
-		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-			wkr.state = StateIdle
+		if rr != nil {
+			rr.Kill(reason)
+			return
 		}
-		wkr.updated = time.Now()
-		go wp.notify()
 	}
+	logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -785,7 +763,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 		})
 		logger.Info("instance disappeared in cloud")
 		delete(wp.workers, id)
-		go wkr.executor.Close()
+		go wkr.Close()
 		notify = true
 	}
 
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
new file mode 100644
index 0000000000..bf1632a6a2
--- /dev/null
+++ b/lib/dispatchcloud/worker/runner.go
@@ -0,0 +1,150 @@
+package worker
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"syscall"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+	uuid          string
+	executor      Executor
+	arvClient     *arvados.Client
+	remoteUser    string
+	timeoutTERM   time.Duration
+	timeoutKILL   time.Duration
+	timeoutSignal time.Duration
+	onUnkillable  func(uuid string) // callback invoked when giving up on SIGKILL
+	onKilled      func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL
+	logger        logrus.FieldLogger
+
+	stopping bool          // true if Stop() has been called
+	sentKILL bool          // true if SIGKILL has been sent
+	closed   chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+	rr := &remoteRunner{
+		uuid:          uuid,
+		executor:      wkr.executor,
+		arvClient:     wkr.wp.arvClient,
+		remoteUser:    wkr.instance.RemoteUser(),
+		timeoutTERM:   wkr.wp.timeoutTERM,
+		timeoutKILL:   wkr.wp.timeoutKILL,
+		timeoutSignal: wkr.wp.timeoutSignal,
+		onUnkillable:  wkr.onUnkillable,
+		onKilled:      wkr.onKilled,
+		logger:        wkr.logger.WithField("ContainerUUID", uuid),
+		closed:        make(chan struct{}),
+	}
+	return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+	env := map[string]string{
+		"ARVADOS_API_HOST":  rr.arvClient.APIHost,
+		"ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+	}
+	if rr.arvClient.Insecure {
+		env["ARVADOS_API_HOST_INSECURE"] = "1"
+	}
+	envJSON, err := json.Marshal(env)
+	if err != nil {
+		panic(err)
+	}
+	stdin := bytes.NewBuffer(envJSON)
+	cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+	if rr.remoteUser != "root" {
+		cmd = "sudo " + cmd
+	}
+	stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+	if err != nil {
+		rr.logger.WithField("stdout", string(stdout)).
+			WithField("stderr", string(stderr)).
+			WithError(err).
+			Error("error starting crunch-run process")
+		return
+	}
+	rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+	close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process,
+// escalating from SIGTERM to SIGKILL to onUnkillable() according to
+// the configured timeouts.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+	if rr.stopping {
+		return
+	}
+	rr.stopping = true
+	rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+	go func() {
+		termDeadline := time.Now().Add(rr.timeoutTERM)
+		killDeadline := termDeadline.Add(rr.timeoutKILL)
+		t := time.NewTicker(rr.timeoutSignal)
+		defer t.Stop()
+		for range t.C {
+			switch {
+			case rr.isClosed():
+				return
+			case time.Now().After(killDeadline):
+				rr.onUnkillable(rr.uuid)
+				return
+			case time.Now().After(termDeadline):
+				rr.sentKILL = true
+				rr.kill(syscall.SIGKILL)
+			default:
+				rr.kill(syscall.SIGTERM)
+			}
+		}
+	}()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+	logger := rr.logger.WithField("Signal", int(sig))
+	logger.Info("sending signal")
+	cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+	if rr.remoteUser != "root" {
+		cmd = "sudo " + cmd
+	}
+	stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+	if err != nil {
+		logger.WithFields(logrus.Fields{
+			"stderr": string(stderr),
+			"stdout": string(stdout),
+			"error":  err,
+		}).Info("kill failed")
+		return
+	}
+	rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+	select {
+	case <-rr.closed:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 64e1f7797a..b0b030a40d 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,7 +6,6 @@ package worker
 
 import (
 	"bytes"
-	"encoding/json"
 	"fmt"
 	"strings"
 	"sync"
@@ -87,62 +86,60 @@ type worker struct {
 	busy        time.Time
 	destroyed   time.Time
 	lastUUID    string
-	running     map[string]struct{} // remember to update state idle<->running when this changes
-	starting    map[string]struct{} // remember to update state idle<->running when this changes
+	running     map[string]*remoteRunner // remember to update state idle<->running when this changes
+	starting    map[string]*remoteRunner // remember to update state idle<->running when this changes
 	probing     chan struct{}
 }
 
+func (wkr *worker) onUnkillable(uuid string) {
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	logger := wkr.logger.WithField("ContainerUUID", uuid)
+	if wkr.idleBehavior == IdleBehaviorHold {
+		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+		return
+	}
+	logger.Warn("unkillable container, draining worker")
+	wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	wkr.closeRunner(uuid)
+	go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+	wkr.idleBehavior = idleBehavior
+	wkr.saveTags()
+	wkr.shutdownIfIdle()
+}
+
 // caller must have lock.
 func (wkr *worker) startContainer(ctr arvados.Container) {
 	logger := wkr.logger.WithFields(logrus.Fields{
 		"ContainerUUID": ctr.UUID,
 		"Priority":      ctr.Priority,
 	})
-	logger = logger.WithField("Instance", wkr.instance.ID())
 	logger.Debug("starting container")
-	wkr.starting[ctr.UUID] = struct{}{}
+	rr := newRemoteRunner(ctr.UUID, wkr)
+	wkr.starting[ctr.UUID] = rr
 	if wkr.state != StateRunning {
 		wkr.state = StateRunning
 		go wkr.wp.notify()
 	}
 	go func() {
-		env := map[string]string{
-			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-		}
-		if wkr.wp.arvClient.Insecure {
-			env["ARVADOS_API_HOST_INSECURE"] = "1"
-		}
-		envJSON, err := json.Marshal(env)
-		if err != nil {
-			panic(err)
-		}
-		stdin := bytes.NewBuffer(envJSON)
-		cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
-		if u := wkr.instance.RemoteUser(); u != "root" {
-			cmd = "sudo " + cmd
-		}
-		stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+		rr.Start()
 		wkr.mtx.Lock()
 		defer wkr.mtx.Unlock()
 		now := time.Now()
 		wkr.updated = now
 		wkr.busy = now
 		delete(wkr.starting, ctr.UUID)
-		wkr.running[ctr.UUID] = struct{}{}
-		wkr.lastUUID = ctr.UUID
-		if err != nil {
-			logger.WithField("stdout", string(stdout)).
-				WithField("stderr", string(stderr)).
-				WithError(err).
-				Error("error starting crunch-run process")
-			// Leave uuid in wkr.running, though: it's
-			// possible the error was just a communication
-			// failure and the process was in fact
-			// started. Wait for next probe to find out.
-			return
-		}
-		logger.Info("crunch-run process started")
+		wkr.running[ctr.UUID] = rr
 		wkr.lastUUID = ctr.UUID
 	}()
 }
@@ -274,31 +271,8 @@ func (wkr *worker) probeAndUpdate() {
 		// advantage of the non-busy state, though.
 		wkr.busy = updateTime
 	}
 
-	changed := false
-	// Build a new "running" map. Set changed=true if it differs
-	// from the existing map (wkr.running) to ensure the scheduler
-	// gets notified below.
-	running := map[string]struct{}{}
-	for _, uuid := range ctrUUIDs {
-		running[uuid] = struct{}{}
-		if _, ok := wkr.running[uuid]; !ok {
-			if _, ok := wkr.starting[uuid]; !ok {
-				// We didn't start it -- it must have
-				// been started by a previous
-				// dispatcher process.
-				logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
-			}
-			changed = true
-		}
-	}
-	for uuid := range wkr.running {
-		if _, ok := running[uuid]; !ok {
-			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-			wkr.wp.notifyExited(uuid, updateTime)
-			changed = true
-		}
-	}
+	changed := wkr.updateRunning(ctrUUIDs)
 
 	// Update state if this was the first successful boot-probe.
 	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
@@ -317,14 +291,13 @@ func (wkr *worker) probeAndUpdate() {
 
 	// Log whenever a run-probe reveals crunch-run processes
 	// appearing/disappearing before boot-probe succeeds.
-	if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+	if wkr.state == StateUnknown && changed {
 		logger.WithFields(logrus.Fields{
-			"RunningContainers": len(running),
+			"RunningContainers": len(wkr.running),
 			"State":             wkr.state,
 		}).Info("crunch-run probe succeeded, but boot probe is still failing")
 	}
 
-	wkr.running = running
 	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
 		wkr.state = StateRunning
 	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
@@ -333,7 +306,7 @@ func (wkr *worker) probeAndUpdate() {
 	wkr.updated = updateTime
 	if booted && (initialState == StateUnknown || initialState == StateBooting) {
 		logger.WithFields(logrus.Fields{
-			"RunningContainers": len(running),
+			"RunningContainers": len(wkr.running),
 			"State":             wkr.state,
 		}).Info("probes succeeded, instance is in service")
 	}
@@ -402,27 +375,52 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 	return true
 }
 
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
 	if wkr.idleBehavior == IdleBehaviorHold {
-		// Never shut down.
 		return false
 	}
-	age := time.Since(wkr.busy)
-
-	old := age >= wkr.wp.timeoutIdle
 	draining := wkr.idleBehavior == IdleBehaviorDrain
-	shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
-		(draining && wkr.state == StateBooting)
-	if !shouldShutdown {
+	switch wkr.state {
+	case StateBooting:
+		return draining
+	case StateIdle:
+		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+	case StateRunning:
+		if !draining {
+			return false
+		}
+		for _, rr := range wkr.running {
+			if !rr.sentKILL {
+				return false
+			}
+		}
+		for _, rr := range wkr.starting {
+			if !rr.sentKILL {
+				return false
+			}
+		}
+		// draining, and all remaining runners are just trying
+		// to force-kill their crunch-run procs
+		return true
+	default:
 		return false
 	}
-
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+	if !wkr.eligibleForShutdown() {
+		return false
+	}
 	wkr.logger.WithFields(logrus.Fields{
 		"State":        wkr.state,
-		"IdleDuration": stats.Duration(age),
+		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
 		"IdleBehavior": wkr.idleBehavior,
-	}).Info("shutdown idle worker")
+	}).Info("shutdown worker")
 	wkr.shutdown()
 	return true
 }
@@ -468,3 +466,68 @@ func (wkr *worker) saveTags() {
 		}()
 	}
 }
+
+func (wkr *worker) Close() {
+	// This might take time, so do it after unlocking mtx.
+	defer wkr.executor.Close()
+
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	for uuid, rr := range wkr.running {
+		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+		rr.Close()
+	}
+	for uuid, rr := range wkr.starting {
+		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+		rr.Close()
+	}
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+	alive := map[string]bool{}
+	for _, uuid := range ctrUUIDs {
+		alive[uuid] = true
+		if _, ok := wkr.running[uuid]; ok {
+			// unchanged
+		} else if rr, ok := wkr.starting[uuid]; ok {
+			wkr.running[uuid] = rr
+			delete(wkr.starting, uuid)
+			changed = true
+		} else {
+			// We didn't start it -- it must have been
+			// started by a previous dispatcher process.
+			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+			changed = true
+		}
+	}
+	for uuid := range wkr.running {
+		if !alive[uuid] {
+			wkr.closeRunner(uuid)
+			changed = true
+		}
+	}
+	return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+	rr := wkr.running[uuid]
+	if rr == nil {
+		return
+	}
+	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+	delete(wkr.running, uuid)
+	rr.Close()
+
+	now := time.Now()
+	wkr.updated = now
+	wkr.wp.exited[uuid] = now
+	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+		wkr.state = StateIdle
+	}
+}
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f16f98a943..8bd29097f4 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -122,6 +122,15 @@ type Dispatch struct {
 
 	// Maximum total worker probes per second
 	MaxProbesPerSecond int
+
+	// Time before repeating TERM/KILL signal
+	TimeoutSignal Duration
+
+	// Time to give up on TERM and move to KILL
+	TimeoutTERM Duration
+
+	// Time to give up on KILL and write off the worker
+	TimeoutKILL Duration
 }
 
 type CloudVMs struct {
-- 
2.30.2
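
Reviewer note (illustration only, not part of the patch): the three new Dispatch timeouts work together as follows. TimeoutSignal is the interval at which "crunch-run --kill" is re-invoked, TimeoutTERM is how long SIGTERM is retried before escalating to SIGKILL, and TimeoutKILL is how long SIGKILL is retried before onUnkillable() gives up and the worker is drained. The standalone Go sketch below re-states the deadline arithmetic from remoteRunner.Kill() with made-up short durations so the escalation order is easy to see; the Println calls are stand-ins for the real signal/drain actions.

package main

import (
	"fmt"
	"syscall"
	"time"
)

func main() {
	// Illustrative durations only; the patch's defaults are
	// TimeoutTERM=2m, TimeoutKILL=20s, TimeoutSignal=5s.
	timeoutTERM := 30 * time.Millisecond
	timeoutKILL := 20 * time.Millisecond
	timeoutSignal := 10 * time.Millisecond

	// Same shape as remoteRunner.Kill(): send a signal every
	// timeoutSignal tick, escalating at each deadline.
	termDeadline := time.Now().Add(timeoutTERM)
	killDeadline := termDeadline.Add(timeoutKILL)
	t := time.NewTicker(timeoutSignal)
	defer t.Stop()
	for range t.C {
		switch {
		case time.Now().After(killDeadline):
			fmt.Println("give up -> onUnkillable() -> IdleBehaviorDrain")
			return
		case time.Now().After(termDeadline):
			fmt.Println("send", syscall.SIGKILL)
		default:
			fmt.Println("send", syscall.SIGTERM)
		}
	}
}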