14977: Kill containers that don't exist according to controller.
author    Tom Clegg <tclegg@veritasgenetics.com>
Fri, 15 Mar 2019 20:11:48 +0000 (16:11 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Fri, 15 Mar 2019 20:34:14 +0000 (16:34 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/scheduler/fix_stale_locks.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/sync.go
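
Summary of the change: cancel() and kill() in sync.go now take a container UUID
instead of a queue entry, and sync() gains a final pass that kills the
crunch-run process for any container the worker pool reports as running but
which no longer appears in the controller's queue. As a standalone illustration
of that reconciliation pass (hypothetical names, not the dispatcher's real
types), a minimal sketch:

    package main

    import "fmt"

    // killUnknown walks the pool's set of running container UUIDs and invokes
    // kill for any UUID that has no corresponding queue entry.
    func killUnknown(running map[string]bool, qEntries map[string]struct{}, kill func(uuid, reason string)) {
        for uuid := range running {
            if _, known := qEntries[uuid]; !known {
                kill(uuid, "not in queue")
            }
        }
    }

    func main() {
        running := map[string]bool{"zzzzz-dz642-000000000000002": true}
        queue := map[string]struct{}{} // controller has no entry for that container
        killUnknown(running, queue, func(uuid, reason string) {
            fmt.Printf("kill %s because %s\n", uuid, reason)
        })
    }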

index 148b653c2e52305b2ece2255c49d98bf6cb72f50..1f9338f7b8d02e66d73239cc0f0c76ebc53ab423 100644 (file)
@@ -47,7 +47,6 @@ waiting:
                        // Give up.
                        break waiting
                }
-
        }
 
        for _, uuid := range stale {
index 4296a1364c911fc94d44af28512ecac195b4e5f5..9f26877f552c0b6e35af4661deb859f7a5085503 100644 (file)
@@ -335,3 +335,40 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
        }
        c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
 }
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 0,
+               },
+               running: map[string]time.Time{
+                       test.ContainerUUID(2): time.Time{},
+               },
+       }
+       queue := test.Queue{
+               ChooseType: chooseType,
+               Containers: []arvados.Container{
+                       {
+                               // create a new worker
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+       c.Check(pool.running, check.HasLen, 1)
+       sch.sync()
+       for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+       }
+       c.Check(pool.Running(), check.HasLen, 0)
+}
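
The test above relies on the pool stub dropping a container from its running
set when KillContainer is called, so that pool.Running() eventually reports an
empty map within the one-second deadline. A rough sketch of that kind of stub,
as an assumption about the behavior rather than the actual stubPool defined in
run_queue_test.go:

    package fakepool

    import (
        "sync"
        "time"
    )

    // fakePool is a hypothetical stand-in for the test's pool stub.
    type fakePool struct {
        mtx     sync.Mutex
        running map[string]time.Time
    }

    // Running returns a copy of the running map so callers can poll it safely.
    func (p *fakePool) Running() map[string]time.Time {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        r := map[string]time.Time{}
        for k, v := range p.running {
            r[k] = v
        }
        return r
    }

    // KillContainer simulates terminating crunch-run by forgetting the container.
    func (p *fakePool) KillContainer(uuid string) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        delete(p.running, uuid)
    }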
index 23fc621dea26c76be659ddc4f88bea7565f4bd4c..28b9fd33857a388f6bd15aa1252a4797bec18fee 100644 (file)
@@ -30,11 +30,11 @@ func (sch *Scheduler) sync() {
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               go sch.cancel(ent, "not running on any worker")
+                               go sch.cancel(uuid, "not running on any worker")
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        }
                case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
                        if running {
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
                                // of kill() will be to make the
                                // worker available for the next
                                // container.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
                        } else {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
@@ -60,13 +60,13 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
                                go sch.requeue(ent, "crunch-run exited")
                        } else if running && exited.IsZero() && ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        } else if !running && ent.Container.Priority == 0 {
                                go sch.requeue(ent, "priority=0")
                        }
@@ -77,10 +77,14 @@ func (sch *Scheduler) sync() {
                        }).Error("BUG: unexpected state")
                }
        }
+       for uuid := range running {
+               if _, known := qEntries[uuid]; !known {
+                       go sch.kill(uuid, "not in queue")
+               }
+       }
 }
 
-func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
+func (sch *Scheduler) cancel(uuid string, reason string) {
        if !sch.uuidLock(uuid, "cancel") {
                return
        }
@@ -93,8 +97,7 @@ func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
        }
 }
 
-func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
+func (sch *Scheduler) kill(uuid string, reason string) {
        logger := sch.logger.WithField("ContainerUUID", uuid)
        logger.Debugf("killing crunch-run process because %s", reason)
        sch.pool.KillContainer(uuid)
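
One detail worth noting: cancel() still takes the per-container uuidLock before
doing its work, so overlapping sync() passes don't start duplicate cancel
operations for the same container, while kill(), in the lines shown, goes
straight to sch.pool.KillContainer. A rough sketch of what a per-UUID guard of
that kind looks like (hypothetical, not the scheduler's actual uuidLock code):

    package opguard

    import "sync"

    // guard allows at most one in-flight operation per container UUID.
    type guard struct {
        mtx sync.Mutex
        ops map[string]string // container UUID -> operation name
    }

    // tryLock reports whether the caller may proceed with op for uuid.
    func (g *guard) tryLock(uuid, op string) bool {
        g.mtx.Lock()
        defer g.mtx.Unlock()
        if g.ops == nil {
            g.ops = map[string]string{}
        }
        if _, busy := g.ops[uuid]; busy {
            return false
        }
        g.ops[uuid] = op
        return true
    }

    // unlock releases the guard for uuid once the operation finishes.
    func (g *guard) unlock(uuid string) {
        g.mtx.Lock()
        defer g.mtx.Unlock()
        delete(g.ops, uuid)
    }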