16663: Merge branch 'master'
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index 23fc621dea26c76be659ddc4f88bea7565f4bd4c..116ca7643117d3f4df3b6e8d4e99864a44d6dfe6 100644
--- a/lib/dispatchcloud/scheduler/sync.go
+++ b/lib/dispatchcloud/scheduler/sync.go
@@ -7,8 +7,9 @@ package scheduler
 import (
        "fmt"
 
-       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
 )
 
@@ -23,6 +24,7 @@ import (
 // Running containers whose crunch-run processes have exited are
 // cancelled.
 func (sch *Scheduler) sync() {
+       anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
        running := sch.pool.Running()
        qEntries, qUpdated := sch.queue.Entries()
        for uuid, ent := range qEntries {
@@ -30,11 +32,13 @@ func (sch *Scheduler) sync() {
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               go sch.cancel(ent, "not running on any worker")
+                               if !anyUnknownWorkers {
+                                       go sch.cancel(uuid, "not running on any worker")
+                               }
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(uuid, "state=Running after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        }
                case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
                        if running {
@@ -46,12 +50,12 @@ func (sch *Scheduler) sync() {
                                // of kill() will be to make the
                                // worker available for the next
                                // container.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        } else {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
                                        "State":         ent.Container.State,
-                               }).Info("container finished")
+                               }).Info("container finished -- dropping from queue")
                                sch.queue.Forget(uuid)
                        }
                case arvados.ContainerStateQueued:
@@ -60,13 +64,20 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
+                       } else if ent.Container.Priority == 0 {
+                               sch.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "State":         ent.Container.State,
+                                       "Priority":      ent.Container.Priority,
+                               }).Info("container on hold -- dropping from queue")
+                               sch.queue.Forget(uuid)
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
                                go sch.requeue(ent, "crunch-run exited")
                        } else if running && exited.IsZero() && ent.Container.Priority == 0 {
-                               go sch.kill(ent, "priority=0")
+                               go sch.kill(uuid, "priority=0")
                        } else if !running && ent.Container.Priority == 0 {
                                go sch.requeue(ent, "priority=0")
                        }
@@ -77,10 +88,14 @@ func (sch *Scheduler) sync() {
                        }).Error("BUG: unexpected state")
                }
        }
+       for uuid := range running {
+               if _, known := qEntries[uuid]; !known {
+                       go sch.kill(uuid, "not in queue")
+               }
+       }
 }
 
-func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
+func (sch *Scheduler) cancel(uuid string, reason string) {
        if !sch.uuidLock(uuid, "cancel") {
                return
        }
@@ -93,11 +108,9 @@ func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
        }
 }
 
-func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
-       uuid := ent.Container.UUID
-       logger := sch.logger.WithField("ContainerUUID", uuid)
-       logger.Debugf("killing crunch-run process because %s", reason)
-       sch.pool.KillContainer(uuid)
+func (sch *Scheduler) kill(uuid string, reason string) {
+       sch.pool.KillContainer(uuid, reason)
+       sch.pool.ForgetContainer(uuid)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
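
Context for the guard added at the top of sync(): while any worker is still in worker.StateUnknown (e.g. shortly after a dispatcher restart, before every instance has reported which containers it is running), a container that the queue says is Running but that no worker has claimed is left alone instead of being cancelled. Below is a minimal, self-contained sketch of that decision, using locally defined stand-ins rather than the real container.Queue and worker.Pool types; the stub types, the example UUID, and main() are illustrative only and not part of this commit.

package main

import (
	"fmt"
	"time"
)

// Stand-in for worker.State; the real definition lives in
// lib/dispatchcloud/worker.
type State int

const (
	StateUnknown State = iota
	StateIdle
	StateRunning
)

// stubPool mimics the two pool methods the guard relies on:
// CountWorkers() and Running().
type stubPool struct {
	workers map[string]State     // instance ID -> state
	running map[string]time.Time // container UUID -> exit time (zero while running)
}

func (p *stubPool) CountWorkers() map[State]int {
	counts := map[State]int{}
	for _, st := range p.workers {
		counts[st]++
	}
	return counts
}

func (p *stubPool) Running() map[string]time.Time { return p.running }

func main() {
	// One instance is still booting/reconnecting, so its state is
	// unknown and it may be running containers we can't see yet.
	pool := &stubPool{
		workers: map[string]State{"inst-1": StateIdle, "inst-2": StateUnknown},
		running: map[string]time.Time{},
	}
	uuid := "zzzzz-dz642-000000000000000" // made-up example container UUID

	anyUnknownWorkers := pool.CountWorkers()[StateUnknown] > 0
	_, running := pool.Running()[uuid]
	switch {
	case !running && anyUnknownWorkers:
		fmt.Println(uuid, ": leaving alone; some workers have not reported yet")
	case !running:
		fmt.Println(uuid, ": cancel (not running on any worker)")
	}
}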