Merge branch '20520-instance-init-command'
[arvados.git] / lib / dispatchcloud / scheduler / sync.go
index de69df98227e624fc29ef8e55884e8457db29592..4d601d6ae834dfeb242a8328dd4c62d79959d46e 100644 (file)
@@ -8,10 +8,13 @@ import (
        "fmt"
 
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
 )
 
+var reportedUnexpectedState = false
+
 // sync resolves discrepancies between the queue and the pool:
 //
 // Lingering crunch-run processes for finalized and unlocked/requeued
@@ -23,6 +26,7 @@ import (
 // Running containers whose crunch-run processes have exited are
 // cancelled.
 func (sch *Scheduler) sync() {
+       anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
        running := sch.pool.Running()
        qEntries, qUpdated := sch.queue.Entries()
        for uuid, ent := range qEntries {
@@ -30,7 +34,9 @@ func (sch *Scheduler) sync() {
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               go sch.cancel(uuid, "not running on any worker")
+                               if !anyUnknownWorkers {
+                                       go sch.cancel(uuid, "not running on any worker")
+                               }
                        } else if !exited.IsZero() && qUpdated.After(exited) {
                                go sch.cancel(uuid, "state=Running after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
@@ -60,7 +66,7 @@ func (sch *Scheduler) sync() {
                                // a network outage and is still
                                // preparing to run a container that
                                // has already been unlocked/requeued.
-                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
+                               go sch.kill(uuid, fmt.Sprintf("pool says running, but queue says state=%s", ent.Container.State))
                        } else if ent.Container.Priority == 0 {
                                sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
@@ -78,10 +84,13 @@ func (sch *Scheduler) sync() {
                                go sch.requeue(ent, "priority=0")
                        }
                default:
-                       sch.logger.WithFields(logrus.Fields{
-                               "ContainerUUID": uuid,
-                               "State":         ent.Container.State,
-                       }).Error("BUG: unexpected state")
+                       if !reportedUnexpectedState {
+                               sch.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "State":         ent.Container.State,
+                               }).Error("BUG: unexpected state")
+                               reportedUnexpectedState = true
+                       }
                }
        }
        for uuid := range running {
@@ -105,13 +114,21 @@ func (sch *Scheduler) cancel(uuid string, reason string) {
 }
 
 func (sch *Scheduler) kill(uuid string, reason string) {
+       if !sch.uuidLock(uuid, "kill") {
+               return
+       }
+       defer sch.uuidUnlock(uuid)
+       sch.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "reason":        reason,
+       }).Debug("kill")
        sch.pool.KillContainer(uuid, reason)
        sch.pool.ForgetContainer(uuid)
 }
 
 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
        uuid := ent.Container.UUID
-       if !sch.uuidLock(uuid, "cancel") {
+       if !sch.uuidLock(uuid, "requeue") {
                return
        }
        defer sch.uuidUnlock(uuid)