15345: Add .../containers/kill management API to dispatcher.
author: Tom Clegg <tclegg@veritasgenetics.com>
Tue, 18 Jun 2019 19:25:20 +0000 (15:25 -0400)
committer: Tom Clegg <tclegg@veritasgenetics.com>
Tue, 18 Jun 2019 19:25:20 +0000 (15:25 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/scheduler/interfaces.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/worker/pool.go

index 12c60ecb11177871a4b5230adddd0e313898270c..731c6d25d72c20243dba43e9636f03c786bb1423 100644 (file)
@@ -148,6 +148,7 @@ func (disp *dispatcher) initialize() {
        } else {
                mux := httprouter.New()
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
@@ -232,6 +233,20 @@ func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request)
        }
 }
 
+// Management API: send SIGTERM to specified container's crunch-run
+// process now.
+func (disp *dispatcher) apiContainerKill(w http.ResponseWriter, r *http.Request) {
+       uuid := r.FormValue("container_uuid")
+       if uuid == "" {
+               httpserver.Error(w, "container_uuid parameter not provided", http.StatusBadRequest)
+               return
+       }
+       if !disp.pool.KillContainer(uuid, "via management API: "+r.FormValue("reason")) {
+               httpserver.Error(w, "container not found", http.StatusNotFound)
+               return
+       }
+}
+
 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
        id := cloud.InstanceID(r.FormValue("instance_id"))
        if id == "" {
index 307807e32337257f14d02178cbd6e98de61f7be8..6e00911bd0391df4d2dbc744c5223fe0becec9f4 100644 (file)
@@ -38,7 +38,8 @@ type WorkerPool interface {
        Create(arvados.InstanceType) bool
        Shutdown(arvados.InstanceType) bool
        StartContainer(arvados.InstanceType, arvados.Container) bool
-       KillContainer(uuid, reason string)
+       KillContainer(uuid, reason string) bool
+       ForgetContainer(uuid string)
        Subscribe() <-chan struct{}
        Unsubscribe(<-chan struct{})
 }
index dab324579dc84e9c8c93086fe693a638f029191f..c683b704d48bb989664b55d71bd136dcc77ca999 100644 (file)
@@ -77,10 +77,13 @@ func (p *stubPool) Create(it arvados.InstanceType) bool {
        p.unalloc[it]++
        return true
 }
-func (p *stubPool) KillContainer(uuid, reason string) {
+func (p *stubPool) ForgetContainer(uuid string) {
+}
+func (p *stubPool) KillContainer(uuid, reason string) bool {
        p.Lock()
        defer p.Unlock()
        delete(p.running, uuid)
+       return true
 }
 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
        p.shutdowns++
index 99bee484c6f7162a3e875b7627738a555fb46e13..78f099549657538102115efc5823c8e9f672c105 100644 (file)
@@ -99,6 +99,7 @@ func (sch *Scheduler) cancel(uuid string, reason string) {
 
 func (sch *Scheduler) kill(uuid string, reason string) {
        sch.pool.KillContainer(uuid, reason)
+       sch.pool.ForgetContainer(uuid)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
index 201e8aad276eb5f8ca353e3ff8c41fc9dee00f2a..97ca7f60a2a916fd6feece03e016c7fe3673e22d 100644 (file)
@@ -154,7 +154,7 @@ type Pool struct {
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
@@ -446,7 +446,7 @@ func (wp *Pool) CountWorkers() map[State]int {
 // In the returned map, the time value indicates when the Pool
 // observed that the container process had exited. A container that
 // has not yet exited has a zero time value. The caller should use
-// KillContainer() to garbage-collect the entries for exited
+// ForgetContainer() to garbage-collect the entries for exited
 // containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
@@ -493,18 +493,15 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string, reason string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
-       if _, ok := wp.exited[uuid]; ok {
-               logger.Debug("clearing placeholder for exited crunch-run process")
-               delete(wp.exited, uuid)
-               return
-       }
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
@@ -512,10 +509,30 @@ func (wp *Pool) KillContainer(uuid string, reason string) {
                }
                if rr != nil {
                        rr.Kill(reason)
-                       return
+                       return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
+       return false
+}
+
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
+       }
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {