14807: Don't send SIGKILL to crunch-run processes.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index e90935e2aa9e5747d08e136475cd186c0b4bc766..81a658535ea593a7a0a0c2d9231fd66f3055bdbd 100644 (file)
@@ -68,6 +68,8 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -105,6 +107,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
@@ -136,6 +140,8 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
 
        // private state
@@ -319,9 +325,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
@@ -383,19 +387,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -486,53 +485,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
                }
-       }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance.ID(),
-       })
-       logger.Debug("killing process")
-       cmd := "crunch-run --kill 15 " + uuid
-       if u := wkr.instance.RemoteUser(); u != "root" {
-               cmd = "sudo " + cmd
-       }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
+               if rr != nil {
+                       rr.Kill(reason)
+                       return
                }
-               wkr.updated = time.Now()
-               go wp.notify()
        }
+       logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -785,7 +760,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }