dontstart := map[arvados.InstanceType]bool{}
var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+tryrun:
for i, ctr := range sorted {
ctr, it := ctr.Container, ctr.InstanceType
logger := sch.logger.WithFields(logrus.Fields{
if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
continue
}
- if ctr.State == arvados.ContainerStateQueued {
+ switch ctr.State {
+ case arvados.ContainerStateQueued:
if unalloc[it] < 1 && sch.pool.AtQuota() {
- logger.Debugf("not locking: AtQuota and no unalloc workers")
+ logger.Debug("not locking: AtQuota and no unalloc workers")
overquota = sorted[i:]
- break
+ break tryrun
}
- logger.Debugf("locking")
- err := sch.queue.Lock(ctr.UUID)
- if err != nil {
- logger.WithError(err).Warnf("lock error")
+ sch.bgLock(logger, ctr.UUID)
+ unalloc[it]--
+ case arvados.ContainerStateLocked:
+ if unalloc[it] < 1 {
+ if sch.pool.AtQuota() {
+ logger.Debug("not starting: AtQuota and no unalloc workers")
+ overquota = sorted[i:]
+ break tryrun
+ }
+ logger.Info("creating new instance")
+ err := sch.pool.Create(it)
+ if err != nil {
+ if _, ok := err.(cloud.QuotaError); !ok {
+ logger.WithError(err).Warn("error creating worker")
+ }
+ sch.queue.Unlock(ctr.UUID)
+ // Don't let lower-priority
+ // containers starve this one
+ // by keeping idle
+ // workers alive on different
+ // instance types. TODO:
+ // avoid getting starved here
+ // if instances of a specific
+ // type always fail.
+ overquota = sorted[i:]
+ break tryrun
+ }
unalloc[it]++
- continue
}
- var ok bool
- ctr, ok = sch.queue.Get(ctr.UUID)
- if !ok {
- logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
- continue
- }
- if ctr.State != arvados.ContainerStateLocked {
- logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
- }
- }
- if ctr.State != arvados.ContainerStateLocked {
- continue
- }
- if unalloc[it] < 1 {
- logger.Info("creating new instance")
- err := sch.pool.Create(it)
- if err != nil {
- if _, ok := err.(cloud.QuotaError); !ok {
- logger.WithError(err).Warn("error creating worker")
- }
- sch.queue.Unlock(ctr.UUID)
- // Don't let lower-priority containers
- // starve this one by using keeping
- // idle workers alive on different
- // instance types. TODO: avoid
- // getting starved here if instances
- // of a specific type always fail.
- overquota = sorted[i:]
- break
+
+ if dontstart[it] {
+ // We already tried & failed to start
+ // a higher-priority container on the
+ // same instance type. Don't let this
+ // one sneak in ahead of it.
+ } else if sch.pool.StartContainer(it, ctr) {
+ unalloc[it]--
+ } else {
+ dontstart[it] = true
}
- unalloc[it]++
- }
- if dontstart[it] {
- // We already tried & failed to start a
- // higher-priority container on the same
- // instance type. Don't let this one sneak in
- // ahead of it.
- } else if sch.pool.StartContainer(it, ctr) {
- unalloc[it]--
- } else {
- dontstart[it] = true
}
}
}
}
}
+
+// Start an API call to lock the given container, and return
+// immediately while waiting for the response in a new goroutine. Do
+// nothing if a lock request is already in progress for this
+// container.
+func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
+	logger.Debug("locking")
+	sch.mtx.Lock()
+	defer sch.mtx.Unlock()
+	// sch.locking records which containers have a Lock() call in
+	// flight; checking and setting it under sch.mtx guarantees at
+	// most one lock goroutine per container UUID at a time.
+	if sch.locking[uuid] {
+		logger.Debug("locking in progress, doing nothing")
+		return
+	}
+	sch.locking[uuid] = true
+	go func() {
+		// Clear the in-progress flag when the API call
+		// finishes, whether it succeeded or not, so a future
+		// scheduling pass can retry.
+		defer func() {
+			sch.mtx.Lock()
+			defer sch.mtx.Unlock()
+			delete(sch.locking, uuid)
+		}()
+		err := sch.queue.Lock(uuid)
+		if err != nil {
+			logger.WithError(err).Warn("error locking container")
+			return
+		}
+		// Sanity-check the post-Lock state; these branches only
+		// log, they do not change scheduling behavior.
+		ctr, ok := sch.queue.Get(uuid)
+		if !ok {
+			logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+		} else if ctr.State != arvados.ContainerStateLocked {
+			logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+		}
+	}()
+}
{
UUID: test.ContainerUUID(1),
Priority: 1,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
{
UUID: test.ContainerUUID(2),
Priority: 2,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
{
UUID: test.ContainerUUID(3),
Priority: 3,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
{
UUID: test.ContainerUUID(4),
Priority: 4,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
test.InstanceType(2): 2,
},
running: map[string]time.Time{},
- canCreate: 1,
+ canCreate: 0,
}
New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
- c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3)})
+ c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
for uuid := range pool.running {
c.Check(uuid, check.Equals, uuids[4])
// Create(), if AtQuota() is true.
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
for quota := 0; quota < 2; quota++ {
+ c.Logf("quota=%d", quota)
shouldCreate := []arvados.InstanceType{}
- for i := 1; i < 1+quota; i++ {
- shouldCreate = append(shouldCreate, test.InstanceType(i))
+ for i := 0; i < quota; i++ {
+ shouldCreate = append(shouldCreate, test.InstanceType(1))
}
queue := test.Queue{
ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
{
UUID: test.ContainerUUID(1),
Priority: 1,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
test.InstanceType(2): 1,
},
running: map[string]time.Time{},
- canCreate: 2,
+ canCreate: 4,
}
queue := test.Queue{
ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
// create a new worker
UUID: test.ContainerUUID(1),
Priority: 1,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
// tentatively map to unalloc worker
UUID: test.ContainerUUID(2),
Priority: 2,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
// start now on idle worker
UUID: test.ContainerUUID(3),
Priority: 3,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 1,
RAM: 1 << 30,
// create a new worker
UUID: test.ContainerUUID(4),
Priority: 4,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
RAM: 2 << 30,
// tentatively map to unalloc worker
UUID: test.ContainerUUID(5),
Priority: 5,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
RAM: 2 << 30,
// start now on idle worker
UUID: test.ContainerUUID(6),
Priority: 6,
- State: arvados.ContainerStateQueued,
+ State: arvados.ContainerStateLocked,
RuntimeConstraints: arvados.RuntimeConstraints{
VCPUs: 2,
RAM: 2 << 30,