Merge branch '20978-instance-types'
author     Tom Clegg <tom@curii.com>
           Tue, 7 Nov 2023 16:25:10 +0000 (11:25 -0500)
committer  Tom Clegg <tom@curii.com>
           Tue, 7 Nov 2023 16:25:10 +0000 (11:25 -0500)
closes #20978

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

17 files changed:
doc/admin/upgrading.html.textile.liquid
lib/cloud/ec2/ec2.go
lib/cloud/ec2/ec2_test.go
lib/config/config.default.yml
lib/config/export.go
lib/dispatchcloud/container/queue.go
lib/dispatchcloud/container/queue_test.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/node_size.go
lib/dispatchcloud/node_size_test.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/test/queue.go
lib/dispatchcloud/test/stub_driver.go
sdk/go/arvados/config.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

index 32b3bf701cf0f82ad46334ab128e1065303082f7..e8441dd5f0f6c3203232d0bcc2985fbbf36c44d6 100644 (file)
@@ -32,6 +32,12 @@ h2(#main). development main
 
 "previous: Upgrading to 2.7.0":#v2_7_0
 
+h3. Check implications of Containers.MaximumPriceFactor 1.5
+
+When scheduling a container, Arvados now considers using instance types other than the lowest-cost type consistent with the container's resource constraints. If a larger instance is already running and idle, or the cloud provider reports that the optimal instance type is not currently available, Arvados will select a larger instance type, provided the cost does not exceed 1.5x the optimal instance type cost.
+
+This will typically reduce overall latency for containers and reduce instance booting/shutdown overhead, but may increase costs depending on workload and instance availability. To avoid this behavior, configure @Containers.MaximumPriceFactor: 1.0@.
+
 h3. Synchronize keepstore and keep-balance upgrades
 
 The internal communication between keepstore and keep-balance about read-only volumes has changed. After keep-balance is upgraded, old versions of keepstore will be treated as read-only. We recommend upgrading and restarting all keepstore services first, then upgrading and restarting keep-balance.
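
A minimal sketch (not from this commit) of the selection rule described in the upgrade note above, using hypothetical stand-in types: a candidate instance type is acceptable when its price is within MaximumPriceFactor of the cheapest type that satisfies the container's constraints.

    package main

    import "fmt"

    // instType is a hypothetical stand-in for an Arvados instance type.
    type instType struct {
        Name  string
        Price float64
    }

    // withinPriceFactor reports whether it costs no more than factor
    // times the cheapest eligible price.
    func withinPriceFactor(it instType, cheapestPrice, factor float64) bool {
        return it.Price <= cheapestPrice*factor
    }

    func main() {
        // Candidates already filtered by RAM/VCPUs/scratch and sorted by price.
        candidates := []instType{{"small", 2.0}, {"medium", 2.9}, {"large", 4.0}}
        for _, it := range candidates {
            // With factor 1.5 and a $2/h cheapest type, anything up to $3/h passes.
            fmt.Println(it.Name, withinPriceFactor(it, candidates[0].Price, 1.5))
        }
    }
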
index 816df48d90c7dced2bacc1c864ea71da8dccd3d2..0d181be0e9f87212f5f833b35e8e9220476d4815 100644 (file)
@@ -288,7 +288,7 @@ func (instanceSet *ec2InstanceSet) Create(
        }
 
        var rsv *ec2.Reservation
-       var err error
+       var errToReturn error
        subnets := instanceSet.ec2config.SubnetID
        currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex))
        for tryOffset := 0; ; tryOffset++ {
@@ -299,8 +299,15 @@ func (instanceSet *ec2InstanceSet) Create(
                        trySubnet = subnets[tryIndex]
                        rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet)
                }
+               var err error
                rsv, err = instanceSet.client.RunInstances(&rii)
                instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1)
+               if !isErrorCapacity(errToReturn) || isErrorCapacity(err) {
+                       // We want to return the last capacity error,
+                       // if any; otherwise the last non-capacity
+                       // error.
+                       errToReturn = err
+               }
                if isErrorSubnetSpecific(err) &&
                        tryOffset < len(subnets)-1 {
                        instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]).
@@ -320,9 +327,8 @@ func (instanceSet *ec2InstanceSet) Create(
                atomic.StoreInt32(&instanceSet.currentSubnetIDIndex, int32(tryIndex))
                break
        }
-       err = wrapError(err, &instanceSet.throttleDelayCreate)
-       if err != nil {
-               return nil, err
+       if rsv == nil {
+               return nil, wrapError(errToReturn, &instanceSet.throttleDelayCreate)
        }
        return &ec2Instance{
                provider: instanceSet,
@@ -711,7 +717,22 @@ func isErrorSubnetSpecific(err error) bool {
        code := aerr.Code()
        return strings.Contains(code, "Subnet") ||
                code == "InsufficientInstanceCapacity" ||
-               code == "InsufficientVolumeCapacity"
+               code == "InsufficientVolumeCapacity" ||
+               code == "Unsupported"
+}
+
+// isErrorCapacity returns true if the error indicates lack of
+// capacity (either temporary or permanent) to run a specific instance
+// type -- i.e., retrying with a different instance type might
+// succeed.
+func isErrorCapacity(err error) bool {
+       aerr, ok := err.(awserr.Error)
+       if !ok {
+               return false
+       }
+       code := aerr.Code()
+       return code == "InsufficientInstanceCapacity" ||
+               (code == "Unsupported" && strings.Contains(aerr.Message(), "requested instance type"))
 }
 
 type ec2QuotaError struct {
@@ -737,7 +758,7 @@ func wrapError(err error, throttleValue *atomic.Value) error {
                return rateLimitError{error: err, earliestRetry: time.Now().Add(d)}
        } else if isErrorQuota(err) {
                return &ec2QuotaError{err}
-       } else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" {
+       } else if isErrorCapacity(err) {
                return &capacityError{err, true}
        } else if err != nil {
                throttleValue.Store(time.Duration(0))
index a57fcebf76874bea175462da8c272ee746991b82..4b830058963b93cfc508ee1795e65d22d3d70af9 100644 (file)
@@ -399,6 +399,37 @@ func (*EC2InstanceSetSuite) TestCreateAllSubnetsFailing(c *check.C) {
                `.*`)
 }
 
+func (*EC2InstanceSetSuite) TestCreateOneSubnetFailingCapacity(c *check.C) {
+       if *live != "" {
+               c.Skip("not applicable in live mode")
+               return
+       }
+       ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-broken"]}`)
+       ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
+               "subnet-full": &ec2stubError{
+                       code:    "InsufficientFreeAddressesInSubnet",
+                       message: "subnet is full",
+               },
+               "subnet-broken": &ec2stubError{
+                       code:    "InsufficientInstanceCapacity",
+                       message: "insufficient capacity",
+               },
+       }
+       for i := 0; i < 3; i++ {
+               _, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+               c.Check(err, check.NotNil)
+               c.Check(err, check.ErrorMatches, `.*InsufficientInstanceCapacity.*`)
+       }
+       c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 6)
+       metrics := arvadostest.GatherMetricsAsString(reg)
+       c.Check(metrics, check.Matches, `(?ms).*`+
+               `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 3\n`+
+               `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+               `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 3\n`+
+               `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+               `.*`)
+}
+
 func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
        ap, _, _, _ := GetInstanceSet(c, "{}")
        l, err := ap.Instances(nil)
@@ -513,10 +544,18 @@ func (*EC2InstanceSetSuite) TestWrapError(c *check.C) {
        _, ok = wrapped.(cloud.QuotaError)
        c.Check(ok, check.Equals, true)
 
-       capacityError := awserr.New("InsufficientInstanceCapacity", "", nil)
-       wrapped = wrapError(capacityError, nil)
-       caperr, ok := wrapped.(cloud.CapacityError)
-       c.Check(ok, check.Equals, true)
-       c.Check(caperr.IsCapacityError(), check.Equals, true)
-       c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
+       for _, trial := range []struct {
+               code string
+               msg  string
+       }{
+               {"InsufficientInstanceCapacity", ""},
+               {"Unsupported", "Your requested instance type (t3.micro) is not supported in your requested Availability Zone (us-east-1e). Please retry your request by not specifying an Availability Zone or choosing us-east-1a, us-east-1b, us-east-1c, us-east-1d, us-east-1f."},
+       } {
+               capacityError := awserr.New(trial.code, trial.msg, nil)
+               wrapped = wrapError(capacityError, nil)
+               caperr, ok := wrapped.(cloud.CapacityError)
+               c.Check(ok, check.Equals, true)
+               c.Check(caperr.IsCapacityError(), check.Equals, true)
+               c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
+       }
 }
index 9155a8edaa36352e6c687b58b991e8564d6051e5..24d626aae50a1a9b46ec3d3dde9b3efcccbbb853 100644 (file)
@@ -1112,6 +1112,17 @@ Clusters:
       # A price factor of 1.0 is a reasonable starting point.
       PreemptiblePriceFactor: 0
 
+      # When the lowest-priced instance type for a given container is
+      # not available, try other instance types, up to the indicated
+      # maximum price factor.
+      #
+      # For example, with MaximumPriceFactor 1.5, if the
+      # lowest-cost instance type A suitable for a given container
+      # costs $2/h, Arvados may run the container on any instance type
+      # B costing $3/h or less when instance type A is not available
+      # or an idle instance of type B is already running.
+      MaximumPriceFactor: 1.5
+
       # PEM encoded SSH key (RSA, DSA, or ECDSA) used by the
       # cloud dispatcher for executing containers on worker VMs.
       # Begins with "-----BEGIN RSA PRIVATE KEY-----\n"
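
A small sketch (not from this commit) of how the configured factor is effectively clamped: as in the node_size.go change later in this diff, values below 1 (including the zero value) behave like 1, so only equally-priced alternatives are allowed.

    package main

    import (
        "fmt"
        "math"
    )

    // effectiveFactor mirrors math.Max(cc.Containers.MaximumPriceFactor, 1)
    // from the ChooseInstanceType change below.
    func effectiveFactor(configured float64) float64 {
        return math.Max(configured, 1)
    }

    func main() {
        fmt.Println(effectiveFactor(0))   // 1   (unset: no extra latitude)
        fmt.Println(effectiveFactor(1.0)) // 1   (explicit opt-out)
        fmt.Println(effectiveFactor(1.5)) // 1.5 (default in this commit)
    }
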
index e1f5ff9ee13473d77328e8c8521d9d4370b76483..cbd9ff6d7f982c2818829b89f7a15fab9b5ca044 100644 (file)
@@ -136,6 +136,7 @@ var whitelist = map[string]bool{
        "Containers.LogReuseDecisions":             false,
        "Containers.LSF":                           false,
        "Containers.MaxDispatchAttempts":           false,
+       "Containers.MaximumPriceFactor":            true,
        "Containers.MaxRetryAttempts":              true,
        "Containers.MinRetryPeriod":                true,
        "Containers.PreemptiblePriceFactor":        false,
index 77752013e8e6444431ffdb49020e9a6781b031ca..8d8b7ff9af09a152ec106d020097c6b820920b62 100644 (file)
@@ -22,7 +22,7 @@ import (
 // load at the cost of increased under light load.
 const queuedContainersTarget = 100
 
-type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+type typeChooser func(*arvados.Container) ([]arvados.InstanceType, error)
 
 // An APIClient performs Arvados API requests. It is typically an
 // *arvados.Client.
@@ -36,9 +36,9 @@ type QueueEnt struct {
        // The container to run. Only the UUID, State, Priority,
        // RuntimeConstraints, ContainerImage, SchedulingParameters,
        // and CreatedAt fields are populated.
-       Container    arvados.Container    `json:"container"`
-       InstanceType arvados.InstanceType `json:"instance_type"`
-       FirstSeenAt  time.Time            `json:"first_seen_at"`
+       Container     arvados.Container      `json:"container"`
+       InstanceTypes []arvados.InstanceType `json:"instance_types"`
+       FirstSeenAt   time.Time              `json:"first_seen_at"`
 }
 
 // String implements fmt.Stringer by returning the queued container's
@@ -252,7 +252,7 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                logger.WithError(err).Warn("error getting mounts")
                return
        }
-       it, err := cq.chooseType(&ctr)
+       types, err := cq.chooseType(&ctr)
 
        // Avoid wasting memory on a large Mounts attr (we don't need
        // it after choosing type).
@@ -304,13 +304,20 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                }()
                return
        }
+       typeNames := ""
+       for _, it := range types {
+               if typeNames != "" {
+                       typeNames += ", "
+               }
+               typeNames += it.Name
+       }
        cq.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "State":         ctr.State,
                "Priority":      ctr.Priority,
-               "InstanceType":  it.Name,
+               "InstanceTypes": typeNames,
        }).Info("adding container to queue")
-       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
+       cq.current[uuid] = QueueEnt{Container: ctr, InstanceTypes: types, FirstSeenAt: time.Now()}
 }
 
 // Lock acquires the dispatch lock for the given container.
@@ -577,7 +584,7 @@ func (cq *Queue) runMetrics(reg *prometheus.Registry) {
                }
                ents, _ := cq.Entries()
                for _, ent := range ents {
-                       count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+                       count[entKey{ent.Container.State, ent.InstanceTypes[0].Name}]++
                }
                for k, v := range count {
                        mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
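
A sketch, assuming the Arvados Go SDK at git.arvados.org/arvados.git/sdk/go/arvados, of the new typeChooser contract: it now returns an ordered slice of eligible types, cheapest first; the queue stores the whole slice, while logs and metrics use the first entry.

    package main

    import (
        "fmt"

        "git.arvados.org/arvados.git/sdk/go/arvados"
    )

    // fixedChooser is a hypothetical typeChooser that ignores the
    // container and always returns the same price-ordered candidates.
    func fixedChooser(ctr *arvados.Container) ([]arvados.InstanceType, error) {
        return []arvados.InstanceType{
            {Name: "small", Price: 1.0},
            {Name: "medium", Price: 1.5},
        }, nil
    }

    func main() {
        types, err := fixedChooser(&arvados.Container{})
        if err != nil {
            panic(err)
        }
        fmt.Println(len(types), types[0].Name) // metrics label uses types[0].Name
    }
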
index ca10983534a3579e79e8a02a6601126af9d32ae0..928c6dd8c87400403d8feaab6459b2ba0fff40f0 100644 (file)
@@ -40,9 +40,9 @@ func (suite *IntegrationSuite) TearDownTest(c *check.C) {
 }
 
 func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
-       typeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+       typeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
                c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000))
-               return arvados.InstanceType{Name: "testType"}, nil
+               return []arvados.InstanceType{{Name: "testType"}}, nil
        }
 
        client := arvados.NewClientFromEnv()
@@ -62,7 +62,8 @@ func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
        var wg sync.WaitGroup
        for uuid, ent := range ents {
                c.Check(ent.Container.UUID, check.Equals, uuid)
-               c.Check(ent.InstanceType.Name, check.Equals, "testType")
+               c.Check(ent.InstanceTypes, check.HasLen, 1)
+               c.Check(ent.InstanceTypes[0].Name, check.Equals, "testType")
                c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued)
                c.Check(ent.Container.Priority > 0, check.Equals, true)
                // Mounts should be deleted to avoid wasting memory
@@ -108,7 +109,7 @@ func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
 }
 
 func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) {
-       errorTypeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+       errorTypeChooser := func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
                // Make sure the relevant container fields are
                // actually populated.
                c.Check(ctr.ContainerImage, check.Equals, "test")
@@ -116,7 +117,7 @@ func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) {
                c.Check(ctr.RuntimeConstraints.RAM, check.Equals, int64(12000000000))
                c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000))
                c.Check(ctr.Mounts["/var/spool/cwl"].Capacity, check.Equals, int64(24000000000))
-               return arvados.InstanceType{}, errors.New("no suitable instance type")
+               return nil, errors.New("no suitable instance type")
        }
 
        client := arvados.NewClientFromEnv()
index 49be9e68a2358b6fcb40c95e76ed0bbbb2c29471..47e60abdee4744689f18ecd2094fd606188982d3 100644 (file)
@@ -111,7 +111,7 @@ func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
        return exr
 }
 
-func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+func (disp *dispatcher) typeChooser(ctr *arvados.Container) ([]arvados.InstanceType, error) {
        return ChooseInstanceType(disp.Cluster, ctr)
 }
 
index 6c057edc70223001a1a8d66a3134158806d559cb..33d7f4e9acd2e5e15293faffefd4d2667e2899f7 100644 (file)
@@ -15,6 +15,7 @@ import (
        "net/url"
        "os"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -72,6 +73,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        StaleLockTimeout:       arvados.Duration(5 * time.Millisecond),
                        RuntimeEngine:          "stub",
                        MaxDispatchAttempts:    10,
+                       MaximumPriceFactor:     1.5,
                        CloudVMs: arvados.CloudVMsConfig{
                                Driver:               "test",
                                SyncInterval:         arvados.Duration(10 * time.Millisecond),
@@ -160,7 +162,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
                MaxDispatchAttempts: 5,
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+               ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
                        return ChooseInstanceType(s.cluster, ctr)
                },
                Logger: ctxlog.TestLogger(c),
@@ -205,9 +207,15 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                finishContainer(ctr)
                return int(rand.Uint32() & 0x3)
        }
+       var countCapacityErrors int64
        n := 0
        s.stubDriver.Queue = queue
-       s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+               if pt := stubvm.Instance().ProviderType(); pt == test.InstanceType(6).ProviderType {
+                       c.Logf("test: returning capacity error for instance type %s", pt)
+                       atomic.AddInt64(&countCapacityErrors, 1)
+                       return test.CapacityError{InstanceTypeSpecific: true}
+               }
                n++
                stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
                stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
@@ -235,6 +243,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        stubvm.CrunchRunCrashRate = 0.1
                        stubvm.ArvMountDeadlockRate = 0.1
                }
+               return nil
        }
        s.stubDriver.Bugf = c.Errorf
 
@@ -270,6 +279,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                }
        }
 
+       c.Check(countCapacityErrors, check.Not(check.Equals), int64(0))
+
        req := httptest.NewRequest("GET", "/metrics", nil)
        req.Header.Set("Authorization", "Bearer "+s.cluster.ManagementToken)
        resp := httptest.NewRecorder()
index 0b394f4cfe4f76849fc2eb42541ed613e325921f..802bc65c28ca3b96c05ae269b467f7698fac55db 100644 (file)
@@ -6,6 +6,7 @@ package dispatchcloud
 
 import (
        "errors"
+       "math"
        "regexp"
        "sort"
        "strconv"
@@ -99,12 +100,16 @@ func versionLess(vs1 string, vs2 string) (bool, error) {
        return v1 < v2, nil
 }
 
-// ChooseInstanceType returns the cheapest available
-// arvados.InstanceType big enough to run ctr.
-func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
+// ChooseInstanceType returns the arvados.InstanceTypes eligible to
+// run ctr, i.e., those that have enough RAM, VCPUs, etc., and are not
+// too expensive according to cluster configuration.
+//
+// The returned types are sorted with lower prices first.
+//
+// The error is non-nil if and only if the returned slice is empty.
+func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) ([]arvados.InstanceType, error) {
        if len(cc.InstanceTypes) == 0 {
-               err = ErrInstanceTypesNotConfigured
-               return
+               return nil, ErrInstanceTypesNotConfigured
        }
 
        needScratch := EstimateScratchSpace(ctr)
@@ -121,31 +126,33 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
        }
        needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
 
-       ok := false
+       maxPriceFactor := math.Max(cc.Containers.MaximumPriceFactor, 1)
+       var types []arvados.InstanceType
+       var maxPrice float64
        for _, it := range cc.InstanceTypes {
                driverInsuff, driverErr := versionLess(it.CUDA.DriverVersion, ctr.RuntimeConstraints.CUDA.DriverVersion)
                capabilityInsuff, capabilityErr := versionLess(it.CUDA.HardwareCapability, ctr.RuntimeConstraints.CUDA.HardwareCapability)
 
                switch {
                // reasons to reject a node
-               case ok && it.Price > best.Price: // already selected a node, and this one is more expensive
+               case maxPrice > 0 && it.Price > maxPrice: // too expensive
                case int64(it.Scratch) < needScratch: // insufficient scratch
                case int64(it.RAM) < needRAM: // insufficient RAM
                case it.VCPUs < needVCPUs: // insufficient VCPUs
                case it.Preemptible != ctr.SchedulingParameters.Preemptible: // wrong preemptable setting
-               case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs): // same price, worse specs
                case it.CUDA.DeviceCount < ctr.RuntimeConstraints.CUDA.DeviceCount: // insufficient CUDA devices
                case ctr.RuntimeConstraints.CUDA.DeviceCount > 0 && (driverInsuff || driverErr != nil): // insufficient driver version
                case ctr.RuntimeConstraints.CUDA.DeviceCount > 0 && (capabilityInsuff || capabilityErr != nil): // insufficient hardware capability
                        // Don't select this node
                default:
                        // Didn't reject the node, so select it
-                       // Lower price || (same price && better specs)
-                       best = it
-                       ok = true
+                       types = append(types, it)
+                       if newmax := it.Price * maxPriceFactor; newmax < maxPrice || maxPrice == 0 {
+                               maxPrice = newmax
+                       }
                }
        }
-       if !ok {
+       if len(types) == 0 {
                availableTypes := make([]arvados.InstanceType, 0, len(cc.InstanceTypes))
                for _, t := range cc.InstanceTypes {
                        availableTypes = append(availableTypes, t)
@@ -153,11 +160,39 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
                sort.Slice(availableTypes, func(a, b int) bool {
                        return availableTypes[a].Price < availableTypes[b].Price
                })
-               err = ConstraintsNotSatisfiableError{
+               return nil, ConstraintsNotSatisfiableError{
                        errors.New("constraints not satisfiable by any configured instance type"),
                        availableTypes,
                }
-               return
        }
-       return
+       sort.Slice(types, func(i, j int) bool {
+               if types[i].Price != types[j].Price {
+                       // prefer lower price
+                       return types[i].Price < types[j].Price
+               }
+               if types[i].RAM != types[j].RAM {
+                       // if same price, prefer more RAM
+                       return types[i].RAM > types[j].RAM
+               }
+               if types[i].VCPUs != types[j].VCPUs {
+                       // if same price and RAM, prefer more VCPUs
+                       return types[i].VCPUs > types[j].VCPUs
+               }
+               if types[i].Scratch != types[j].Scratch {
+                       // if same price and RAM and VCPUs, prefer more scratch
+                       return types[i].Scratch > types[j].Scratch
+               }
+               // no preference, just sort the same way each time
+               return types[i].Name < types[j].Name
+       })
+       // Truncate types at maxPrice. We rejected it.Price>maxPrice
+       // in the loop above, but at that point maxPrice wasn't
+       // necessarily the final (lowest) maxPrice.
+       for i, it := range types {
+               if i > 0 && it.Price > maxPrice {
+                       types = types[:i]
+                       break
+               }
+       }
+       return types, nil
 }
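
A usage sketch, assuming the Arvados Go module is available, of calling the revised ChooseInstanceType and treating the first element as the lowest-cost suitable type (as crunch-dispatch-slurm does at the end of this commit):

    package main

    import (
        "fmt"

        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
    )

    func main() {
        cluster := &arvados.Cluster{
            InstanceTypes: map[string]arvados.InstanceType{
                "small": {Name: "small", Price: 1.0, RAM: 2000000000, VCPUs: 2, Scratch: 4000000000},
                "big":   {Name: "big", Price: 1.4, RAM: 8000000000, VCPUs: 4, Scratch: 4000000000},
            },
            Containers: arvados.ContainersConfig{MaximumPriceFactor: 1.5},
        }
        ctr := &arvados.Container{
            RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 2, RAM: 1000000000},
        }
        types, err := dispatchcloud.ChooseInstanceType(cluster, ctr)
        if err != nil {
            panic(err)
        }
        // types is sorted cheapest first; both types qualify here
        // because 1.4 <= 1.0 * 1.5.
        for _, it := range types {
            fmt.Println(it.Name, it.Price)
        }
    }
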
index 86bfbec7b629dc731e309740346dec85a24ae2d7..5d2713e982a4a4d397a9888081eb225133617443 100644 (file)
@@ -93,12 +93,51 @@ func (*NodeSizeSuite) TestChoose(c *check.C) {
                                KeepCacheRAM: 123456789,
                        },
                })
-               c.Check(err, check.IsNil)
-               c.Check(best.Name, check.Equals, "best")
-               c.Check(best.RAM >= 1234567890, check.Equals, true)
-               c.Check(best.VCPUs >= 2, check.Equals, true)
-               c.Check(best.Scratch >= 2*GiB, check.Equals, true)
+               c.Assert(err, check.IsNil)
+               c.Assert(best, check.Not(check.HasLen), 0)
+               c.Check(best[0].Name, check.Equals, "best")
+               c.Check(best[0].RAM >= 1234567890, check.Equals, true)
+               c.Check(best[0].VCPUs >= 2, check.Equals, true)
+               c.Check(best[0].Scratch >= 2*GiB, check.Equals, true)
+               for i := range best {
+                       // If multiple instance types are returned
+                       // then they should all have the same price,
+                       // because we didn't set MaximumPriceFactor>1.
+                       c.Check(best[i].Price, check.Equals, best[0].Price)
+               }
+       }
+}
+
+func (*NodeSizeSuite) TestMaximumPriceFactor(c *check.C) {
+       menu := map[string]arvados.InstanceType{
+               "best+7":  {Price: 3.4, RAM: 8000000000, VCPUs: 8, Scratch: 64 * GiB, Name: "best+7"},
+               "best+5":  {Price: 3.0, RAM: 8000000000, VCPUs: 8, Scratch: 16 * GiB, Name: "best+5"},
+               "best+3":  {Price: 2.6, RAM: 4000000000, VCPUs: 8, Scratch: 16 * GiB, Name: "best+3"},
+               "best+2":  {Price: 2.4, RAM: 4000000000, VCPUs: 8, Scratch: 4 * GiB, Name: "best+2"},
+               "best+1":  {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 4 * GiB, Name: "best+1"},
+               "best":    {Price: 2.0, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+               "small+1": {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 16 * GiB, Name: "small+1"},
+               "small":   {Price: 1.0, RAM: 2000000000, VCPUs: 2, Scratch: 1 * GiB, Name: "small"},
        }
+       best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu, Containers: arvados.ContainersConfig{
+               MaximumPriceFactor: 1.5,
+       }}, &arvados.Container{
+               Mounts: map[string]arvados.Mount{
+                       "/tmp": {Kind: "tmp", Capacity: 2 * int64(GiB)},
+               },
+               RuntimeConstraints: arvados.RuntimeConstraints{
+                       VCPUs:        2,
+                       RAM:          987654321,
+                       KeepCacheRAM: 123456789,
+               },
+       })
+       c.Assert(err, check.IsNil)
+       c.Assert(best, check.HasLen, 5)
+       c.Check(best[0].Name, check.Equals, "best") // best price is $2
+       c.Check(best[1].Name, check.Equals, "best+1")
+       c.Check(best[2].Name, check.Equals, "best+2")
+       c.Check(best[3].Name, check.Equals, "best+3")
+       c.Check(best[4].Name, check.Equals, "best+5") // max price is $2 * 1.5 = $3
 }
 
 func (*NodeSizeSuite) TestChooseWithBlobBuffersOverhead(c *check.C) {
@@ -121,7 +160,8 @@ func (*NodeSizeSuite) TestChooseWithBlobBuffersOverhead(c *check.C) {
                },
        })
        c.Check(err, check.IsNil)
-       c.Check(best.Name, check.Equals, "best")
+       c.Assert(best, check.HasLen, 1)
+       c.Check(best[0].Name, check.Equals, "best")
 }
 
 func (*NodeSizeSuite) TestChoosePreemptible(c *check.C) {
@@ -145,11 +185,12 @@ func (*NodeSizeSuite) TestChoosePreemptible(c *check.C) {
                },
        })
        c.Check(err, check.IsNil)
-       c.Check(best.Name, check.Equals, "best")
-       c.Check(best.RAM >= 1234567890, check.Equals, true)
-       c.Check(best.VCPUs >= 2, check.Equals, true)
-       c.Check(best.Scratch >= 2*GiB, check.Equals, true)
-       c.Check(best.Preemptible, check.Equals, true)
+       c.Assert(best, check.HasLen, 1)
+       c.Check(best[0].Name, check.Equals, "best")
+       c.Check(best[0].RAM >= 1234567890, check.Equals, true)
+       c.Check(best[0].VCPUs >= 2, check.Equals, true)
+       c.Check(best[0].Scratch >= 2*GiB, check.Equals, true)
+       c.Check(best[0].Preemptible, check.Equals, true)
 }
 
 func (*NodeSizeSuite) TestScratchForDockerImage(c *check.C) {
@@ -252,9 +293,10 @@ func (*NodeSizeSuite) TestChooseGPU(c *check.C) {
                                CUDA:         tc.CUDA,
                        },
                })
-               if best.Name != "" {
+               if len(best) > 0 {
                        c.Check(err, check.IsNil)
-                       c.Check(best.Name, check.Equals, tc.SelectedInstance)
+                       c.Assert(best, check.HasLen, 1)
+                       c.Check(best[0].Name, check.Equals, tc.SelectedInstance)
                } else {
                        c.Check(err, check.Not(check.IsNil))
                }
index 3505c3e064e288a22d0c59cbbdfdadc65edd44ce..2f1f17589029dfea32f2a44ac25f0a65f0ee017e 100644 (file)
@@ -163,10 +163,9 @@ func (sch *Scheduler) runQueue() {
 
 tryrun:
        for i, ent := range sorted {
-               ctr, it := ent.Container, ent.InstanceType
+               ctr, types := ent.Container, ent.InstanceTypes
                logger := sch.logger.WithFields(logrus.Fields{
                        "ContainerUUID": ctr.UUID,
-                       "InstanceType":  it.Name,
                })
                if ctr.SchedulingParameters.Supervisor {
                        supervisors += 1
@@ -178,6 +177,35 @@ tryrun:
                if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
                        continue
                }
+               // If we have unalloc instances of any of the eligible
+               // instance types, unallocOK is true and unallocType
+               // is the lowest-cost type.
+               var unallocOK bool
+               var unallocType arvados.InstanceType
+               for _, it := range types {
+                       if unalloc[it] > 0 {
+                               unallocOK = true
+                               unallocType = it
+                               break
+                       }
+               }
+               // If at least one of the eligible instance types is
+               // not reported at capacity by the pool, availableOK is
+               // true and availableType is the lowest-cost such type.
+               var availableOK bool
+               var availableType arvados.InstanceType
+               for _, it := range types {
+                       if atcapacity[it.ProviderType] {
+                               continue
+                       } else if sch.pool.AtCapacity(it) {
+                               atcapacity[it.ProviderType] = true
+                               continue
+                       } else {
+                               availableOK = true
+                               availableType = it
+                               break
+                       }
+               }
                switch ctr.State {
                case arvados.ContainerStateQueued:
                        if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
@@ -185,14 +213,13 @@ tryrun:
                                continue
                        }
                        trying++
-                       if unalloc[it] < 1 && sch.pool.AtQuota() {
+                       if !unallocOK && sch.pool.AtQuota() {
                                logger.Trace("not locking: AtQuota and no unalloc workers")
                                overquota = sorted[i:]
                                break tryrun
                        }
-                       if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) {
+                       if !unallocOK && !availableOK {
                                logger.Trace("not locking: AtCapacity and no unalloc workers")
-                               atcapacity[it.ProviderType] = true
                                continue
                        }
                        if sch.pool.KillContainer(ctr.UUID, "about to lock") {
@@ -200,16 +227,36 @@ tryrun:
                                continue
                        }
                        go sch.lockContainer(logger, ctr.UUID)
-                       unalloc[it]--
+                       unalloc[unallocType]--
                case arvados.ContainerStateLocked:
                        if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
                                logger.Tracef("not starting: already at maxConcurrency %d", sch.maxConcurrency)
                                continue
                        }
                        trying++
-                       if unalloc[it] > 0 {
-                               unalloc[it]--
-                       } else if sch.pool.AtQuota() {
+                       if unallocOK {
+                               // We have a suitable instance type,
+                               // so mark it as allocated, and try to
+                               // start the container.
+                               unalloc[unallocType]--
+                               logger = logger.WithField("InstanceType", unallocType)
+                               if dontstart[unallocType] {
+                                       // We already tried & failed to start
+                                       // a higher-priority container on the
+                                       // same instance type. Don't let this
+                                       // one sneak in ahead of it.
+                               } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+                                       logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
+                               } else if sch.pool.StartContainer(unallocType, ctr) {
+                                       logger.Trace("StartContainer => true")
+                               } else {
+                                       logger.Trace("StartContainer => false")
+                                       containerAllocatedWorkerBootingCount += 1
+                                       dontstart[unallocType] = true
+                               }
+                               continue
+                       }
+                       if sch.pool.AtQuota() {
                                // Don't let lower-priority containers
                                // starve this one by keeping
                                // idle workers alive on different
@@ -217,7 +264,8 @@ tryrun:
                                logger.Trace("overquota")
                                overquota = sorted[i:]
                                break tryrun
-                       } else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) {
+                       }
+                       if !availableOK {
                                // Continue trying lower-priority
                                // containers in case they can run on
                                // different instance types that are
@@ -231,41 +279,26 @@ tryrun:
                                // container A on the next call to
                                // runQueue(), rather than run
                                // container B now.
-                               //
-                               // TODO: try running this container on
-                               // a bigger (but not much more
-                               // expensive) instance type.
-                               logger.WithField("InstanceType", it.Name).Trace("at capacity")
-                               atcapacity[it.ProviderType] = true
+                               logger.Trace("all eligible types at capacity")
                                continue
-                       } else if sch.pool.Create(it) {
-                               // Success. (Note pool.Create works
-                               // asynchronously and does its own
-                               // logging about the eventual outcome,
-                               // so we don't need to.)
-                               logger.Info("creating new instance")
-                       } else {
+                       }
+                       logger = logger.WithField("InstanceType", availableType)
+                       if !sch.pool.Create(availableType) {
                                // Failed despite not being at quota,
                                // e.g., cloud ops throttled.
                                logger.Trace("pool declined to create new instance")
                                continue
                        }
-
-                       if dontstart[it] {
-                               // We already tried & failed to start
-                               // a higher-priority container on the
-                               // same instance type. Don't let this
-                               // one sneak in ahead of it.
-                       } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
-                               logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
-                       } else if sch.pool.StartContainer(it, ctr) {
-                               logger.Trace("StartContainer => true")
-                               // Success.
-                       } else {
-                               logger.Trace("StartContainer => false")
-                               containerAllocatedWorkerBootingCount += 1
-                               dontstart[it] = true
-                       }
+                       // Success. (Note pool.Create works
+                       // asynchronously and does its own logging
+                       // about the eventual outcome, so we don't
+                       // need to.)
+                       logger.Info("creating new instance")
+                       // Don't bother trying to start the container
+                       // yet -- obviously the instance will take
+                       // some time to boot and become ready.
+                       containerAllocatedWorkerBootingCount += 1
+                       dontstart[availableType] = true
                }
        }
 
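
A simplified, self-contained sketch (not the scheduler's actual API) of the two scans introduced above: with candidate types sorted cheapest first, prefer the cheapest type that already has an unallocated worker; otherwise the cheapest type the pool has not reported at capacity; otherwise give up for now.

    package main

    import "fmt"

    type instType struct {
        Name         string
        ProviderType string
        Price        float64
    }

    // pickType mirrors the unallocOK/availableOK logic: types must be
    // sorted cheapest first, so the first match is the lowest-cost choice.
    func pickType(types []instType, unalloc map[string]int, atCapacity map[string]bool) (instType, bool) {
        for _, it := range types {
            if unalloc[it.Name] > 0 {
                return it, true // reuse an idle instance we already have
            }
        }
        for _, it := range types {
            if !atCapacity[it.ProviderType] {
                return it, true // create a new instance of this type
            }
        }
        return instType{}, false // all eligible types are at capacity
    }

    func main() {
        types := []instType{
            {"small", "t3.small", 1.0},
            {"medium", "t3.medium", 1.5},
        }
        it, ok := pickType(types, map[string]int{}, map[string]bool{"t3.small": true})
        fmt.Println(it.Name, ok) // medium true: no idle workers, and small is at capacity
    }
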
index da6955029a0fb16aedf4da1df05e12982732168e..e4a05daba535f4a2e50252085f64c447b1476bcf 100644 (file)
@@ -139,8 +139,8 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container
        return true
 }
 
-func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
-       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+func chooseType(ctr *arvados.Container) ([]arvados.InstanceType, error) {
+       return []arvados.InstanceType{test.InstanceType(ctr.RuntimeConstraints.VCPUs)}, nil
 }
 
 var _ = check.Suite(&SchedulerSuite{})
@@ -364,7 +364,7 @@ func (*SchedulerSuite) TestInstanceCapacity(c *check.C) {
        // type4, so we skip trying to create an instance for
        // container3, skip locking container2, but do try to create a
        // type1 instance for container1.
-       c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(1)})
+       c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
        c.Check(pool.shutdowns, check.Equals, 0)
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
        c.Check(queue.StateChanges(), check.HasLen, 0)
index 2be8246bd619640ee7cd6ffd43bfacf42236370c..ea2b98236ffe34d325fc2c69f96c5be3eea2450e 100644 (file)
@@ -22,7 +22,7 @@ type Queue struct {
 
        // ChooseType will be called for each entry in Containers. It
        // must not be nil.
-       ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+       ChooseType func(*arvados.Container) ([]arvados.InstanceType, error)
 
        // Mimic railsapi implementation of MaxDispatchAttempts config
        MaxDispatchAttempts int
@@ -167,12 +167,12 @@ func (q *Queue) Update() error {
                        ent.Container = ctr
                        upd[ctr.UUID] = ent
                } else {
-                       it, _ := q.ChooseType(&ctr)
+                       types, _ := q.ChooseType(&ctr)
                        ctr.Mounts = nil
                        upd[ctr.UUID] = container.QueueEnt{
-                               Container:    ctr,
-                               InstanceType: it,
-                               FirstSeenAt:  time.Now(),
+                               Container:     ctr,
+                               InstanceTypes: types,
+                               FirstSeenAt:   time.Now(),
                        }
                }
        }
index 6e0b1294875dfc9e50c2794b3b79cdd069c1e5ce..0a74d976063351ba8a3bdff0b35c91d360028c73 100644 (file)
@@ -34,7 +34,10 @@ type StubDriver struct {
        // SetupVM, if set, is called upon creation of each new
        // StubVM. This is the caller's opportunity to customize the
        // VM's error rate and other behaviors.
-       SetupVM func(*StubVM)
+       //
+       // If SetupVM returns an error, that error will be returned to
+       // the caller of Create(), and the new VM will be discarded.
+       SetupVM func(*StubVM) error
 
        // Bugf, if set, is called if a bug is detected in the caller
        // or stub. Typically set to (*check.C)Errorf. If unset,
@@ -152,7 +155,10 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                Exec:           svm.Exec,
        }
        if setup := sis.driver.SetupVM; setup != nil {
-               setup(svm)
+               err := setup(svm)
+               if err != nil {
+                       return nil, err
+               }
        }
        sis.servers[svm.id] = svm
        return svm.Instance(), nil
@@ -195,6 +201,12 @@ type RateLimitError struct{ Retry time.Time }
 func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
 func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 
+type CapacityError struct{ InstanceTypeSpecific bool }
+
+func (e CapacityError) Error() string                { return "insufficient capacity" }
+func (e CapacityError) IsCapacityError() bool        { return true }
+func (e CapacityError) IsInstanceTypeSpecific() bool { return e.InstanceTypeSpecific }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //
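
A sketch, assuming the test package at git.arvados.org/arvados.git/lib/dispatchcloud/test, of the new SetupVM error return, modeled on the dispatcher test earlier in this commit: rejecting one provider type with a type-specific capacity error forces the scheduler to fall back to another eligible type.

    package main

    import "git.arvados.org/arvados.git/lib/dispatchcloud/test"

    func main() {
        driver := &test.StubDriver{}
        driver.SetupVM = func(stubvm *test.StubVM) error {
            // Pretend the cloud has no capacity for this one type;
            // Create() will return this error and discard the VM.
            if stubvm.Instance().ProviderType() == test.InstanceType(6).ProviderType {
                return test.CapacityError{InstanceTypeSpecific: true}
            }
            return nil
        }
        _ = driver
    }
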
index 27f1ab773dd586faa03bf4b7d1c69e3c330b84ac..ea01cc3c6872a88adb341d953adbe75760b4f176 100644 (file)
@@ -517,6 +517,7 @@ type ContainersConfig struct {
        SupportedDockerImageFormats   StringSet
        AlwaysUsePreemptibleInstances bool
        PreemptiblePriceFactor        float64
+       MaximumPriceFactor            float64
        RuntimeEngine                 string
        LocalKeepBlobBuffersPerVCPU   int
        LocalKeepLogsToContainerLog   string
index 1c0f6ad28f5ba7b6d20bcc0cadbef0fb87fec634..5a9ef91c3d23784e7e1a020a60ef09ecaea1e538 100644 (file)
@@ -197,14 +197,16 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error
        if disp.cluster == nil {
                // no instance types configured
                args = append(args, disp.slurmConstraintArgs(container)...)
-       } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+       } else if types, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
                // ditto
                args = append(args, disp.slurmConstraintArgs(container)...)
        } else if err != nil {
                return nil, err
        } else {
-               // use instancetype constraint instead of slurm mem/cpu/tmp specs
-               args = append(args, "--constraint=instancetype="+it.Name)
+               // use instancetype constraint instead of slurm
+               // mem/cpu/tmp specs (note types[0] is the lowest-cost
+               // suitable instance type)
+               args = append(args, "--constraint=instancetype="+types[0].Name)
        }
 
        if len(container.SchedulingParameters.Partitions) > 0 {
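
A self-contained sketch (not from this commit) of the sbatch-argument choice described in the comment above: with a non-empty list of suitable types, constrain the job to the cheapest one; otherwise fall back to explicit per-resource slurm arguments.

    package main

    import "fmt"

    // constraintArgs is a hypothetical helper: typeNames is assumed to
    // be sorted cheapest first, matching ChooseInstanceType's ordering.
    func constraintArgs(typeNames, resourceArgs []string) []string {
        if len(typeNames) == 0 {
            // No instance types configured: keep --mem/--cpus-per-task/--tmp style args.
            return resourceArgs
        }
        return []string{"--constraint=instancetype=" + typeNames[0]}
    }

    func main() {
        fmt.Println(constraintArgs([]string{"small", "medium"}, nil))
        fmt.Println(constraintArgs(nil, []string{"--mem=4096", "--cpus-per-task=2"}))
    }
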