14807: Escalate sigterm->sigkill->drain instance.
author     Tom Clegg <tclegg@veritasgenetics.com>
           Mon, 18 Mar 2019 15:09:09 +0000 (11:09 -0400)
committer  Tom Clegg <tclegg@veritasgenetics.com>
           Mon, 18 Mar 2019 15:44:32 +0000 (11:44 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

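For context: the escalation added here is driven by three new Dispatch timeouts. Signals are re-sent every TimeoutSignal; once TimeoutTERM passes without the process exiting, SIGTERM gives way to SIGKILL; after a further TimeoutKILL the dispatcher gives up and drains the instance. Below is a standalone sketch of that timing loop, condensed from the Kill() loop added in lib/dispatchcloud/worker/runner.go further down. It is illustrative only, with hypothetical names and the remote "crunch-run --kill" call replaced by prints; it is not code from this commit.

    package main

    import (
    	"fmt"
    	"time"
    )

    // escalate sketches the signal loop: repeat SIGTERM every
    // timeoutSignal, switch to SIGKILL once timeoutTERM has passed,
    // and give up (so the caller can drain the worker) after a
    // further timeoutKILL.
    func escalate(timeoutSignal, timeoutTERM, timeoutKILL time.Duration, exited <-chan struct{}) {
    	termDeadline := time.Now().Add(timeoutTERM)
    	killDeadline := termDeadline.Add(timeoutKILL)
    	t := time.NewTicker(timeoutSignal)
    	defer t.Stop()
    	for range t.C {
    		select {
    		case <-exited:
    			return // process is gone; stop signalling
    		default:
    		}
    		switch {
    		case time.Now().After(killDeadline):
    			fmt.Println("unkillable: drain the worker")
    			return
    		case time.Now().After(termDeadline):
    			fmt.Println("send SIGKILL")
    		default:
    			fmt.Println("send SIGTERM")
    		}
    	}
    }

    func main() {
    	exited := make(chan struct{})
    	defer close(exited)
    	escalate(20*time.Millisecond, 60*time.Millisecond, 60*time.Millisecond, exited)
    }
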
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/interfaces.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/runner.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go
sdk/go/arvados/config.go

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index fb11b223521da3fbec4d85daa1be4a30cdb84415..d0b4efefa3578c214345a477a1fa25f8e5ab8dc8 100644 (file)
@@ -62,6 +62,9 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        ProbeInterval:      arvados.Duration(5 * time.Millisecond),
                        StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
                        MaxProbesPerSecond: 1000,
+                       TimeoutSignal:      arvados.Duration(3 * time.Millisecond),
+                       TimeoutTERM:        arvados.Duration(20 * time.Millisecond),
+                       TimeoutKILL:        arvados.Duration(20 * time.Millisecond),
                },
                InstanceTypes: arvados.InstanceTypeMap{
                        test.InstanceType(1).Name:  test.InstanceType(1),
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 18cdc94fa52156ceab01d7dbe135d8db20029176..307807e32337257f14d02178cbd6e98de61f7be8 100644 (file)
@@ -38,7 +38,7 @@ type WorkerPool interface {
        Create(arvados.InstanceType) bool
        Shutdown(arvados.InstanceType) bool
        StartContainer(arvados.InstanceType, arvados.Container) bool
-       KillContainer(uuid string)
+       KillContainer(uuid, reason string)
        Subscribe() <-chan struct{}
        Unsubscribe(<-chan struct{})
 }
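
The interface change threads a human-readable reason from the scheduler down to the pool, so the kill is logged where it actually happens (see pool.go below) instead of in scheduler.kill(). A minimal sketch of a test double satisfying the new method signature; the type and package here are hypothetical, not the project's own fixtures.

    package scheduler_test

    import "sync"

    // stubPool records the reason passed with each kill request via
    // the new two-argument KillContainer signature.
    type stubPool struct {
    	mu     sync.Mutex
    	killed map[string]string // container UUID -> reason
    }

    func (p *stubPool) KillContainer(uuid, reason string) {
    	p.mu.Lock()
    	defer p.mu.Unlock()
    	if p.killed == nil {
    		p.killed = map[string]string{}
    	}
    	p.killed[uuid] = reason
    }
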
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index 28b9fd33857a388f6bd15aa1252a4797bec18fee..99bee484c6f7162a3e875b7627738a555fb46e13 100644 (file)
@@ -32,7 +32,7 @@ func (sch *Scheduler) sync() {
                        if !running {
                                go sch.cancel(uuid, "not running on any worker")
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(uuid, "state=Running after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
                                go sch.kill(uuid, "priority=0")
                        }
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
                                // of kill() will be to make the
                                // worker available for the next
                                // container.
-                               go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        } else {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
@@ -60,7 +60,7 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
@@ -98,9 +98,7 @@ func (sch *Scheduler) cancel(uuid string, reason string) {
 }
 
 func (sch *Scheduler) kill(uuid string, reason string) {
-       logger := sch.logger.WithField("ContainerUUID", uuid)
-       logger.Debugf("killing crunch-run process because %s", reason)
-       sch.pool.KillContainer(uuid)
+       sch.pool.KillContainer(uuid, reason)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index cd1ed5f5a4a9f8ae5b7333066d71c7ac77a49f5e..28fee6d9278db08482c9daa2f575e217bc6c6bb3 100644 (file)
@@ -340,8 +340,8 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                        return 1
                } else {
                        fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+                       return 0
                }
-               return 0
        }
        if command == "true" {
                return 0
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index e90935e2aa9e5747d08e136475cd186c0b4bc766..d13e29c1e89487df597a387f11129a6c7535e48b 100644 (file)
@@ -68,6 +68,9 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutKILL        = time.Second * 20
+       defaultTimeoutSignal      = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -105,6 +108,9 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+               timeoutKILL:        duration(cluster.Dispatch.TimeoutKILL, defaultTimeoutKILL),
+               timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
@@ -136,6 +142,9 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutKILL        time.Duration
+       timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
 
        // private state
@@ -319,9 +328,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
@@ -383,19 +390,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -486,53 +488,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
                }
-       }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance.ID(),
-       })
-       logger.Debug("killing process")
-       cmd := "crunch-run --kill 15 " + uuid
-       if u := wkr.instance.RemoteUser(); u != "root" {
-               cmd = "sudo " + cmd
-       }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
+               if rr != nil {
+                       rr.Kill(reason)
+                       return
                }
-               wkr.updated = time.Now()
-               go wp.notify()
        }
+       logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -785,7 +763,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
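NewPool above maps each configured arvados.Duration onto a compile-time default (defaultTimeoutTERM, defaultTimeoutKILL, defaultTimeoutSignal) via a small duration() helper that already exists elsewhere in pool.go and is not shown in this diff. A plausible sketch of that fallback, consistent with how the new fields are wired in:

    package worker

    import (
    	"time"

    	"git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    // duration returns def when the configured value is unset,
    // otherwise the configured value. Sketch for reference only.
    func duration(conf arvados.Duration, def time.Duration) time.Duration {
    	if conf > 0 {
    		return time.Duration(conf)
    	}
    	return def
    }
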
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
new file mode 100644 (file)
index 0000000..bf1632a
--- /dev/null
+++ b/lib/dispatchcloud/worker/runner.go
@@ -0,0 +1,150 @@
+package worker
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "syscall"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+// remoteRunner handles the starting and stopping of a crunch-run
+// process on a remote machine.
+type remoteRunner struct {
+       uuid          string
+       executor      Executor
+       arvClient     *arvados.Client
+       remoteUser    string
+       timeoutTERM   time.Duration
+       timeoutKILL   time.Duration
+       timeoutSignal time.Duration
+       onUnkillable  func(uuid string) // callback invoked when giving up on SIGKILL
+       onKilled      func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL
+       logger        logrus.FieldLogger
+
+       stopping bool          // true if Stop() has been called
+       sentKILL bool          // true if SIGKILL has been sent
+       closed   chan struct{} // channel is closed if Close() has been called
+}
+
+// newRemoteRunner returns a new remoteRunner. Caller should ensure
+// Close() is called to release resources.
+func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+       rr := &remoteRunner{
+               uuid:          uuid,
+               executor:      wkr.executor,
+               arvClient:     wkr.wp.arvClient,
+               remoteUser:    wkr.instance.RemoteUser(),
+               timeoutTERM:   wkr.wp.timeoutTERM,
+               timeoutKILL:   wkr.wp.timeoutKILL,
+               timeoutSignal: wkr.wp.timeoutSignal,
+               onUnkillable:  wkr.onUnkillable,
+               onKilled:      wkr.onKilled,
+               logger:        wkr.logger.WithField("ContainerUUID", uuid),
+               closed:        make(chan struct{}),
+       }
+       return rr
+}
+
+// Start a crunch-run process on the remote host.
+//
+// Start does not return any error encountered. The caller should
+// assume the remote process _might_ have started, at least until it
+// probes the worker and finds otherwise.
+func (rr *remoteRunner) Start() {
+       env := map[string]string{
+               "ARVADOS_API_HOST":  rr.arvClient.APIHost,
+               "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
+       }
+       if rr.arvClient.Insecure {
+               env["ARVADOS_API_HOST_INSECURE"] = "1"
+       }
+       envJSON, err := json.Marshal(env)
+       if err != nil {
+               panic(err)
+       }
+       stdin := bytes.NewBuffer(envJSON)
+       cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
+       if err != nil {
+               rr.logger.WithField("stdout", string(stdout)).
+                       WithField("stderr", string(stderr)).
+                       WithError(err).
+                       Error("error starting crunch-run process")
+               return
+       }
+       rr.logger.Info("crunch-run process started")
+}
+
+// Close abandons the remote process (if any) and releases
+// resources. Close must not be called more than once.
+func (rr *remoteRunner) Close() {
+       close(rr.closed)
+}
+
+// Kill starts a background task to kill the remote process,
+// escalating from SIGTERM to SIGKILL to onUnkillable() according to
+// the configured timeouts.
+//
+// Once Kill has been called, calling it again has no effect.
+func (rr *remoteRunner) Kill(reason string) {
+       if rr.stopping {
+               return
+       }
+       rr.stopping = true
+       rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
+       go func() {
+               termDeadline := time.Now().Add(rr.timeoutTERM)
+               killDeadline := termDeadline.Add(rr.timeoutKILL)
+               t := time.NewTicker(rr.timeoutSignal)
+               defer t.Stop()
+               for range t.C {
+                       switch {
+                       case rr.isClosed():
+                               return
+                       case time.Now().After(killDeadline):
+                               rr.onUnkillable(rr.uuid)
+                               return
+                       case time.Now().After(termDeadline):
+                               rr.sentKILL = true
+                               rr.kill(syscall.SIGKILL)
+                       default:
+                               rr.kill(syscall.SIGTERM)
+                       }
+               }
+       }()
+}
+
+func (rr *remoteRunner) kill(sig syscall.Signal) {
+       logger := rr.logger.WithField("Signal", int(sig))
+       logger.Info("sending signal")
+       cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+       if rr.remoteUser != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
+       if err != nil {
+               logger.WithFields(logrus.Fields{
+                       "stderr": string(stderr),
+                       "stdout": string(stdout),
+                       "error":  err,
+               }).Info("kill failed")
+               return
+       }
+       rr.onKilled(rr.uuid)
+}
+
+func (rr *remoteRunner) isClosed() bool {
+       select {
+       case <-rr.closed:
+               return true
+       default:
+               return false
+       }
+}
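
The new remoteRunner gives each container's remote crunch-run process a single owner: the worker creates it, Start() launches the detached process, Kill(reason) drives the TERM/KILL escalation through the callbacks above, and Close() abandons it when the instance goes away. The sketch below condenses that lifecycle from startContainer, KillContainer and Close in the surrounding files; the function itself is hypothetical and not part of this commit.

    package worker

    import "git.curoverse.com/arvados.git/sdk/go/arvados"

    // lifecycleSketch shows how a worker drives a remoteRunner.
    // Caller must hold wkr.mtx, as elsewhere in this package.
    func lifecycleSketch(wkr *worker, ctr arvados.Container) {
    	rr := newRemoteRunner(ctr.UUID, wkr) // wires in executor, timeouts, and the worker's callbacks
    	wkr.starting[ctr.UUID] = rr
    	go rr.Start() // best-effort: a later probe decides whether crunch-run really started

    	// When the scheduler wants the container gone, the pool calls
    	// Kill: SIGTERM repeats every timeoutSignal, escalates to
    	// SIGKILL after timeoutTERM, and gives up via onUnkillable
    	// after a further timeoutKILL, which puts the worker into
    	// IdleBehaviorDrain.
    	rr.Kill("priority=0")

    	// If the instance disappears before the process exits, the
    	// worker abandons the runner instead; Close stops the signal
    	// loop. (Shown back-to-back with Kill only for illustration.)
    	rr.Close()
    }
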
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 64e1f7797af8634be63502faea5faaaa8b30a5f9..b0b030a40daf5e73bd639040191d1b77d358eb11 100644 (file)
@@ -6,7 +6,6 @@ package worker
 
 import (
        "bytes"
-       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -87,62 +86,60 @@ type worker struct {
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
-       running      map[string]struct{} // remember to update state idle<->running when this changes
-       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       running      map[string]*remoteRunner // remember to update state idle<->running when this changes
+       starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing      chan struct{}
 }
 
+func (wkr *worker) onUnkillable(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       logger := wkr.logger.WithField("ContainerUUID", uuid)
+       if wkr.idleBehavior == IdleBehaviorHold {
+               logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+               return
+       }
+       logger.Warn("unkillable container, draining worker")
+       wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       wkr.closeRunner(uuid)
+       go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+       wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
+}
+
 // caller must have lock.
 func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
+       rr := newRemoteRunner(ctr.UUID, wkr)
+       wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
-               env := map[string]string{
-                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-               }
-               if wkr.wp.arvClient.Insecure {
-                       env["ARVADOS_API_HOST_INSECURE"] = "1"
-               }
-               envJSON, err := json.Marshal(env)
-               if err != nil {
-                       panic(err)
-               }
-               stdin := bytes.NewBuffer(envJSON)
-               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
-               if u := wkr.instance.RemoteUser(); u != "root" {
-                       cmd = "sudo " + cmd
-               }
-               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+               rr.Start()
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               wkr.lastUUID = ctr.UUID
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
+               wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
 }
@@ -274,31 +271,8 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       changed := false
 
-       // Build a new "running" map. Set changed=true if it differs
-       // from the existing map (wkr.running) to ensure the scheduler
-       // gets notified below.
-       running := map[string]struct{}{}
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       if _, ok := wkr.starting[uuid]; !ok {
-                               // We didn't start it -- it must have
-                               // been started by a previous
-                               // dispatcher process.
-                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
-                       }
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wkr.wp.notifyExited(uuid, updateTime)
-                       changed = true
-               }
-       }
+       changed := wkr.updateRunning(ctrUUIDs)
 
        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
@@ -317,14 +291,13 @@ func (wkr *worker) probeAndUpdate() {
 
        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
-       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+       if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }
 
-       wkr.running = running
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
@@ -333,7 +306,7 @@ func (wkr *worker) probeAndUpdate() {
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
-                       "RunningContainers": len(running),
+                       "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
@@ -402,27 +375,52 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        return true
 }
 
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
-               // Never shut down.
                return false
        }
-       age := time.Since(wkr.busy)
-
-       old := age >= wkr.wp.timeoutIdle
        draining := wkr.idleBehavior == IdleBehaviorDrain
-       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
-               (draining && wkr.state == StateBooting)
-       if !shouldShutdown {
+       switch wkr.state {
+       case StateBooting:
+               return draining
+       case StateIdle:
+               return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+       case StateRunning:
+               if !draining {
+                       return false
+               }
+               for _, rr := range wkr.running {
+                       if !rr.sentKILL {
+                               return false
+                       }
+               }
+               for _, rr := range wkr.starting {
+                       if !rr.sentKILL {
+                               return false
+                       }
+               }
+               // draining, and all remaining runners are just trying
+               // to force-kill their crunch-run procs
+               return true
+       default:
                return false
        }
+}
 
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if !wkr.eligibleForShutdown() {
+               return false
+       }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "IdleDuration": stats.Duration(age),
+               "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
-       }).Info("shutdown idle worker")
+       }).Info("shutdown worker")
        wkr.shutdown()
        return true
 }
@@ -468,3 +466,68 @@ func (wkr *worker) saveTags() {
                }()
        }
 }
+
+func (wkr *worker) Close() {
+       // This might take time, so do it after unlocking mtx.
+       defer wkr.executor.Close()
+
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       for uuid, rr := range wkr.running {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+       for uuid, rr := range wkr.starting {
+               wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+               rr.Close()
+       }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+       alive := map[string]bool{}
+       for _, uuid := range ctrUUIDs {
+               alive[uuid] = true
+               if _, ok := wkr.running[uuid]; ok {
+                       // unchanged
+               } else if rr, ok := wkr.starting[uuid]; ok {
+                       wkr.running[uuid] = rr
+                       delete(wkr.starting, uuid)
+                       changed = true
+               } else {
+                       // We didn't start it -- it must have been
+                       // started by a previous dispatcher process.
+                       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if !alive[uuid] {
+                       wkr.closeRunner(uuid)
+                       changed = true
+               }
+       }
+       return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+       rr := wkr.running[uuid]
+       if rr == nil {
+               return
+       }
+       wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+       delete(wkr.running, uuid)
+       rr.Close()
+
+       now := time.Now()
+       wkr.updated = now
+       wkr.wp.exited[uuid] = now
+       if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+               wkr.state = StateIdle
+       }
+}
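
The refactor above turns shutdownIfIdle into a thin wrapper around an explicit decision table: a worker on Hold never shuts down; a draining worker shuts down while booting, while idle, or once every remaining runner has already escalated to SIGKILL; an idle worker also shuts down after exceeding timeoutIdle. Restated with plain parameters as a hypothetical standalone function, not project code:

    package main

    import (
    	"fmt"
    	"time"
    )

    // eligible restates eligibleForShutdown's decision table with
    // plain parameters instead of the worker's internal state.
    func eligible(state string, hold, draining bool, idleFor, timeoutIdle time.Duration, allRunnersSentKILL bool) bool {
    	if hold {
    		return false
    	}
    	switch state {
    	case "booting":
    		return draining
    	case "idle":
    		return draining || idleFor >= timeoutIdle
    	case "running":
    		return draining && allRunnersSentKILL
    	default:
    		return false
    	}
    }

    func main() {
    	// A draining instance whose remaining crunch-run processes are
    	// all past SIGKILL is now eligible for shutdown rather than
    	// lingering forever.
    	fmt.Println(eligible("running", false, true, 0, 5*time.Minute, true)) // true
    }
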
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f16f98a943cdbe2f0501a35d95cb3e45e9c9d5a9..8bd29097f42f4af78e727d46b69f62caf7512040 100644 (file)
@@ -122,6 +122,15 @@ type Dispatch struct {
 
        // Maximum total worker probes per second
        MaxProbesPerSecond int
+
+       // Time before repeating TERM/KILL signal
+       TimeoutSignal Duration
+
+       // Time to give up on TERM and move to KILL
+       TimeoutTERM Duration
+
+       // Time to give up on KILL and write off the worker
+       TimeoutKILL Duration
 }
 
 type CloudVMs struct {
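
For reference, a hedged example of setting the new knobs programmatically: the field names come from this diff, the values mirror the defaults baked into worker/pool.go, and any field left zero falls back to those defaults.

    package main

    import (
    	"time"

    	"git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
    	d := arvados.Dispatch{
    		TimeoutSignal: arvados.Duration(5 * time.Second),  // repeat TERM/KILL this often
    		TimeoutTERM:   arvados.Duration(2 * time.Minute),  // then escalate to KILL
    		TimeoutKILL:   arvados.Duration(20 * time.Second), // then give up and drain the worker
    	}
    	_ = d
    }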