From ac39afed6cd3de1704d75aecdbc46544b02f02b2 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 7 Nov 2023 11:25:10 -0500 Subject: [PATCH] Merge branch '20978-instance-types' closes #20978 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- doc/admin/upgrading.html.textile.liquid | 20 ++++ lib/cloud/ec2/ec2.go | 33 +++++- lib/cloud/ec2/ec2_test.go | 51 +++++++- lib/config/config.default.yml | 11 ++ lib/config/export.go | 1 + lib/dispatchcloud/container/queue.go | 23 ++-- lib/dispatchcloud/container/queue_test.go | 11 +- lib/dispatchcloud/dispatcher.go | 2 +- lib/dispatchcloud/dispatcher_test.go | 15 ++- lib/dispatchcloud/node_size.go | 65 +++++++--- lib/dispatchcloud/node_size_test.go | 68 +++++++++-- lib/dispatchcloud/scheduler/run_queue.go | 111 ++++++++++++------ lib/dispatchcloud/scheduler/run_queue_test.go | 6 +- lib/dispatchcloud/test/queue.go | 10 +- lib/dispatchcloud/test/stub_driver.go | 16 ++- sdk/go/arvados/config.go | 1 + .../crunch-dispatch-slurm.go | 8 +- 17 files changed, 344 insertions(+), 108 deletions(-) diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid index 46b008ca70..3e7428eb17 100644 --- a/doc/admin/upgrading.html.textile.liquid +++ b/doc/admin/upgrading.html.textile.liquid @@ -28,6 +28,26 @@ TODO: extract this information based on git commit messages and generate changel
+h2(#2_7_1). v2.7.1 (2023-11-29) + +"previous: Upgrading to 2.7.0":#v2_7_0 + +h3. Check implications of Containers.MaximumPriceFactor 1.5 + +When scheduling a container, Arvados now considers using instance types other than the lowest-cost type consistent with the container's resource constraints. If a larger instance is already running and idle, or the cloud provider reports that the optimal instance type is not currently available, Arvados will select a larger instance type, provided the cost does not exceed 1.5x the optimal instance type cost. + +This will typically reduce overall latency for containers and reduce instance booting/shutdown overhead, but may increase costs depending on workload and instance availability. To avoid this behavior, configure @Containers.MaximumPriceFactor: 1.0@. + +h3. Synchronize keepstore and keep-balance upgrades + +The internal communication between keepstore and keep-balance about read-only volumes has changed. After keep-balance is upgraded, old versions of keepstore will be treated as read-only. We recommend upgrading and restarting all keepstore services first, then upgrading and restarting keep-balance. + +h3. Separate configs for MaxConcurrentRequests and MaxConcurrentRailsRequests + +The default configuration value @API.MaxConcurrentRequests@ (the number of concurrent requests that will be processed by a single instance of an arvados service process) is raised from 8 to 64. + +A new configuration key @API.MaxConcurrentRailsRequests@ (default 8) limits the number of concurrent requests processed by a RailsAPI service process. + h2(#v2_7_0). v2.7.0 (2023-09-21) "previous: Upgrading to 2.6.3":#v2_6_3 diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go index 816df48d90..0d181be0e9 100644 --- a/lib/cloud/ec2/ec2.go +++ b/lib/cloud/ec2/ec2.go @@ -288,7 +288,7 @@ func (instanceSet *ec2InstanceSet) Create( } var rsv *ec2.Reservation - var err error + var errToReturn error subnets := instanceSet.ec2config.SubnetID currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex)) for tryOffset := 0; ; tryOffset++ { @@ -299,8 +299,15 @@ func (instanceSet *ec2InstanceSet) Create( trySubnet = subnets[tryIndex] rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet) } + var err error rsv, err = instanceSet.client.RunInstances(&rii) instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1) + if !isErrorCapacity(errToReturn) || isErrorCapacity(err) { + // We want to return the last capacity error, + // if any; otherwise the last non-capacity + // error. + errToReturn = err + } if isErrorSubnetSpecific(err) && tryOffset < len(subnets)-1 { instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]). 
@@ -320,9 +327,8 @@ func (instanceSet *ec2InstanceSet) Create( atomic.StoreInt32(&instanceSet.currentSubnetIDIndex, int32(tryIndex)) break } - err = wrapError(err, &instanceSet.throttleDelayCreate) - if err != nil { - return nil, err + if rsv == nil { + return nil, wrapError(errToReturn, &instanceSet.throttleDelayCreate) } return &ec2Instance{ provider: instanceSet, @@ -711,7 +717,22 @@ func isErrorSubnetSpecific(err error) bool { code := aerr.Code() return strings.Contains(code, "Subnet") || code == "InsufficientInstanceCapacity" || - code == "InsufficientVolumeCapacity" + code == "InsufficientVolumeCapacity" || + code == "Unsupported" +} + +// isErrorCapacity returns true if the error indicates lack of +// capacity (either temporary or permanent) to run a specific instance +// type -- i.e., retrying with a different instance type might +// succeed. +func isErrorCapacity(err error) bool { + aerr, ok := err.(awserr.Error) + if !ok { + return false + } + code := aerr.Code() + return code == "InsufficientInstanceCapacity" || + (code == "Unsupported" && strings.Contains(aerr.Message(), "requested instance type")) } type ec2QuotaError struct { @@ -737,7 +758,7 @@ func wrapError(err error, throttleValue *atomic.Value) error { return rateLimitError{error: err, earliestRetry: time.Now().Add(d)} } else if isErrorQuota(err) { return &ec2QuotaError{err} - } else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" { + } else if isErrorCapacity(err) { return &capacityError{err, true} } else if err != nil { throttleValue.Store(time.Duration(0)) diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go index a57fcebf76..4b83005896 100644 --- a/lib/cloud/ec2/ec2_test.go +++ b/lib/cloud/ec2/ec2_test.go @@ -399,6 +399,37 @@ func (*EC2InstanceSetSuite) TestCreateAllSubnetsFailing(c *check.C) { `.*`) } +func (*EC2InstanceSetSuite) TestCreateOneSubnetFailingCapacity(c *check.C) { + if *live != "" { + c.Skip("not applicable in live mode") + return + } + ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-broken"]}`) + ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{ + "subnet-full": &ec2stubError{ + code: "InsufficientFreeAddressesInSubnet", + message: "subnet is full", + }, + "subnet-broken": &ec2stubError{ + code: "InsufficientInstanceCapacity", + message: "insufficient capacity", + }, + } + for i := 0; i < 3; i++ { + _, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil) + c.Check(err, check.NotNil) + c.Check(err, check.ErrorMatches, `.*InsufficientInstanceCapacity.*`) + } + c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 6) + metrics := arvadostest.GatherMetricsAsString(reg) + c.Check(metrics, check.Matches, `(?ms).*`+ + `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 3\n`+ + `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+ + `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 3\n`+ + `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+ + `.*`) +} + func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) { ap, _, _, _ := GetInstanceSet(c, "{}") l, err := ap.Instances(nil) @@ -513,10 +544,18 @@ func (*EC2InstanceSetSuite) TestWrapError(c *check.C) { _, ok = wrapped.(cloud.QuotaError) c.Check(ok, check.Equals, true) - capacityError := awserr.New("InsufficientInstanceCapacity", "", nil) - wrapped = wrapError(capacityError, 
nil)
-	caperr, ok := wrapped.(cloud.CapacityError)
-	c.Check(ok, check.Equals, true)
-	c.Check(caperr.IsCapacityError(), check.Equals, true)
-	c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
+	for _, trial := range []struct {
+		code string
+		msg  string
+	}{
+		{"InsufficientInstanceCapacity", ""},
+		{"Unsupported", "Your requested instance type (t3.micro) is not supported in your requested Availability Zone (us-east-1e). Please retry your request by not specifying an Availability Zone or choosing us-east-1a, us-east-1b, us-east-1c, us-east-1d, us-east-1f."},
+	} {
+		capacityError := awserr.New(trial.code, trial.msg, nil)
+		wrapped = wrapError(capacityError, nil)
+		caperr, ok := wrapped.(cloud.CapacityError)
+		c.Check(ok, check.Equals, true)
+		c.Check(caperr.IsCapacityError(), check.Equals, true)
+		c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
+	}
 }
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 32727b1bce..c10bf0e0f2 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1102,6 +1102,17 @@ Clusters:
       # A price factor of 1.0 is a reasonable starting point.
       PreemptiblePriceFactor: 0
 
+      # When the lowest-priced instance type for a given container is
+      # not available, try other instance types, up to the indicated
+      # maximum price factor.
+      #
+      # For example, with MaximumPriceFactor 1.5, if the
+      # lowest-cost instance type A suitable for a given container
+      # costs $2/h, Arvados may run the container on any instance type
+      # B costing $3/h or less when instance type A is not available
+      # or an idle instance of type B is already running.
+      MaximumPriceFactor: 1.5
+
       # PEM encoded SSH key (RSA, DSA, or ECDSA) used by the
       # cloud dispatcher for executing containers on worker VMs.
       # Begins with "-----BEGIN RSA PRIVATE KEY-----\n"
diff --git a/lib/config/export.go b/lib/config/export.go
index 88c64f69a1..3b577b023f 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -135,6 +135,7 @@ var whitelist = map[string]bool{
 	"Containers.LogReuseDecisions":             false,
 	"Containers.LSF":                           false,
 	"Containers.MaxDispatchAttempts":           false,
+	"Containers.MaximumPriceFactor":            true,
 	"Containers.MaxRetryAttempts":              true,
 	"Containers.MinRetryPeriod":                true,
 	"Containers.PreemptiblePriceFactor":        false,
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index 77752013e8..8d8b7ff9af 100644
--- a/lib/dispatchcloud/container/queue.go
+++ b/lib/dispatchcloud/container/queue.go
@@ -22,7 +22,7 @@ import (
 // load at the cost of increased under light load.
 const queuedContainersTarget = 100
 
-type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
 
 // An APIClient performs Arvados API requests. It is typically an
 // *arvados.Client.
@@ -36,9 +36,9 @@ type QueueEnt struct {
 	// The container to run. Only the UUID, State, Priority,
 	// RuntimeConstraints, ContainerImage, SchedulingParameters,
 	// and CreatedAt fields are populated.
- Container arvados.Container `json:"container"` - InstanceType arvados.InstanceType `json:"instance_type"` - FirstSeenAt time.Time `json:"first_seen_at"` + Container arvados.Container `json:"container"` + InstanceTypes []arvados.InstanceType `json:"instance_types"` + FirstSeenAt time.Time `json:"first_seen_at"` } // String implements fmt.Stringer by returning the queued container's @@ -252,7 +252,7 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { logger.WithError(err).Warn("error getting mounts") return } - it, err := cq.chooseType(&ctr) + types, err := cq.chooseType(&ctr) // Avoid wasting memory on a large Mounts attr (we don't need // it after choosing type). @@ -304,13 +304,20 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { }() return } + typeNames := "" + for _, it := range types { + if typeNames != "" { + typeNames += ", " + } + typeNames += it.Name + } cq.logger.WithFields(logrus.Fields{ "ContainerUUID": ctr.UUID, "State": ctr.State, "Priority": ctr.Priority, - "InstanceType": it.Name, + "InstanceTypes": typeNames, }).Info("adding container to queue") - cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()} + cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()} } // Lock acquires the dispatch lock for the given container. @@ -577,7 +584,7 @@ func (cq *Queue) runMetrics(reg *prometheus.Registry) { } ents, _ := cq.Entries() for _, ent := range ents { - count[entKey{ent.Container.State, ent.InstanceType.Name}]++ + count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++ } for k, v := range count { mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v)) diff --git a/lib/dispatchcloud/container/queue_test.go b/lib/dispatchcloud/container/queue_test.go index ca10983534..928c6dd8c8 100644 --- a/lib/dispatchcloud/container/queue_test.go +++ b/lib/dispatchcloud/container/queue_test.go @@ -40,9 +40,9 @@ func (suite *IntegrationSuite) TearDownTest(c *check.C) { } func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) { - typeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) { + typeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) { c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000)) - return arvados.InstanceType{Name: "testType"}, nil + return []arvados.InstanceType{{Name: "testType"}}, nil } client := arvados.NewClientFromEnv() @@ -62,7 +62,8 @@ func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) { var wg sync.WaitGroup for uuid, ent := range ents { c.Check(ent.Container.UUID, check.Equals, uuid) - c.Check(ent.InstanceType.Name, check.Equals, "testType") + c.Check(ent.InstanceTypes, check.HasLen, 1) + c.Check(ent.InstanceTypes[0].Name, check.Equals, "testType") c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued) c.Check(ent.Container.Priority > 0, check.Equals, true) // Mounts should be deleted to avoid wasting memory @@ -108,7 +109,7 @@ func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) { } func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) { - errorTypeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) { + errorTypeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) { // Make sure the relevant container fields are // actually populated. 
c.Check(ctr.ContainerImage, check.Equals, "test") @@ -116,7 +117,7 @@ func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) { c.Check(ctr.RuntimeConstraints.RAM, check.Equals, int64(12000000000)) c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000)) c.Check(ctr.Mounts["/var/spool/cwl"].Capacity, check.Equals, int64(24000000000)) - return arvados.InstanceType{}, errors.New("no suitable instance type") + return nil, errors.New("no suitable instance type") } client := arvados.NewClientFromEnv() diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go index 49be9e68a2..47e60abdee 100644 --- a/lib/dispatchcloud/dispatcher.go +++ b/lib/dispatchcloud/dispatcher.go @@ -111,7 +111,7 @@ func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor { return exr } -func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) { +func (disp *dispatcher) typeChooser(ctr *arvados.Container) ([]arvados.InstanceType, error) { return ChooseInstanceType(disp.Cluster, ctr) } diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index 6c057edc70..33d7f4e9ac 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -15,6 +15,7 @@ import ( "net/url" "os" "sync" + "sync/atomic" "time" "git.arvados.org/arvados.git/lib/config" @@ -72,6 +73,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { StaleLockTimeout: arvados.Duration(5 * time.Millisecond), RuntimeEngine: "stub", MaxDispatchAttempts: 10, + MaximumPriceFactor: 1.5, CloudVMs: arvados.CloudVMsConfig{ Driver: "test", SyncInterval: arvados.Duration(10 * time.Millisecond), @@ -160,7 +162,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { s.disp.setupOnce.Do(s.disp.initialize) queue := &test.Queue{ MaxDispatchAttempts: 5, - ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) { return ChooseInstanceType(s.cluster, ctr) }, Logger: ctxlog.TestLogger(c), @@ -205,9 +207,15 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { finishContainer(ctr) return int(rand.Uint32() & 0x3) } + var countCapacityErrors int64 n := 0 s.stubDriver.Queue = queue - s.stubDriver.SetupVM = func(stubvm *test.StubVM) { + s.stubDriver.SetupVM = func(stubvm *test.StubVM) error { + if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType { + c.Logf("test: returning capacity error for instance type %s", pt) + atomic.AddInt64(&countCapacityErrors, 1) + return test.CapacityError{InstanceTypeSpecific: true} + } n++ stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))) stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond))) @@ -235,6 +243,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { stubvm.CrunchRunCrashRate = 0.1 stubvm.ArvMountDeadlockRate = 0.1 } + return nil } s.stubDriver.Bugf = c.Errorf @@ -270,6 +279,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { } } + c.Check(countCapacityErrors, check.Not(check.Equals), int64(0)) + req := httptest.NewRequest("GET", "/metrics", nil) req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken) resp := httptest.NewRecorder() diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go index 0b394f4cfe..802bc65c28 100644 --- a/lib/dispatchcloud/node_size.go +++ b/lib/dispatchcloud/node_size.go @@ -6,6 +6,7 @@ 
package dispatchcloud import ( "errors" + "math" "regexp" "sort" "strconv" @@ -99,12 +100,16 @@ func versionLess(vs1 string, vs2 string) (bool, error) { return v1 < v2, nil } -// ChooseInstanceType returns the cheapest available -// arvados.InstanceType big enough to run ctr. -func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) { +// ChooseInstanceType returns the arvados.InstanceTypes eligible to +// run ctr, i.e., those that have enough RAM, VCPUs, etc., and are not +// too expensive according to cluster configuration. +// +// The returned types are sorted with lower prices first. +// +// The error is non-nil if and only if the returned slice is empty. +func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) ([]arvados.InstanceType, error) { if len(cc.InstanceTypes) == 0 { - err = ErrInstanceTypesNotConfigured - return + return nil, ErrInstanceTypesNotConfigured } needScratch := EstimateScratchSpace(ctr) @@ -121,31 +126,33 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad } needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent) - ok := false + maxPriceFactor := math.Max(cc.Containers.MaximumPriceFactor, 1) + var types []arvados.InstanceType + var maxPrice float64 for _, it := range cc.InstanceTypes { driverInsuff, driverErr := versionLess(it.CUDA.DriverVersion, ctr.RuntimeConstraints.CUDA.DriverVersion) capabilityInsuff, capabilityErr := versionLess(it.CUDA.HardwareCapability, ctr.RuntimeConstraints.CUDA.HardwareCapability) switch { // reasons to reject a node - case ok && it.Price > best.Price: // already selected a node, and this one is more expensive + case maxPrice > 0 && it.Price > maxPrice: // too expensive case int64(it.Scratch) < needScratch: // insufficient scratch case int64(it.RAM) < needRAM: // insufficient RAM case it.VCPUs < needVCPUs: // insufficient VCPUs case it.Preemptible != ctr.SchedulingParameters.Preemptible: // wrong preemptable setting - case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs): // same price, worse specs case it.CUDA.DeviceCount < ctr.RuntimeConstraints.CUDA.DeviceCount: // insufficient CUDA devices case ctr.RuntimeConstraints.CUDA.DeviceCount > 0 && (driverInsuff || driverErr != nil): // insufficient driver version case ctr.RuntimeConstraints.CUDA.DeviceCount > 0 && (capabilityInsuff || capabilityErr != nil): // insufficient hardware capability // Don't select this node default: // Didn't reject the node, so select it - // Lower price || (same price && better specs) - best = it - ok = true + types = append(types, it) + if newmax := it.Price * maxPriceFactor; newmax < maxPrice || maxPrice == 0 { + maxPrice = newmax + } } } - if !ok { + if len(types) == 0 { availableTypes := make([]arvados.InstanceType, 0, len(cc.InstanceTypes)) for _, t := range cc.InstanceTypes { availableTypes = append(availableTypes, t) @@ -153,11 +160,39 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad sort.Slice(availableTypes, func(a, b int) bool { return availableTypes[a].Price < availableTypes[b].Price }) - err = ConstraintsNotSatisfiableError{ + return nil, ConstraintsNotSatisfiableError{ errors.New("constraints not satisfiable by any configured instance type"), availableTypes, } - return } - return + sort.Slice(types, func(i, j int) bool { + if types[i].Price != types[j].Price { + // prefer lower price + return types[i].Price < types[j].Price + } + if types[i].RAM != types[j].RAM { + // if same price, 
prefer more RAM + return types[i].RAM > types[j].RAM + } + if types[i].VCPUs != types[j].VCPUs { + // if same price and RAM, prefer more VCPUs + return types[i].VCPUs > types[j].VCPUs + } + if types[i].Scratch != types[j].Scratch { + // if same price and RAM and VCPUs, prefer more scratch + return types[i].Scratch > types[j].Scratch + } + // no preference, just sort the same way each time + return types[i].Name < types[j].Name + }) + // Truncate types at maxPrice. We rejected it.Price>maxPrice + // in the loop above, but at that point maxPrice wasn't + // necessarily the final (lowest) maxPrice. + for i, it := range types { + if i > 0 && it.Price > maxPrice { + types = types[:i] + break + } + } + return types, nil } diff --git a/lib/dispatchcloud/node_size_test.go b/lib/dispatchcloud/node_size_test.go index 86bfbec7b6..5d2713e982 100644 --- a/lib/dispatchcloud/node_size_test.go +++ b/lib/dispatchcloud/node_size_test.go @@ -93,12 +93,51 @@ func (*NodeSizeSuite) TestChoose(c *check.C) { KeepCacheRAM: 123456789, }, }) - c.Check(err, check.IsNil) - c.Check(best.Name, check.Equals, "best") - c.Check(best.RAM >= 1234567890, check.Equals, true) - c.Check(best.VCPUs >= 2, check.Equals, true) - c.Check(best.Scratch >= 2*GiB, check.Equals, true) + c.Assert(err, check.IsNil) + c.Assert(best, check.Not(check.HasLen), 0) + c.Check(best[0].Name, check.Equals, "best") + c.Check(best[0].RAM >= 1234567890, check.Equals, true) + c.Check(best[0].VCPUs >= 2, check.Equals, true) + c.Check(best[0].Scratch >= 2*GiB, check.Equals, true) + for i := range best { + // If multiple instance types are returned + // then they should all have the same price, + // because we didn't set MaximumPriceFactor>1. + c.Check(best[i].Price, check.Equals, best[0].Price) + } + } +} + +func (*NodeSizeSuite) TestMaximumPriceFactor(c *check.C) { + menu := map[string]arvados.InstanceType{ + "best+7": {Price: 3.4, RAM: 8000000000, VCPUs: 8, Scratch: 64 * GiB, Name: "best+7"}, + "best+5": {Price: 3.0, RAM: 8000000000, VCPUs: 8, Scratch: 16 * GiB, Name: "best+5"}, + "best+3": {Price: 2.6, RAM: 4000000000, VCPUs: 8, Scratch: 16 * GiB, Name: "best+3"}, + "best+2": {Price: 2.4, RAM: 4000000000, VCPUs: 8, Scratch: 4 * GiB, Name: "best+2"}, + "best+1": {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 4 * GiB, Name: "best+1"}, + "best": {Price: 2.0, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"}, + "small+1": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 16 * GiB, Name: "small+1"}, + "small": {Price: 1.0, RAM: 2000000000, VCPUs: 2, Scratch: 1 * GiB, Name: "small"}, } + best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu, Containers: arvados.ContainersConfig{ + MaximumPriceFactor: 1.5, + }}, &arvados.Container{ + Mounts: map[string]arvados.Mount{ + "/tmp": {Kind: "tmp", Capacity: 2 * int64(GiB)}, + }, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 987654321, + KeepCacheRAM: 123456789, + }, + }) + c.Assert(err, check.IsNil) + c.Assert(best, check.HasLen, 5) + c.Check(best[0].Name, check.Equals, "best") // best price is $2 + c.Check(best[1].Name, check.Equals, "best+1") + c.Check(best[2].Name, check.Equals, "best+2") + c.Check(best[3].Name, check.Equals, "best+3") + c.Check(best[4].Name, check.Equals, "best+5") // max price is $2 * 1.5 = $3 } func (*NodeSizeSuite) TestChooseWithBlobBuffersOverhead(c *check.C) { @@ -121,7 +160,8 @@ func (*NodeSizeSuite) TestChooseWithBlobBuffersOverhead(c *check.C) { }, }) c.Check(err, check.IsNil) - c.Check(best.Name, check.Equals, "best") + c.Assert(best, 
check.HasLen, 1) + c.Check(best[0].Name, check.Equals, "best") } func (*NodeSizeSuite) TestChoosePreemptible(c *check.C) { @@ -145,11 +185,12 @@ func (*NodeSizeSuite) TestChoosePreemptible(c *check.C) { }, }) c.Check(err, check.IsNil) - c.Check(best.Name, check.Equals, "best") - c.Check(best.RAM >= 1234567890, check.Equals, true) - c.Check(best.VCPUs >= 2, check.Equals, true) - c.Check(best.Scratch >= 2*GiB, check.Equals, true) - c.Check(best.Preemptible, check.Equals, true) + c.Assert(best, check.HasLen, 1) + c.Check(best[0].Name, check.Equals, "best") + c.Check(best[0].RAM >= 1234567890, check.Equals, true) + c.Check(best[0].VCPUs >= 2, check.Equals, true) + c.Check(best[0].Scratch >= 2*GiB, check.Equals, true) + c.Check(best[0].Preemptible, check.Equals, true) } func (*NodeSizeSuite) TestScratchForDockerImage(c *check.C) { @@ -252,9 +293,10 @@ func (*NodeSizeSuite) TestChooseGPU(c *check.C) { CUDA: tc.CUDA, }, }) - if best.Name != "" { + if len(best) > 0 { c.Check(err, check.IsNil) - c.Check(best.Name, check.Equals, tc.SelectedInstance) + c.Assert(best, check.HasLen, 1) + c.Check(best[0].Name, check.Equals, tc.SelectedInstance) } else { c.Check(err, check.Not(check.IsNil)) } diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go index 3505c3e064..2f1f175890 100644 --- a/lib/dispatchcloud/scheduler/run_queue.go +++ b/lib/dispatchcloud/scheduler/run_queue.go @@ -163,10 +163,9 @@ func (sch *Scheduler) runQueue() { tryrun: for i, ent := range sorted { - ctr, it := ent.Container, ent.InstanceType + ctr, types := ent.Container, ent.InstanceTypes logger := sch.logger.WithFields(logrus.Fields{ "ContainerUUID": ctr.UUID, - "InstanceType": it.Name, }) if ctr.SchedulingParameters.Supervisor { supervisors += 1 @@ -178,6 +177,35 @@ tryrun: if _, running := running[ctr.UUID]; running || ctr.Priority < 1 { continue } + // If we have unalloc instances of any of the eligible + // instance types, unallocOK is true and unallocType + // is the lowest-cost type. + var unallocOK bool + var unallocType arvados.InstanceType + for _, it := range types { + if unalloc[it] > 0 { + unallocOK = true + unallocType = it + break + } + } + // If the pool is not reporting AtCapacity for any of + // the eligible instance types, availableOK is true + // and availableType is the lowest-cost type. 
+ var availableOK bool + var availableType arvados.InstanceType + for _, it := range types { + if atcapacity[it.ProviderType] { + continue + } else if sch.pool.AtCapacity(it) { + atcapacity[it.ProviderType] = true + continue + } else { + availableOK = true + availableType = it + break + } + } switch ctr.State { case arvados.ContainerStateQueued: if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency { @@ -185,14 +213,13 @@ tryrun: continue } trying++ - if unalloc[it] < 1 && sch.pool.AtQuota() { + if !unallocOK && sch.pool.AtQuota() { logger.Trace("not locking: AtQuota and no unalloc workers") overquota = sorted[i:] break tryrun } - if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) { + if !unallocOK && !availableOK { logger.Trace("not locking: AtCapacity and no unalloc workers") - atcapacity[it.ProviderType] = true continue } if sch.pool.KillContainer(ctr.UUID, "about to lock") { @@ -200,16 +227,36 @@ tryrun: continue } go sch.lockContainer(logger, ctr.UUID) - unalloc[it]-- + unalloc[unallocType]-- case arvados.ContainerStateLocked: if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency { logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency) continue } trying++ - if unalloc[it] > 0 { - unalloc[it]-- - } else if sch.pool.AtQuota() { + if unallocOK { + // We have a suitable instance type, + // so mark it as allocated, and try to + // start the container. + unalloc[unallocType]-- + logger = logger.WithField("InstanceType", unallocType) + if dontstart[unallocType] { + // We already tried & failed to start + // a higher-priority container on the + // same instance type. Don't let this + // one sneak in ahead of it. + } else if sch.pool.KillContainer(ctr.UUID, "about to start") { + logger.Info("not restarting yet: crunch-run process from previous attempt has not exited") + } else if sch.pool.StartContainer(unallocType, ctr) { + logger.Trace("StartContainer => true") + } else { + logger.Trace("StartContainer => false") + containerAllocatedWorkerBootingCount += 1 + dontstart[unallocType] = true + } + continue + } + if sch.pool.AtQuota() { // Don't let lower-priority containers // starve this one by using keeping // idle workers alive on different @@ -217,7 +264,8 @@ tryrun: logger.Trace("overquota") overquota = sorted[i:] break tryrun - } else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) { + } + if !availableOK { // Continue trying lower-priority // containers in case they can run on // different instance types that are @@ -231,41 +279,26 @@ tryrun: // container A on the next call to // runQueue(), rather than run // container B now. - // - // TODO: try running this container on - // a bigger (but not much more - // expensive) instance type. - logger.WithField("InstanceType", it.Name).Trace("at capacity") - atcapacity[it.ProviderType] = true + logger.Trace("all eligible types at capacity") continue - } else if sch.pool.Create(it) { - // Success. (Note pool.Create works - // asynchronously and does its own - // logging about the eventual outcome, - // so we don't need to.) - logger.Info("creating new instance") - } else { + } + logger = logger.WithField("InstanceType", availableType) + if !sch.pool.Create(availableType) { // Failed despite not being at quota, // e.g., cloud ops throttled. logger.Trace("pool declined to create new instance") continue } - - if dontstart[it] { - // We already tried & failed to start - // a higher-priority container on the - // same instance type. Don't let this - // one sneak in ahead of it. 
- } else if sch.pool.KillContainer(ctr.UUID, "about to start") { - logger.Info("not restarting yet: crunch-run process from previous attempt has not exited") - } else if sch.pool.StartContainer(it, ctr) { - logger.Trace("StartContainer => true") - // Success. - } else { - logger.Trace("StartContainer => false") - containerAllocatedWorkerBootingCount += 1 - dontstart[it] = true - } + // Success. (Note pool.Create works + // asynchronously and does its own logging + // about the eventual outcome, so we don't + // need to.) + logger.Info("creating new instance") + // Don't bother trying to start the container + // yet -- obviously the instance will take + // some time to boot and become ready. + containerAllocatedWorkerBootingCount += 1 + dontstart[availableType] = true } } diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go index da6955029a..e4a05daba5 100644 --- a/lib/dispatchcloud/scheduler/run_queue_test.go +++ b/lib/dispatchcloud/scheduler/run_queue_test.go @@ -139,8 +139,8 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container return true } -func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) { - return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil +func chooseType(ctr *arvados.Container) ([]arvados.InstanceType, error) { + return []arvados.InstanceType{test.InstanceType(ctr.RuntimeConstraints.VCPUs)}, nil } var _ = check.Suite(&SchedulerSuite{}) @@ -364,7 +364,7 @@ func (*SchedulerSuite) TestInstanceCapacity(c *check.C) { // type4, so we skip trying to create an instance for // container3, skip locking container2, but do try to create a // type1 instance for container1. - c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(1)}) + c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)}) c.Check(pool.shutdowns, check.Equals, 0) c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)}) c.Check(queue.StateChanges(), check.HasLen, 0) diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go index 2be8246bd6..ea2b98236f 100644 --- a/lib/dispatchcloud/test/queue.go +++ b/lib/dispatchcloud/test/queue.go @@ -22,7 +22,7 @@ type Queue struct { // ChooseType will be called for each entry in Containers. It // must not be nil. - ChooseType func(*arvados.Container) (arvados.InstanceType, error) + ChooseType func(*arvados.Container) ([]arvados.InstanceType, error) // Mimic railsapi implementation of MaxDispatchAttempts config MaxDispatchAttempts int @@ -167,12 +167,12 @@ func (q *Queue) Update() error { ent.Container = ctr upd[ctr.UUID] = ent } else { - it, _ := q.ChooseType(&ctr) + types, _ := q.ChooseType(&ctr) ctr.Mounts = nil upd[ctr.UUID] = container.QueueEnt{ - Container: ctr, - InstanceType: it, - FirstSeenAt: time.Now(), + Container: ctr, + InstanceTypes: types, + FirstSeenAt: time.Now(), } } } diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go index 6e0b129487..0a74d97606 100644 --- a/lib/dispatchcloud/test/stub_driver.go +++ b/lib/dispatchcloud/test/stub_driver.go @@ -34,7 +34,10 @@ type StubDriver struct { // SetupVM, if set, is called upon creation of each new // StubVM. This is the caller's opportunity to customize the // VM's error rate and other behaviors. - SetupVM func(*StubVM) + // + // If SetupVM returns an error, that error will be returned to + // the caller of Create(), and the new VM will be discarded. 
+ SetupVM func(*StubVM) error // Bugf, if set, is called if a bug is detected in the caller // or stub. Typically set to (*check.C)Errorf. If unset, @@ -152,7 +155,10 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, Exec: svm.Exec, } if setup := sis.driver.SetupVM; setup != nil { - setup(svm) + err := setup(svm) + if err != nil { + return nil, err + } } sis.servers[svm.id] = svm return svm.Instance(), nil @@ -195,6 +201,12 @@ type RateLimitError struct{ Retry time.Time } func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) } func (e RateLimitError) EarliestRetry() time.Time { return e.Retry } +type CapacityError struct{ InstanceTypeSpecific bool } + +func (e CapacityError) Error() string { return "insufficient capacity" } +func (e CapacityError) IsCapacityError() bool { return true } +func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific } + // StubVM is a fake server that runs an SSH service. It represents a // VM running in a fake cloud. // diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index a3e54952da..0afcf2be3c 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -515,6 +515,7 @@ type ContainersConfig struct { SupportedDockerImageFormats StringSet AlwaysUsePreemptibleInstances bool PreemptiblePriceFactor float64 + MaximumPriceFactor float64 RuntimeEngine string LocalKeepBlobBuffersPerVCPU int LocalKeepLogsToContainerLog string diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 1c0f6ad28f..5a9ef91c3d 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -197,14 +197,16 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error if disp.cluster == nil { // no instance types configured args = append(args, disp.slurmConstraintArgs(container)...) - } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured { + } else if types, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured { // ditto args = append(args, disp.slurmConstraintArgs(container)...) } else if err != nil { return nil, err } else { - // use instancetype constraint instead of slurm mem/cpu/tmp specs - args = append(args, "--constraint=instancetype="+it.Name) + // use instancetype constraint instead of slurm + // mem/cpu/tmp specs (note types[0] is the lowest-cost + // suitable instance type) + args = append(args, "--constraint=instancetype="+types[0].Name) } if len(container.SchedulingParameters.Partitions) > 0 { -- 2.30.2
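
Illustrative note (not part of the patch): the standalone Go sketch below approximates the selection rule that the MaximumPriceFactor config comment and the new ChooseInstanceType contract describe -- keep every instance type that satisfies the container and costs no more than MaximumPriceFactor times the cheapest satisfying type, sorted cheapest first. The InstanceType and Container structs are simplified stand-ins for the arvados package types, and the scratch, preemptible, and CUDA checks of the real function are omitted.

package main

import (
	"fmt"
	"sort"
)

type InstanceType struct {
	Name  string
	VCPUs int
	RAM   int64
	Price float64
}

type Container struct {
	VCPUs int
	RAM   int64
}

// eligibleTypes returns every type that satisfies ctr's needs and costs no
// more than maxPriceFactor times the cheapest satisfying type, cheapest first.
func eligibleTypes(menu []InstanceType, ctr Container, maxPriceFactor float64) []InstanceType {
	if maxPriceFactor < 1 {
		maxPriceFactor = 1
	}
	var types []InstanceType
	for _, it := range menu {
		if it.VCPUs >= ctr.VCPUs && it.RAM >= ctr.RAM {
			types = append(types, it)
		}
	}
	if len(types) == 0 {
		return nil
	}
	// Cheapest first; break price ties by preferring more RAM, then more
	// VCPUs, then name, so the order is deterministic.
	sort.Slice(types, func(i, j int) bool {
		switch {
		case types[i].Price != types[j].Price:
			return types[i].Price < types[j].Price
		case types[i].RAM != types[j].RAM:
			return types[i].RAM > types[j].RAM
		case types[i].VCPUs != types[j].VCPUs:
			return types[i].VCPUs > types[j].VCPUs
		default:
			return types[i].Name < types[j].Name
		}
	})
	// Drop anything above the price cap (cheapest eligible price * factor).
	maxPrice := types[0].Price * maxPriceFactor
	for i, it := range types {
		if i > 0 && it.Price > maxPrice {
			return types[:i]
		}
	}
	return types
}

func main() {
	menu := []InstanceType{
		{Name: "small", VCPUs: 2, RAM: 4e9, Price: 1.0},
		{Name: "medium", VCPUs: 4, RAM: 8e9, Price: 2.0},
		{Name: "large", VCPUs: 8, RAM: 16e9, Price: 2.8},
		{Name: "xlarge", VCPUs: 16, RAM: 32e9, Price: 4.0},
	}
	ctr := Container{VCPUs: 4, RAM: 6e9}
	for _, it := range eligibleTypes(menu, ctr, 1.5) {
		fmt.Printf("%s ($%.2f/h)\n", it.Name, it.Price)
	}
	// Prints "medium ($2.00/h)" and "large ($2.80/h)": the cap is 2.0*1.5=3.0,
	// so xlarge ($4.00) is excluded even though it also satisfies the container.
}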
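
A second sketch, also illustrative rather than taken from the patch, of how a scheduler-style caller might walk the price-sorted list that the type chooser now returns: prefer a type that already has an unallocated worker, otherwise fall back to the cheapest type the cloud provider has not reported as being at capacity. The pickType helper and its map arguments are hypothetical stand-ins for the worker pool's state (the real run_queue.go keys its maps by arvados.InstanceType and provider type).

package scheduler

// instanceType is a simplified stand-in for arvados.InstanceType.
type instanceType struct {
	Name  string
	Price float64
}

// pickType walks a price-sorted candidate list: prefer a type that already has
// an unallocated (idle or booting) worker; otherwise fall back to the cheapest
// type the cloud provider has not reported as being at capacity.
func pickType(types []instanceType, unalloc map[string]int, atCapacity map[string]bool) (instanceType, bool) {
	for _, it := range types {
		if unalloc[it.Name] > 0 {
			return it, true // reuse existing capacity, no new instance needed
		}
	}
	for _, it := range types {
		if !atCapacity[it.Name] {
			return it, true // ask the pool to create an instance of this type
		}
	}
	return instanceType{}, false // nothing usable right now; retry on a later pass
}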
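
Finally, a small sketch of the error-selection rule the EC2 driver comment describes ("return the last capacity error, if any; otherwise the last non-capacity error"), which is what lets the dispatcher decide whether retrying with a different instance type might succeed. The chooseErrToReturn helper is hypothetical; the real driver updates errToReturn inline in its subnet retry loop.

package ec2sketch

// chooseErrToReturn keeps the last capacity error seen, if any occurred,
// otherwise the last non-capacity error. isCapacity must return false for nil.
func chooseErrToReturn(errs []error, isCapacity func(error) bool) error {
	var errToReturn error
	for _, err := range errs {
		if !isCapacity(errToReturn) || isCapacity(err) {
			errToReturn = err
		}
	}
	return errToReturn
}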