Merge branch '15964-fix-docs' refs #15964
[arvados.git] / lib / dispatchcloud / scheduler / sync.go
index 00e2a30f7992a5801e70f9395b436b0c7fbc7882..fc683505f93dbae41ff42f31032dd2d145d72169 100644 (file)
@@ -5,14 +5,15 @@
 package scheduler
 
 import (
-       "time"
+       "fmt"
 
-       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
-// Sync resolves discrepancies between the queue and the pool:
+// sync resolves discrepancies between the queue and the pool:
 //
 // Lingering crunch-run processes for finalized and unlocked/requeued
 // containers are killed.
@@ -22,64 +23,114 @@ import (
 //
 // Running containers whose crunch-run processes have exited are
 // cancelled.
-//
-// Sync must not be called concurrently with other calls to Map or
-// Sync using the same queue or pool.
-func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
-       running := pool.Running()
-       cancel := func(ent container.QueueEnt, reason string) {
-               uuid := ent.Container.UUID
-               logger := logger.WithField("ContainerUUID", uuid)
-               logger.Infof("cancelling container because %s", reason)
-               err := queue.Cancel(uuid)
-               if err != nil {
-                       logger.WithError(err).Print("error cancelling container")
-               }
-       }
-       kill := func(ent container.QueueEnt) {
-               uuid := ent.Container.UUID
-               logger := logger.WithField("ContainerUUID", uuid)
-               logger.Debugf("killing crunch-run process because state=%q", ent.Container.State)
-               pool.KillContainer(uuid)
-       }
-       qEntries, qUpdated := queue.Entries()
+func (sch *Scheduler) sync() {
+       anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
+       running := sch.pool.Running()
+       qEntries, qUpdated := sch.queue.Entries()
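+       // qUpdated is the time this queue snapshot was taken. It is
+       // compared with crunch-run exit times below, so containers
+       // are only cancelled or requeued once the queue has been
+       // refreshed after crunch-run exited.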
        for uuid, ent := range qEntries {
                exited, running := running[uuid]
                switch ent.Container.State {
                case arvados.ContainerStateRunning:
                        if !running {
-                               cancel(ent, "not running on any worker")
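+                               // If any worker's state is unknown,
+                               // this container might still be
+                               // running on it, so don't cancel
+                               // yet.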
+                               if !anyUnknownWorkers {
+                                       go sch.cancel(uuid, "not running on any worker")
+                               }
                        } else if !exited.IsZero() && qUpdated.After(exited) {
-                               cancel(ent, "state=\"Running\" after crunch-run exited")
+                               go sch.cancel(uuid, "state=Running after crunch-run exited")
+                       } else if ent.Container.Priority == 0 {
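+                               // Container is still running but its
+                               // priority has dropped to zero, so
+                               // stop the crunch-run process.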
+                               go sch.kill(uuid, "priority=0")
                        }
                case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
                        if running {
-                               kill(ent)
+                               // Kill crunch-run in case it's stuck;
+                               // nothing it does now will matter
+                               // anyway. If crunch-run has already
+                               // exited and we just haven't found
+                               // out about it yet, the only effect
+                               // of kill() will be to make the
+                               // worker available for the next
+                               // container.
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
                        } else {
-                               logger.WithFields(logrus.Fields{
+                               sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
                                        "State":         ent.Container.State,
-                               }).Info("container finished")
-                               queue.Forget(uuid)
+                               }).Info("container finished -- dropping from queue")
+                               sch.queue.Forget(uuid)
                        }
                case arvados.ContainerStateQueued:
                        if running {
-                               kill(ent)
+                               // Can happen if a worker returns from
+                               // a network outage and is still
+                               // preparing to run a container that
+                               // has already been unlocked/requeued.
+                               go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
+                       } else if ent.Container.Priority == 0 {
+                               sch.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "State":         ent.Container.State,
+                                       "Priority":      ent.Container.Priority,
+                               }).Info("container on hold -- dropping from queue")
+                               sch.queue.Forget(uuid)
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
-                               logger = logger.WithFields(logrus.Fields{
-                                       "ContainerUUID": uuid,
-                                       "Exited":        time.Since(exited).Seconds(),
-                               })
-                               logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
-                               err := queue.Unlock(uuid)
-                               if err != nil {
-                                       logger.WithError(err).Info("error requeueing container")
-                               }
+                               go sch.requeue(ent, "crunch-run exited")
+                       } else if running && exited.IsZero() && ent.Container.Priority == 0 {
+                               go sch.kill(uuid, "priority=0")
+                       } else if !running && ent.Container.Priority == 0 {
+                               go sch.requeue(ent, "priority=0")
                        }
                default:
-                       logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+                       sch.logger.WithFields(logrus.Fields{
+                               "ContainerUUID": uuid,
+                               "State":         ent.Container.State,
+                       }).Error("BUG: unexpected state")
+               }
+       }
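+       // Kill any remaining crunch-run process whose container no
+       // longer appears in the queue at all.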
+       for uuid := range running {
+               if _, known := qEntries[uuid]; !known {
+                       go sch.kill(uuid, "not in queue")
                }
        }
 }
+
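+// cancel cancels the given container via the queue. If another
+// cancel/kill/requeue operation already holds the per-container
+// lock, this one is skipped.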
+func (sch *Scheduler) cancel(uuid string, reason string) {
+       if !sch.uuidLock(uuid, "cancel") {
+               return
+       }
+       defer sch.uuidUnlock(uuid)
+       logger := sch.logger.WithField("ContainerUUID", uuid)
+       logger.Infof("cancelling container because %s", reason)
+       err := sch.queue.Cancel(uuid)
+       if err != nil {
+               logger.WithError(err).Print("error cancelling container")
+       }
+}
+
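+// kill tells the worker pool to terminate the container's crunch-run
+// process, then drops the pool's record of the container, freeing
+// the worker for other work.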
+func (sch *Scheduler) kill(uuid string, reason string) {
+       if !sch.uuidLock(uuid, "kill") {
+               return
+       }
+       defer sch.uuidUnlock(uuid)
+       sch.pool.KillContainer(uuid, reason)
+       sch.pool.ForgetContainer(uuid)
+}
+
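+// requeue unlocks the given container so it returns to the queued
+// state and can be scheduled again.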
+func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
+       uuid := ent.Container.UUID
+       if !sch.uuidLock(uuid, "requeue") {
+               return
+       }
+       defer sch.uuidUnlock(uuid)
+       logger := sch.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "State":         ent.Container.State,
+               "Priority":      ent.Container.Priority,
+       })
+       logger.Infof("requeueing locked container because %s", reason)
+       err := sch.queue.Unlock(uuid)
+       if err != nil {
+               logger.WithError(err).Error("error requeueing container")
+       }
+}