// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package scheduler

import (
	"sort"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/Sirupsen/logrus"
)

// runQueue makes one scheduling pass: it walks the queued containers
// in decreasing priority order, locks runnable ones, creates new
// workers when no unallocated worker of the right instance type is
// available, and tries to start each container on the pool. When the
// pool hits its quota, the remaining (lower-priority) entries are
// unlocked and surplus idle worker types are shut down.
func (sch *Scheduler) runQueue() {
	// Snapshot the queue and sort entries by descending priority.
	unsorted, _ := sch.queue.Entries()
	sorted := make([]container.QueueEnt, 0, len(unsorted))
	for _, ent := range unsorted {
		sorted = append(sorted, ent)
	}
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Container.Priority > sorted[j].Container.Priority
	})

	running := sch.pool.Running()
	unalloc := sch.pool.Unallocated()

	sch.logger.WithFields(logrus.Fields{
		"Containers": len(sorted),
		"Processes":  len(running),
	}).Debug("runQueue")

	// dontstart[it] is set once StartContainer fails for instance
	// type it, so a lower-priority container can't jump the queue
	// on that type later in this pass.
	dontstart := map[arvados.InstanceType]bool{}
	var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
	for i, ctr := range sorted {
		// Shadow ctr with the container itself; it is the
		// instance type chosen for it.
		ctr, it := ctr.Container, ctr.InstanceType
		logger := sch.logger.WithFields(logrus.Fields{
			"ContainerUUID": ctr.UUID,
			"InstanceType":  it.Name,
		})
		// Skip containers that are already running, or whose
		// priority has dropped below 1 (cancelled/on hold).
		if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
			continue
		}
		if ctr.State == arvados.ContainerStateQueued {
			// Don't try to lock a container we have no
			// capacity for: everything from here down is
			// lower priority, so stop the pass.
			if unalloc[it] < 1 && sch.pool.AtQuota() {
				logger.Debugf("not locking: AtQuota and no unalloc workers")
				overquota = sorted[i:]
				break
			}
			logger.Debugf("locking")
			err := sch.queue.Lock(ctr.UUID)
			if err != nil {
				logger.WithError(err).Warnf("lock error")
				// NOTE(review): incrementing unalloc on a
				// failedOck looks suspicious — it credits
				// this instance type with an idle worker
				// nothing consumed. Confirm intent.
				unalloc[it]++
				continue
			}
			var ok bool
			// Re-fetch: Lock may have changed the entry's state.
			ctr, ok = sch.queue.Get(ctr.UUID)
			if !ok {
				logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
				continue
			}
			if ctr.State != arvados.ContainerStateLocked {
				logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
			}
		}
		// Only Locked containers are eligible to start.
		if ctr.State != arvados.ContainerStateLocked {
			continue
		}
		if unalloc[it] < 1 {
			// No idle worker of this type; ask the pool
			// to create one.
			logger.Info("creating new instance")
			err := sch.pool.Create(it)
			if err != nil {
				// A quota error is expected behavior,
				// not worth a warning.
				if _, ok := err.(cloud.QuotaError); !ok {
					logger.WithError(err).Warn("error creating worker")
				}
				sch.queue.Unlock(ctr.UUID)
				// Don't let lower-priority containers
				// starve this one by keeping idle
				// workers alive on different
				// instance types. TODO: avoid
				// getting starved here if instances
				// of a specific type always fail.
				overquota = sorted[i:]
				break
			}
			unalloc[it]++
		}
		if dontstart[it] {
			// We already tried & failed to start a
			// higher-priority container on the same
			// instance type. Don't let this one sneak in
			// ahead of it.
		} else if sch.pool.StartContainer(it, ctr) {
			unalloc[it]--
		} else {
			dontstart[it] = true
		}
	}

	if len(overquota) > 0 {
		// Unlock any containers that are unmappable while
		// we're at quota.
		for _, ctr := range overquota {
			ctr := ctr.Container
			if ctr.State == arvados.ContainerStateLocked {
				logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
				logger.Debug("unlock because pool capacity is used by higher priority containers")
				err := sch.queue.Unlock(ctr.UUID)
				if err != nil {
					logger.WithError(err).Warn("error unlocking")
				}
			}
		}
		// Shut down idle workers that didn't get any
		// containers mapped onto them before we hit quota.
		for it, n := range unalloc {
			if n < 1 {
				continue
			}
			sch.pool.Shutdown(it)
		}
	}
}