"previous: Upgrading to 2.7.0":#v2_7_0
+h3. Check implications of Containers.MaximumPriceFactor 1.5
+
+When scheduling a container, Arvados now considers using instance types other than the lowest-cost type consistent with the container's resource constraints. If a larger instance is already running and idle, or the cloud provider reports that the optimal instance type is not currently available, Arvados will select a larger instance type, provided the cost does not exceed 1.5x the optimal instance type cost.
+
+This will typically reduce overall latency for containers and reduce instance booting/shutdown overhead, but may increase costs depending on workload and instance availability. To avoid this behavior, configure @Containers.MaximumPriceFactor: 1.0@.
+
h3. Synchronize keepstore and keep-balance upgrades
The internal communication between keepstore and keep-balance about read-only volumes has changed. After keep-balance is upgraded, old versions of keepstore will be treated as read-only. We recommend upgrading and restarting all keepstore services first, then upgrading and restarting keep-balance.
}
var rsv *ec2.Reservation
- var err error
+ var errToReturn error
subnets := instanceSet.ec2config.SubnetID
currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex))
for tryOffset := 0; ; tryOffset++ {
trySubnet = subnets[tryIndex]
rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet)
}
+ var err error
rsv, err = instanceSet.client.RunInstances(&rii)
instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1)
+ if !isErrorCapacity(errToReturn) || isErrorCapacity(err) {
+ // We want to return the last capacity error,
+ // if any; otherwise the last non-capacity
+ // error.
+ errToReturn = err
+ }
if isErrorSubnetSpecific(err) &&
tryOffset < len(subnets)-1 {
instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]).
atomic.StoreInt32(&instanceSet.currentSubnetIDIndex, int32(tryIndex))
break
}
- err = wrapError(err, &instanceSet.throttleDelayCreate)
- if err != nil {
- return nil, err
+ if rsv == nil {
+ return nil, wrapError(errToReturn, &instanceSet.throttleDelayCreate)
}
return &ec2Instance{
provider: instanceSet,
code := aerr.Code()
return strings.Contains(code, "Subnet") ||
code == "InsufficientInstanceCapacity" ||
- code == "InsufficientVolumeCapacity"
+ code == "InsufficientVolumeCapacity" ||
+ code == "Unsupported"
+}
+
+// isErrorCapacity returns true if the error indicates lack of
+// capacity (either temporary or permanent) to run a specific instance
+// type -- i.e., retrying with a different instance type might
+// succeed.
+func isErrorCapacity(err error) bool {
+ aerr, ok := err.(awserr.Error)
+ if !ok {
+ return false
+ }
+ code := aerr.Code()
+ // "Unsupported" can indicate problems other than capacity, so it
+ // only counts as a capacity error when the message says the
+ // requested instance type is the problem.
+ return code == "InsufficientInstanceCapacity" ||
+ (code == "Unsupported" && strings.Contains(aerr.Message(), "requested instance type"))
}
type ec2QuotaError struct {
return rateLimitError{error: err, earliestRetry: time.Now().Add(d)}
} else if isErrorQuota(err) {
return &ec2QuotaError{err}
- } else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" {
+ } else if isErrorCapacity(err) {
return &capacityError{err, true}
} else if err != nil {
throttleValue.Store(time.Duration(0))
`.*`)
}
+// TestCreateOneSubnetFailingCapacity checks Create's error and
+// metrics when one subnet returns a subnet-specific error and the
+// other returns an instance-type capacity error: each Create should
+// try both subnets and return the capacity error.
+func (*EC2InstanceSetSuite) TestCreateOneSubnetFailingCapacity(c *check.C) {
+ if *live != "" {
+ c.Skip("not applicable in live mode")
+ return
+ }
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-broken"]}`)
+ ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
+ "subnet-full": &ec2stubError{
+ code: "InsufficientFreeAddressesInSubnet",
+ message: "subnet is full",
+ },
+ "subnet-broken": &ec2stubError{
+ code: "InsufficientInstanceCapacity",
+ message: "insufficient capacity",
+ },
+ }
+ for i := 0; i < 3; i++ {
+ _, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.NotNil)
+ // The capacity error is preferred over the subnet error.
+ c.Check(err, check.ErrorMatches, `.*InsufficientInstanceCapacity.*`)
+ }
+ // 3 Create calls x 2 subnets tried = 6 RunInstances calls.
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 6)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 3\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 3\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
+}
+
func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
ap, _, _, _ := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
_, ok = wrapped.(cloud.QuotaError)
c.Check(ok, check.Equals, true)
- capacityError := awserr.New("InsufficientInstanceCapacity", "", nil)
- wrapped = wrapError(capacityError, nil)
- caperr, ok := wrapped.(cloud.CapacityError)
- c.Check(ok, check.Equals, true)
- c.Check(caperr.IsCapacityError(), check.Equals, true)
- c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
+ for _, trial := range []struct {
+ code string
+ msg string
+ }{
+ {"InsufficientInstanceCapacity", ""},
+ {"Unsupported", "Your requested instance type (t3.micro) is not supported in your requested Availability Zone (us-east-1e). Please retry your request by not specifying an Availability Zone or choosing us-east-1a, us-east-1b, us-east-1c, us-east-1d, us-east-1f."},
+ } {
+ capacityError := awserr.New(trial.code, trial.msg, nil)
+ wrapped = wrapError(capacityError, nil)
+ caperr, ok := wrapped.(cloud.CapacityError)
+ c.Check(ok, check.Equals, true)
+ c.Check(caperr.IsCapacityError(), check.Equals, true)
+ c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
+ }
}
# A price factor of 1.0 is a reasonable starting point.
PreemptiblePriceFactor: 0
+ # When the lowest-priced instance type for a given container is
+ # not available, try other instance types, up to the indicated
+ # maximum price factor.
+ #
+ # For example, with MaximumPriceFactor 1.5, if the
+ # lowest-cost instance type A suitable for a given container
+ # costs $2/h, Arvados may run the container on any instance type
+ # B costing $3/h or less when instance type A is not available
+ # or an idle instance of type B is already running.
+ MaximumPriceFactor: 1.5
+
# PEM encoded SSH key (RSA, DSA, or ECDSA) used by the
# cloud dispatcher for executing containers on worker VMs.
# Begins with "-----BEGIN RSA PRIVATE KEY-----\n"
"Containers.LogReuseDecisions": false,
"Containers.LSF": false,
"Containers.MaxDispatchAttempts": false,
+ "Containers.MaximumPriceFactor": true,
"Containers.MaxRetryAttempts": true,
"Containers.MinRetryPeriod": true,
"Containers.PreemptiblePriceFactor": false,
// load at the cost of increased under light load.
const queuedContainersTarget = 100
-type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
// An APIClient performs Arvados API requests. It is typically an
// *arvados.Client.
// The container to run. Only the UUID, State, Priority,
// RuntimeConstraints, ContainerImage, SchedulingParameters,
// and CreatedAt fields are populated.
- Container arvados.Container `json:"container"`
- InstanceType arvados.InstanceType `json:"instance_type"`
- FirstSeenAt time.Time `json:"first_seen_at"`
+ Container arvados.Container `json:"container"`
+ InstanceTypes []arvados.InstanceType `json:"instance_types"`
+ FirstSeenAt time.Time `json:"first_seen_at"`
}
// String implements fmt.Stringer by returning the queued container's
logger.WithError(err).Warn("error getting mounts")
return
}
- it, err := cq.chooseType(&ctr)
+ types, err := cq.chooseType(&ctr)
// Avoid wasting memory on a large Mounts attr (we don't need
// it after choosing type).
}()
return
}
+ typeNames := ""
+ for _, it := range types {
+ if typeNames != "" {
+ typeNames += ", "
+ }
+ typeNames += it.Name
+ }
cq.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
"State": ctr.State,
"Priority": ctr.Priority,
- "InstanceType": it.Name,
+ "InstanceTypes": typeNames,
}).Info("adding container to queue")
- cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
+ cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
}
// Lock acquires the dispatch lock for the given container.
}
ents, _ := cq.Entries()
for _, ent := range ents {
- count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+ count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
}
for k, v := range count {
mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
}
func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
- typeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ typeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000))
- return arvados.InstanceType{Name: "testType"}, nil
+ return []arvados.InstanceType{{Name: "testType"}}, nil
}
client := arvados.NewClientFromEnv()
var wg sync.WaitGroup
for uuid, ent := range ents {
c.Check(ent.Container.UUID, check.Equals, uuid)
- c.Check(ent.InstanceType.Name, check.Equals, "testType")
+ c.Check(ent.InstanceTypes, check.HasLen, 1)
+ c.Check(ent.InstanceTypes[0].Name, check.Equals, "testType")
c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued)
c.Check(ent.Container.Priority > 0, check.Equals, true)
// Mounts should be deleted to avoid wasting memory
}
func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) {
- errorTypeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ errorTypeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
// Make sure the relevant container fields are
// actually populated.
c.Check(ctr.ContainerImage, check.Equals, "test")
c.Check(ctr.RuntimeConstraints.RAM, check.Equals, int64(12000000000))
c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000))
c.Check(ctr.Mounts["/var/spool/cwl"].Capacity, check.Equals, int64(24000000000))
- return arvados.InstanceType{}, errors.New("no suitable instance type")
+ return nil, errors.New("no suitable instance type")
}
client := arvados.NewClientFromEnv()
return exr
}
-func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+// typeChooser returns the instance types eligible to run ctr (sorted
+// with lower prices first) by delegating to ChooseInstanceType with
+// the dispatcher's cluster configuration.
+func (disp *dispatcher) typeChooser(ctr *arvados.Container) ([]arvados.InstanceType, error) {
return ChooseInstanceType(disp.Cluster, ctr)
}
"net/url"
"os"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/lib/config"
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
RuntimeEngine: "stub",
MaxDispatchAttempts: 10,
+ MaximumPriceFactor: 1.5,
CloudVMs: arvados.CloudVMsConfig{
Driver: "test",
SyncInterval: arvados.Duration(10 * time.Millisecond),
s.disp.setupOnce.Do(s.disp.initialize)
queue := &test.Queue{
MaxDispatchAttempts: 5,
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
return ChooseInstanceType(s.cluster, ctr)
},
Logger: ctxlog.TestLogger(c),
finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
+ var countCapacityErrors int64
n := 0
s.stubDriver.Queue = queue
- s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+ s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+ if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType {
+ c.Logf("test: returning capacity error for instance type %s", pt)
+ atomic.AddInt64(&countCapacityErrors, 1)
+ return test.CapacityError{InstanceTypeSpecific: true}
+ }
n++
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.CrunchRunCrashRate = 0.1
stubvm.ArvMountDeadlockRate = 0.1
}
+ return nil
}
s.stubDriver.Bugf = c.Errorf
}
}
+ c.Check(countCapacityErrors, check.Not(check.Equals), int64(0))
+
req := httptest.NewRequest("GET", "/metrics", nil)
req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
resp := httptest.NewRecorder()
import (
"errors"
+ "math"
"regexp"
"sort"
"strconv"
return v1 < v2, nil
}
-// ChooseInstanceType returns the cheapest available
-// arvados.InstanceType big enough to run ctr.
-func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
+// ChooseInstanceType returns the arvados.InstanceTypes eligible to
+// run ctr, i.e., those that have enough RAM, VCPUs, etc., and are not
+// too expensive according to cluster configuration.
+//
+// The returned types are sorted with lower prices first.
+//
+// The error is non-nil if and only if the returned slice is empty.
+func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) ([]arvados.InstanceType, error) {
if len(cc.InstanceTypes) == 0 {
- err = ErrInstanceTypesNotConfigured
- return
+ return nil, ErrInstanceTypesNotConfigured
}
needScratch := EstimateScratchSpace(ctr)
}
needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
- ok := false
+ maxPriceFactor := math.Max(cc.Containers.MaximumPriceFactor, 1)
+ var types []arvados.InstanceType
+ var maxPrice float64
for _, it := range cc.InstanceTypes {
driverInsuff, driverErr := versionLess(it.CUDA.DriverVersion, ctr.RuntimeConstraints.CUDA.DriverVersion)
capabilityInsuff, capabilityErr := versionLess(it.CUDA.HardwareCapability, ctr.RuntimeConstraints.CUDA.HardwareCapability)
switch {
// reasons to reject a node
- case ok && it.Price > best.Price: // already selected a node, and this one is more expensive
+ case maxPrice > 0 && it.Price > maxPrice: // too expensive
case int64(it.Scratch) < needScratch: // insufficient scratch
case int64(it.RAM) < needRAM: // insufficient RAM
case it.VCPUs < needVCPUs: // insufficient VCPUs
case it.Preemptible != ctr.SchedulingParameters.Preemptible: // wrong preemptable setting
- case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs): // same price, worse specs
case it.CUDA.DeviceCount < ctr.RuntimeConstraints.CUDA.DeviceCount: // insufficient CUDA devices
case ctr.RuntimeConstraints.CUDA.DeviceCount > 0 && (driverInsuff || driverErr != nil): // insufficient driver version
case ctr.RuntimeConstraints.CUDA.DeviceCount > 0 && (capabilityInsuff || capabilityErr != nil): // insufficient hardware capability
// Don't select this node
default:
// Didn't reject the node, so select it
- // Lower price || (same price && better specs)
- best = it
- ok = true
+ types = append(types, it)
+ if newmax := it.Price * maxPriceFactor; newmax < maxPrice || maxPrice == 0 {
+ maxPrice = newmax
+ }
}
}
- if !ok {
+ if len(types) == 0 {
availableTypes := make([]arvados.InstanceType, 0, len(cc.InstanceTypes))
for _, t := range cc.InstanceTypes {
availableTypes = append(availableTypes, t)
sort.Slice(availableTypes, func(a, b int) bool {
return availableTypes[a].Price < availableTypes[b].Price
})
- err = ConstraintsNotSatisfiableError{
+ return nil, ConstraintsNotSatisfiableError{
errors.New("constraints not satisfiable by any configured instance type"),
availableTypes,
}
- return
}
- return
+ sort.Slice(types, func(i, j int) bool {
+ if types[i].Price != types[j].Price {
+ // prefer lower price
+ return types[i].Price < types[j].Price
+ }
+ if types[i].RAM != types[j].RAM {
+ // if same price, prefer more RAM
+ return types[i].RAM > types[j].RAM
+ }
+ if types[i].VCPUs != types[j].VCPUs {
+ // if same price and RAM, prefer more VCPUs
+ return types[i].VCPUs > types[j].VCPUs
+ }
+ if types[i].Scratch != types[j].Scratch {
+ // if same price and RAM and VCPUs, prefer more scratch
+ return types[i].Scratch > types[j].Scratch
+ }
+ // no preference, just sort the same way each time
+ return types[i].Name < types[j].Name
+ })
+ // Truncate types at maxPrice. We rejected it.Price>maxPrice
+ // in the loop above, but at that point maxPrice wasn't
+ // necessarily the final (lowest) maxPrice.
+ for i, it := range types {
+ if i > 0 && it.Price > maxPrice {
+ types = types[:i]
+ break
+ }
+ }
+ return types, nil
}
KeepCacheRAM: 123456789,
},
})
- c.Check(err, check.IsNil)
- c.Check(best.Name, check.Equals, "best")
- c.Check(best.RAM >= 1234567890, check.Equals, true)
- c.Check(best.VCPUs >= 2, check.Equals, true)
- c.Check(best.Scratch >= 2*GiB, check.Equals, true)
+ c.Assert(err, check.IsNil)
+ c.Assert(best, check.Not(check.HasLen), 0)
+ c.Check(best[0].Name, check.Equals, "best")
+ c.Check(best[0].RAM >= 1234567890, check.Equals, true)
+ c.Check(best[0].VCPUs >= 2, check.Equals, true)
+ c.Check(best[0].Scratch >= 2*GiB, check.Equals, true)
+ for i := range best {
+ // If multiple instance types are returned
+ // then they should all have the same price,
+ // because we didn't set MaximumPriceFactor>1.
+ c.Check(best[i].Price, check.Equals, best[0].Price)
+ }
+ }
+}
+
+// TestMaximumPriceFactor checks that ChooseInstanceType returns every
+// suitable instance type whose price is within MaximumPriceFactor
+// (here 1.5) of the cheapest suitable type, sorted with lower prices
+// first.
+func (*NodeSizeSuite) TestMaximumPriceFactor(c *check.C) {
+ menu := map[string]arvados.InstanceType{
+ "best+7": {Price: 3.4, RAM: 8000000000, VCPUs: 8, Scratch: 64 * GiB, Name: "best+7"},
+ "best+5": {Price: 3.0, RAM: 8000000000, VCPUs: 8, Scratch: 16 * GiB, Name: "best+5"},
+ "best+3": {Price: 2.6, RAM: 4000000000, VCPUs: 8, Scratch: 16 * GiB, Name: "best+3"},
+ "best+2": {Price: 2.4, RAM: 4000000000, VCPUs: 8, Scratch: 4 * GiB, Name: "best+2"},
+ "best+1": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 4 * GiB, Name: "best+1"},
+ "best": {Price: 2.0, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+ "small+1": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 16 * GiB, Name: "small+1"},
+ "small": {Price: 1.0, RAM: 2000000000, VCPUs: 2, Scratch: 1 * GiB, Name: "small"},
}
+ best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu, Containers: arvados.ContainersConfig{
+ MaximumPriceFactor: 1.5,
+ }}, &arvados.Container{
+ Mounts: map[string]arvados.Mount{
+ "/tmp": {Kind: "tmp", Capacity: 2 * int64(GiB)},
+ },
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 987654321,
+ KeepCacheRAM: 123456789,
+ },
+ })
+ // "small" and "small+1" don't satisfy the container's
+ // scratch/RAM constraints, so "best" ($2.0) is the cheapest
+ // eligible type, capping the price at $2.0 * 1.5 = $3.0.
+ c.Assert(err, check.IsNil)
+ c.Assert(best, check.HasLen, 5)
+ c.Check(best[0].Name, check.Equals, "best") // best price is $2
+ c.Check(best[1].Name, check.Equals, "best+1")
+ c.Check(best[2].Name, check.Equals, "best+2")
+ c.Check(best[3].Name, check.Equals, "best+3")
+ c.Check(best[4].Name, check.Equals, "best+5") // max price is $2 * 1.5 = $3
}
func (*NodeSizeSuite) TestChooseWithBlobBuffersOverhead(c *check.C) {
},
})
c.Check(err, check.IsNil)
- c.Check(best.Name, check.Equals, "best")
+ c.Assert(best, check.HasLen, 1)
+ c.Check(best[0].Name, check.Equals, "best")
}
func (*NodeSizeSuite) TestChoosePreemptible(c *check.C) {
},
})
c.Check(err, check.IsNil)
- c.Check(best.Name, check.Equals, "best")
- c.Check(best.RAM >= 1234567890, check.Equals, true)
- c.Check(best.VCPUs >= 2, check.Equals, true)
- c.Check(best.Scratch >= 2*GiB, check.Equals, true)
- c.Check(best.Preemptible, check.Equals, true)
+ c.Assert(best, check.HasLen, 1)
+ c.Check(best[0].Name, check.Equals, "best")
+ c.Check(best[0].RAM >= 1234567890, check.Equals, true)
+ c.Check(best[0].VCPUs >= 2, check.Equals, true)
+ c.Check(best[0].Scratch >= 2*GiB, check.Equals, true)
+ c.Check(best[0].Preemptible, check.Equals, true)
}
func (*NodeSizeSuite) TestScratchForDockerImage(c *check.C) {
CUDA: tc.CUDA,
},
})
- if best.Name != "" {
+ if len(best) > 0 {
c.Check(err, check.IsNil)
- c.Check(best.Name, check.Equals, tc.SelectedInstance)
+ c.Assert(best, check.HasLen, 1)
+ c.Check(best[0].Name, check.Equals, tc.SelectedInstance)
} else {
c.Check(err, check.Not(check.IsNil))
}
tryrun:
for i, ent := range sorted {
- ctr, it := ent.Container, ent.InstanceType
+ ctr, types := ent.Container, ent.InstanceTypes
logger := sch.logger.WithFields(logrus.Fields{
"ContainerUUID": ctr.UUID,
- "InstanceType": it.Name,
})
if ctr.SchedulingParameters.Supervisor {
supervisors += 1
if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
continue
}
+ // If we have unalloc instances of any of the eligible
+ // instance types, unallocOK is true and unallocType
+ // is the lowest-cost type.
+ var unallocOK bool
+ var unallocType arvados.InstanceType
+ for _, it := range types {
+ if unalloc[it] > 0 {
+ unallocOK = true
+ unallocType = it
+ break
+ }
+ }
+ // If the pool is not reporting AtCapacity for any of
+ // the eligible instance types, availableOK is true
+ // and availableType is the lowest-cost type.
+ var availableOK bool
+ var availableType arvados.InstanceType
+ for _, it := range types {
+ if atcapacity[it.ProviderType] {
+ continue
+ } else if sch.pool.AtCapacity(it) {
+ atcapacity[it.ProviderType] = true
+ continue
+ } else {
+ availableOK = true
+ availableType = it
+ break
+ }
+ }
switch ctr.State {
case arvados.ContainerStateQueued:
if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
continue
}
trying++
- if unalloc[it] < 1 && sch.pool.AtQuota() {
+ if !unallocOK && sch.pool.AtQuota() {
logger.Trace("not locking: AtQuota and no unalloc workers")
overquota = sorted[i:]
break tryrun
}
- if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) {
+ if !unallocOK && !availableOK {
logger.Trace("not locking: AtCapacity and no unalloc workers")
- atcapacity[it.ProviderType] = true
continue
}
if sch.pool.KillContainer(ctr.UUID, "about to lock") {
continue
}
go sch.lockContainer(logger, ctr.UUID)
- unalloc[it]--
+ unalloc[unallocType]--
case arvados.ContainerStateLocked:
if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
continue
}
trying++
- if unalloc[it] > 0 {
- unalloc[it]--
- } else if sch.pool.AtQuota() {
+ if unallocOK {
+ // We have a suitable instance type,
+ // so mark it as allocated, and try to
+ // start the container.
+ unalloc[unallocType]--
+ logger = logger.WithField("InstanceType", unallocType)
+ if dontstart[unallocType] {
+ // We already tried & failed to start
+ // a higher-priority container on the
+ // same instance type. Don't let this
+ // one sneak in ahead of it.
+ } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+ logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
+ } else if sch.pool.StartContainer(unallocType, ctr) {
+ logger.Trace("StartContainer => true")
+ } else {
+ logger.Trace("StartContainer => false")
+ containerAllocatedWorkerBootingCount += 1
+ dontstart[unallocType] = true
+ }
+ continue
+ }
+ if sch.pool.AtQuota() {
// Don't let lower-priority containers
// starve this one by using keeping
// idle workers alive on different
logger.Trace("overquota")
overquota = sorted[i:]
break tryrun
- } else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) {
+ }
+ if !availableOK {
// Continue trying lower-priority
// containers in case they can run on
// different instance types that are
// container A on the next call to
// runQueue(), rather than run
// container B now.
- //
- // TODO: try running this container on
- // a bigger (but not much more
- // expensive) instance type.
- logger.WithField("InstanceType", it.Name).Trace("at capacity")
- atcapacity[it.ProviderType] = true
+ logger.Trace("all eligible types at capacity")
continue
- } else if sch.pool.Create(it) {
- // Success. (Note pool.Create works
- // asynchronously and does its own
- // logging about the eventual outcome,
- // so we don't need to.)
- logger.Info("creating new instance")
- } else {
+ }
+ logger = logger.WithField("InstanceType", availableType)
+ if !sch.pool.Create(availableType) {
// Failed despite not being at quota,
// e.g., cloud ops throttled.
logger.Trace("pool declined to create new instance")
continue
}
-
- if dontstart[it] {
- // We already tried & failed to start
- // a higher-priority container on the
- // same instance type. Don't let this
- // one sneak in ahead of it.
- } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
- logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
- } else if sch.pool.StartContainer(it, ctr) {
- logger.Trace("StartContainer => true")
- // Success.
- } else {
- logger.Trace("StartContainer => false")
- containerAllocatedWorkerBootingCount += 1
- dontstart[it] = true
- }
+ // Success. (Note pool.Create works
+ // asynchronously and does its own logging
+ // about the eventual outcome, so we don't
+ // need to.)
+ logger.Info("creating new instance")
+ // Don't bother trying to start the container
+ // yet -- obviously the instance will take
+ // some time to boot and become ready.
+ containerAllocatedWorkerBootingCount += 1
+ dontstart[availableType] = true
}
}
return true
}
-func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
- return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+// chooseType is a test typeChooser that maps a container's VCPUs
+// constraint directly to a single stub instance type.
+func chooseType(ctr *arvados.Container) ([]arvados.InstanceType, error) {
+ return []arvados.InstanceType{test.InstanceType(ctr.RuntimeConstraints.VCPUs)}, nil
}
var _ = check.Suite(&SchedulerSuite{})
// type4, so we skip trying to create an instance for
// container3, skip locking container2, but do try to create a
// type1 instance for container1.
- c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(1)})
+ c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.shutdowns, check.Equals, 0)
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
c.Check(queue.StateChanges(), check.HasLen, 0)
// ChooseType will be called for each entry in Containers. It
// must not be nil.
- ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+ ChooseType func(*arvados.Container) ([]arvados.InstanceType, error)
// Mimic railsapi implementation of MaxDispatchAttempts config
MaxDispatchAttempts int
ent.Container = ctr
upd[ctr.UUID] = ent
} else {
- it, _ := q.ChooseType(&ctr)
+ types, _ := q.ChooseType(&ctr)
ctr.Mounts = nil
upd[ctr.UUID] = container.QueueEnt{
- Container: ctr,
- InstanceType: it,
- FirstSeenAt: time.Now(),
+ Container: ctr,
+ InstanceTypes: types,
+ FirstSeenAt: time.Now(),
}
}
}
// SetupVM, if set, is called upon creation of each new
// StubVM. This is the caller's opportunity to customize the
// VM's error rate and other behaviors.
- SetupVM func(*StubVM)
+ //
+ // If SetupVM returns an error, that error will be returned to
+ // the caller of Create(), and the new VM will be discarded.
+ SetupVM func(*StubVM) error
// Bugf, if set, is called if a bug is detected in the caller
// or stub. Typically set to (*check.C)Errorf. If unset,
Exec: svm.Exec,
}
if setup := sis.driver.SetupVM; setup != nil {
- setup(svm)
+ err := setup(svm)
+ if err != nil {
+ return nil, err
+ }
}
sis.servers[svm.id] = svm
return svm.Instance(), nil
func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) }
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+// CapacityError is a stub implementation of cloud.CapacityError, used
+// to simulate "insufficient capacity" responses from a cloud driver
+// in tests.
+type CapacityError struct{ InstanceTypeSpecific bool }
+
+func (e CapacityError) Error() string { return "insufficient capacity" }
+func (e CapacityError) IsCapacityError() bool { return true }
+
+// IsInstanceTypeSpecific reports whether retrying with a different
+// instance type might succeed.
+func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific }
+
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
SupportedDockerImageFormats StringSet
AlwaysUsePreemptibleInstances bool
PreemptiblePriceFactor float64
+ MaximumPriceFactor float64
RuntimeEngine string
LocalKeepBlobBuffersPerVCPU int
LocalKeepLogsToContainerLog string
if disp.cluster == nil {
// no instance types configured
args = append(args, disp.slurmConstraintArgs(container)...)
- } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+ } else if types, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
// ditto
args = append(args, disp.slurmConstraintArgs(container)...)
} else if err != nil {
return nil, err
} else {
- // use instancetype constraint instead of slurm mem/cpu/tmp specs
- args = append(args, "--constraint=instancetype="+it.Name)
+ // use instancetype constraint instead of slurm
+ // mem/cpu/tmp specs (note types[0] is the lowest-cost
+ // suitable instance type)
+ args = append(args, "--constraint=instancetype="+types[0].Name)
}
if len(container.SchedulingParameters.Partitions) > 0 {