14360: Encapsulate scheduler object.
[arvados.git] / lib / dispatchcloud / scheduler / sync.go
index bd0e9b309ef9db51a3eaca47e70395bd05cd98c6..a85162debef250ea9a8078b2ccaba1b442eb0983 100644 (file)
@@ -12,7 +12,7 @@ import (
        "github.com/Sirupsen/logrus"
 )
 
-// Sync resolves discrepancies between the queue and the pool:
+// sync resolves discrepancies between the queue and the pool:
 //
 // Lingering crunch-run processes for finalized and unlocked/requeued
 // containers are killed.
@@ -22,27 +22,24 @@ import (
 //
 // Running containers whose crunch-run processes have exited are
 // cancelled.
-//
-// Sync must not be called concurrently with other calls to Map or
-// Sync using the same queue or pool.
-func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
-       running := pool.Running()
+func (sch *Scheduler) sync() {
+       running := sch.pool.Running()
        cancel := func(ent container.QueueEnt, reason string) {
                uuid := ent.Container.UUID
-               logger := logger.WithField("ContainerUUID", uuid)
+               logger := sch.logger.WithField("ContainerUUID", uuid)
                logger.Infof("cancelling container because %s", reason)
-               err := queue.Cancel(uuid)
+               err := sch.queue.Cancel(uuid)
                if err != nil {
                        logger.WithError(err).Print("error cancelling container")
                }
        }
        kill := func(ent container.QueueEnt) {
                uuid := ent.Container.UUID
-               logger := logger.WithField("ContainerUUID", uuid)
+               logger := sch.logger.WithField("ContainerUUID", uuid)
                logger.Debugf("killing crunch-run process because state=%q", ent.Container.State)
-               pool.KillContainer(uuid)
+               sch.pool.KillContainer(uuid)
        }
-       qEntries, qUpdated := queue.Entries()
+       qEntries, qUpdated := sch.queue.Entries()
        for uuid, ent := range qEntries {
                exited, running := running[uuid]
                switch ent.Container.State {
@@ -64,11 +61,11 @@ func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                                // container.
                                go kill(ent)
                        } else {
-                               logger.WithFields(logrus.Fields{
+                               sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
                                        "State":         ent.Container.State,
                                }).Info("container finished")
-                               queue.Forget(uuid)
+                               sch.queue.Forget(uuid)
                        }
                case arvados.ContainerStateQueued:
                        if running {
@@ -80,18 +77,18 @@ func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                        }
                case arvados.ContainerStateLocked:
                        if running && !exited.IsZero() && qUpdated.After(exited) {
-                               logger logger.WithFields(logrus.Fields{
+                               logger := sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
                                        "Exited":        time.Since(exited).Seconds(),
                                })
                                logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
-                               err := queue.Unlock(uuid)
+                               err := sch.queue.Unlock(uuid)
                                if err != nil {
                                        logger.WithError(err).Info("error requeueing container")
                                }
                        }
                default:
-                       logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+                       sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
                }
        }
 }