14360: Encapsulate scheduler object.
[arvados.git] / lib / dispatchcloud / scheduler / run_queue.go
similarity index 65%
rename from lib/dispatchcloud/scheduler/map.go
rename to lib/dispatchcloud/scheduler/run_queue.go
index aa92d04339b142d1f0794bdd4138cf626e36bc3c..9fc1a16580524f71a910d74f58887bb7fa217062 100644 (file)
@@ -2,11 +2,6 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-// Package scheduler uses a resizable worker pool to execute
-// containers in priority order.
-//
-// Scheduler functions must not be called concurrently using the same
-// queue or pool.
 package scheduler
 
 import (
@@ -18,28 +13,8 @@ import (
        "github.com/Sirupsen/logrus"
 )
 
-// Map maps queued containers onto unallocated workers in priority
-// order, creating new workers if needed. It locks containers that can
-// be mapped onto existing/pending workers, and starts them if
-// possible.
-//
-// Map unlocks any containers that are locked but can't be
-// mapped. (For example, this happens when the cloud provider reaches
-// quota/capacity and a previously mappable container's priority is
-// surpassed by a newer container.)
-//
-// If it encounters errors while creating new workers, Map shuts down
-// idle workers, in case they are consuming quota.
-//
-// Map should not be called without first calling FixStaleLocks.
-//
-//     FixStaleLocks()
-//     for {
-//             Map()
-//             Sync()
-//     }
-func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
-       unsorted, _ := queue.Entries()
+func (sch *Scheduler) runQueue() {
+       unsorted, _ := sch.queue.Entries()
        sorted := make([]container.QueueEnt, 0, len(unsorted))
        for _, ent := range unsorted {
                sorted = append(sorted, ent)
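
The doc comment removed above described the intended call pattern: FixStaleLocks once at startup, then Map and Sync in a loop, never concurrently on the same queue or pool. With the logic moved onto the Scheduler object, the same cycle presumably becomes a single goroutine driving method calls. A minimal sketch under that assumption (it would also need the time package); fixStaleLocks, sync, and the one-second cadence are illustrative guesses, and only runQueue actually appears in this diff:

// Sketch only: fixStaleLocks, sync, and the ticker interval are assumptions;
// runQueue is the method introduced in this change. Driving everything from
// one goroutine preserves the old rule that scheduler functions must not be
// called concurrently on the same queue or pool.
func (sch *Scheduler) run(stop <-chan struct{}) {
        sch.fixStaleLocks() // assumed method form of the old FixStaleLocks()
        tick := time.NewTicker(time.Second)
        defer tick.Stop()
        for {
                sch.runQueue() // was Map(logger, queue, pool)
                sch.sync()     // assumed method form of the old Sync()
                select {
                case <-stop:
                        return
                case <-tick.C:
                }
        }
}
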
@@ -48,20 +23,20 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                return sorted[i].Container.Priority > sorted[j].Container.Priority
        })
 
-       running := pool.Running()
-       unalloc := pool.Unallocated()
+       running := sch.pool.Running()
+       unalloc := sch.pool.Unallocated()
 
-       logger.WithFields(logrus.Fields{
+       sch.logger.WithFields(logrus.Fields{
                "Containers": len(sorted),
                "Processes":  len(running),
-       }).Debug("mapping")
+       }).Debug("runQueue")
 
        dontstart := map[arvados.InstanceType]bool{}
        var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
 
        for i, ctr := range sorted {
                ctr, it := ctr.Container, ctr.InstanceType
-               logger := logger.WithFields(logrus.Fields{
+               logger := sch.logger.WithFields(logrus.Fields{
                        "ContainerUUID": ctr.UUID,
                        "InstanceType":  it.Name,
                })
@@ -69,19 +44,20 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                        continue
                }
                if ctr.State == arvados.ContainerStateQueued {
-                       logger.Debugf("locking")
-                       if unalloc[it] < 1 && pool.AtQuota() {
+                       if unalloc[it] < 1 && sch.pool.AtQuota() {
+                               logger.Debugf("not locking: AtQuota and no unalloc workers")
                                overquota = sorted[i:]
                                break
                        }
-                       err := queue.Lock(ctr.UUID)
+                       logger.Debugf("locking")
+                       err := sch.queue.Lock(ctr.UUID)
                        if err != nil {
                                logger.WithError(err).Warnf("lock error")
                                unalloc[it]++
                                continue
                        }
                        var ok bool
-                       ctr, ok = queue.Get(ctr.UUID)
+                       ctr, ok = sch.queue.Get(ctr.UUID)
                        if !ok {
                                logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
                                continue
@@ -95,12 +71,12 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                }
                if unalloc[it] < 1 {
                        logger.Info("creating new instance")
-                       err := pool.Create(it)
+                       err := sch.pool.Create(it)
                        if err != nil {
                                if _, ok := err.(cloud.QuotaError); !ok {
                                        logger.WithError(err).Warn("error creating worker")
                                }
-                               queue.Unlock(ctr.UUID)
+                               sch.queue.Unlock(ctr.UUID)
                                // Don't let lower-priority containers
                               // starve this one by keeping
                                // idle workers alive on different
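
The error-handling branch above only logs a warning when the failure is not a cloud.QuotaError; a quota failure is expected once capacity is reached, and is handled by unlocking the container and trimming idle workers instead. For the type assertion to match, a cloud driver has to return an error that implements the quota-error interface. A small self-contained sketch of that pattern follows; the interface body is inferred from this usage rather than copied from lib/cloud, and wrapCreateError plus the "InstanceLimitExceeded" string are hypothetical:

package example

import "strings"

// Assumed shape of cloud.QuotaError, inferred from the err.(cloud.QuotaError)
// assertion in runQueue; treat it as a sketch, not the verified lib/cloud
// definition.
type QuotaError interface {
        error
        IsQuotaError() bool
}

// quotaError wraps a provider error and marks it as a quota problem.
type quotaError struct{ error }

func (quotaError) IsQuotaError() bool { return true }

// Compile-time check that quotaError satisfies the assumed interface.
var _ QuotaError = quotaError{}

// wrapCreateError is a hypothetical driver helper: it promotes a provider's
// "limit exceeded" response to a quota error so the scheduler stops creating
// instances instead of logging a generic worker-creation failure.
func wrapCreateError(err error) error {
        if err != nil && strings.Contains(err.Error(), "InstanceLimitExceeded") {
                return quotaError{err}
        }
        return err
}
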
@@ -117,7 +93,7 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                        // higher-priority container on the same
                        // instance type. Don't let this one sneak in
                        // ahead of it.
-               } else if pool.StartContainer(it, ctr) {
+               } else if sch.pool.StartContainer(it, ctr) {
                        unalloc[it]--
                } else {
                        dontstart[it] = true
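
The opening condition of this branch falls just outside the hunk's context. Reconstructed from the comment and the assignments visible here, the gate presumably reads roughly as below; take the first line as an inference, not a verbatim quote of the file:

if dontstart[it] {
        // A higher-priority container of the same instance type already
        // failed to start on this pass; don't let this one jump ahead of it.
} else if sch.pool.StartContainer(it, ctr) {
        unalloc[it]--
} else {
        // StartContainer declined (no idle worker ready yet); remember the
        // instance type so lower-priority containers wait behind this one.
        dontstart[it] = true
}
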
@@ -130,9 +106,9 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                for _, ctr := range overquota {
                        ctr := ctr.Container
                        if ctr.State == arvados.ContainerStateLocked {
-                               logger := logger.WithField("ContainerUUID", ctr.UUID)
+                               logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
                                logger.Debug("unlock because pool capacity is used by higher priority containers")
-                               err := queue.Unlock(ctr.UUID)
+                               err := sch.queue.Unlock(ctr.UUID)
                                if err != nil {
                                        logger.WithError(err).Warn("error unlocking")
                                }
@@ -144,7 +120,7 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
                        if n < 1 {
                                continue
                        }
-                       pool.Shutdown(it)
+                       sch.pool.Shutdown(it)
                }
        }
 }
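
The new method receivers reference sch.logger, sch.queue, and sch.pool, so the Scheduler type introduced by this commit (defined outside this diff) presumably bundles the three arguments the old package-level Map took. A minimal sketch under that assumption: the field names match this diff, ContainerQueue and WorkerPool come from the old signature, and NewScheduler is a hypothetical constructor name.

// In package scheduler; uses the logrus import already present above.
type Scheduler struct {
        logger logrus.FieldLogger
        queue  ContainerQueue
        pool   WorkerPool
}

// NewScheduler is a hypothetical constructor bundling what the old
// Map(logger, queue, pool) call took as arguments.
func NewScheduler(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) *Scheduler {
        return &Scheduler{logger: logger, queue: queue, pool: pool}
}
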