//
// SPDX-License-Identifier: AGPL-3.0
//
// Package scheduler uses a resizable worker pool to execute
// containers in priority order.
//
// Scheduler functions must not be called concurrently using the same
// queue or pool.
package scheduler
import (
	"sort"

	"github.com/Sirupsen/logrus"
)
-// Map maps queued containers onto unallocated workers in priority
-// order, creating new workers if needed. It locks containers that can
-// be mapped onto existing/pending workers, and starts them if
-// possible.
-//
-// Map unlocks any containers that are locked but can't be
-// mapped. (For example, this happens when the cloud provider reaches
-// quota/capacity and a previously mappable container's priority is
-// surpassed by a newer container.)
-//
-// If it encounters errors while creating new workers, Map shuts down
-// idle workers, in case they are consuming quota.
-//
-// Map should not be called without first calling FixStaleLocks.
-//
-// FixStaleLocks()
-// for {
-// Map()
-// Sync()
-// }
-func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
- unsorted, _ := queue.Entries()
+func (sch *Scheduler) runQueue() {
+ unsorted, _ := sch.queue.Entries()
sorted := make([]container.QueueEnt, 0, len(unsorted))
for _, ent := range unsorted {
sorted = append(sorted, ent)
return sorted[i].Container.Priority > sorted[j].Container.Priority
})
- running := pool.Running()
- unalloc := pool.Unallocated()
+ running := sch.pool.Running()
+ unalloc := sch.pool.Unallocated()
- logger.WithFields(logrus.Fields{
+ sch.logger.WithFields(logrus.Fields{
"Containers": len(sorted),
"Processes": len(running),
- }).Debug("mapping")
+ }).Debug("runQueue")
dontstart := map[arvados.InstanceType]bool{}
var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
for i, ctr := range sorted {
ctr, it := ctr.Container, ctr.InstanceType
- logger := logger.WithFields(logrus.Fields{
+ logger := sch.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"InstanceType": it.Name,
})
continue
}
if ctr.State == arvados.ContainerStateQueued {
- logger.Debugf("locking")
- if unalloc[it] < 1 && pool.AtQuota() {
+ if unalloc[it] < 1 && sch.pool.AtQuota() {
+ logger.Debugf("not locking: AtQuota and no unalloc workers")
overquota = sorted[i:]
break
}
- err := queue.Lock(ctr.UUID)
+ logger.Debugf("locking")
+ err := sch.queue.Lock(ctr.UUID)
if err != nil {
logger.WithError(err).Warnf("lock error")
unalloc[it]++
continue
}
var ok bool
- ctr, ok = queue.Get(ctr.UUID)
+ ctr, ok = sch.queue.Get(ctr.UUID)
if !ok {
logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
continue
}
if unalloc[it] < 1 {
logger.Info("creating new instance")
- err := pool.Create(it)
+ err := sch.pool.Create(it)
if err != nil {
if _, ok := err.(cloud.QuotaError); !ok {
logger.WithError(err).Warn("error creating worker")
}
- queue.Unlock(ctr.UUID)
+ sch.queue.Unlock(ctr.UUID)
// Don't let lower-priority containers
// starve this one by using keeping
// idle workers alive on different
// higher-priority container on the same
// instance type. Don't let this one sneak in
// ahead of it.
- } else if pool.StartContainer(it, ctr) {
+ } else if sch.pool.StartContainer(it, ctr) {
unalloc[it]--
} else {
dontstart[it] = true
for _, ctr := range overquota {
ctr := ctr.Container
if ctr.State == arvados.ContainerStateLocked {
- logger := logger.WithField("ContainerUUID", ctr.UUID)
+ logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
logger.Debug("unlock because pool capacity is used by higher priority containers")
- err := queue.Unlock(ctr.UUID)
+ err := sch.queue.Unlock(ctr.UUID)
if err != nil {
logger.WithError(err).Warn("error unlocking")
}
if n < 1 {
continue
}
- pool.Shutdown(it)
+ sch.pool.Shutdown(it)
}
}
}