Merge branch '19792-pysdk-cookbook'
[arvados.git] / lib / cloud / ec2 / ec2.go
index 52b73f781c6bc63c2e4e2d3242ddc33c642157dc..b90eff6d571301085c2acd3d9bb8df56d1cfe54c 100644 (file)
@@ -13,6 +13,7 @@ import (
        "encoding/json"
        "fmt"
        "math/big"
+       "strconv"
        "sync"
        "sync/atomic"
        "time"
@@ -40,14 +41,16 @@ const (
 )
 
 type ec2InstanceSetConfig struct {
-       AccessKeyID        string
-       SecretAccessKey    string
-       Region             string
-       SecurityGroupIDs   arvados.StringSet
-       SubnetID           string
-       AdminUsername      string
-       EBSVolumeType      string
-       IAMInstanceProfile string
+       AccessKeyID             string
+       SecretAccessKey         string
+       Region                  string
+       SecurityGroupIDs        arvados.StringSet
+       SubnetID                string
+       AdminUsername           string
+       EBSVolumeType           string
+       EBSPrice                float64 // price per GiB-month; PriceHistory prorates it hourly for added scratch space
+       IAMInstanceProfile      string
+       SpotPriceUpdateInterval arvados.Duration // staleness threshold for cached spot prices; Instances skips price lookups unless > 0
 }
 
 type ec2Interface interface {
@@ -55,6 +58,8 @@ type ec2Interface interface {
        ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error)
        RunInstances(input *ec2.RunInstancesInput) (*ec2.Reservation, error)
        DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
+       DescribeInstanceStatusPages(input *ec2.DescribeInstanceStatusInput, fn func(*ec2.DescribeInstanceStatusOutput, bool) bool) error
+       DescribeSpotPriceHistoryPages(input *ec2.DescribeSpotPriceHistoryInput, fn func(*ec2.DescribeSpotPriceHistoryOutput, bool) bool) error
        CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error)
        TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error)
 }
@@ -68,6 +73,10 @@ type ec2InstanceSet struct {
        keys                   map[string]string
        throttleDelayCreate    atomic.Value
        throttleDelayInstances atomic.Value
+
+       prices        map[priceKey][]cloud.InstancePrice
+       pricesLock    sync.Mutex
+       pricesUpdated map[priceKey]time.Time
 }
 
 func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
@@ -242,7 +251,6 @@ func (instanceSet *ec2InstanceSet) Create(
        if err != nil {
                return nil, err
        }
-
        return &ec2Instance{
                provider: instanceSet,
                instance: rsv.Instances[0],
@@ -257,6 +265,7 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
                        Values: []*string{aws.String(v)},
                })
        }
+       needAZs := false
        dii := &ec2.DescribeInstancesInput{Filters: filters}
        for {
                dio, err := instanceSet.client.DescribeInstances(dii)
@@ -268,23 +277,150 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
                for _, rsv := range dio.Reservations {
                        for _, inst := range rsv.Instances {
                                if *inst.State.Name != "shutting-down" && *inst.State.Name != "terminated" {
-                                       instances = append(instances, &ec2Instance{instanceSet, inst})
+                                       instances = append(instances, &ec2Instance{
+                                               provider: instanceSet,
+                                               instance: inst,
+                                       })
+                                       if aws.StringValue(inst.InstanceLifecycle) == "spot" {
+                                               needAZs = true
+                                       }
                                }
                        }
                }
                if dio.NextToken == nil {
-                       return instances, err
+                       break
                }
                dii.NextToken = dio.NextToken
        }
+       if needAZs && instanceSet.ec2config.SpotPriceUpdateInterval > 0 {
+               az := map[string]string{}
+               err := instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{
+                       IncludeAllInstances: aws.Bool(true),
+               }, func(page *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
+                       for _, ent := range page.InstanceStatuses {
+                               az[*ent.InstanceId] = *ent.AvailabilityZone
+                       }
+                       return true
+               })
+               if err != nil {
+                       instanceSet.logger.Warnf("error getting instance statuses: %s", err)
+               }
+               for _, inst := range instances {
+                       inst := inst.(*ec2Instance)
+                       inst.availabilityZone = az[*inst.instance.InstanceId]
+               }
+               instanceSet.updateSpotPrices(instances)
+       }
+       return instances, err
+}
+
+type priceKey struct { // key of the ec2InstanceSet prices and pricesUpdated maps
+       instanceType     string // EC2 instance type name
+       spot             bool   // true for spot pricing (the only kind updateSpotPrices stores)
+       availabilityZone string // zone the price applies to; "" when the instance's zone is unknown
+}
+
+// updateSpotPrices refreshes cached spot price history for the spot
+// instances given, unless every relevant price key is still fresh.
+func (instanceSet *ec2InstanceSet) updateSpotPrices(instances []cloud.Instance) {
+       if len(instances) == 0 {
+               return
+       }
+
+       instanceSet.pricesLock.Lock()
+       defer instanceSet.pricesLock.Unlock()
+       if instanceSet.prices == nil {
+               instanceSet.prices = map[priceKey][]cloud.InstancePrice{}
+               instanceSet.pricesUpdated = map[priceKey]time.Time{}
+       }
+
+       updateTime := time.Now()
+       staleTime := updateTime.Add(-instanceSet.ec2config.SpotPriceUpdateInterval.Duration())
+       needUpdate := false
+       allTypes := map[string]bool{}
+
+       for _, inst := range instances {
+               ec2inst := inst.(*ec2Instance).instance
+               if aws.StringValue(ec2inst.InstanceLifecycle) == "spot" {
+                       pk := priceKey{
+                               instanceType:     *ec2inst.InstanceType,
+                               spot:             true,
+                               availabilityZone: inst.(*ec2Instance).availabilityZone,
+                       }
+                       if instanceSet.pricesUpdated[pk].Before(staleTime) { // zero time (no entry yet) counts as stale
+                               needUpdate = true
+                       }
+                       allTypes[*ec2inst.InstanceType] = true
+               }
+       }
+       if !needUpdate {
+               return
+       }
+       var typeFilterValues []*string
+       for instanceType := range allTypes {
+               typeFilterValues = append(typeFilterValues, aws.String(instanceType))
+       }
+       // Get 3x update interval worth of pricing data. (Ideally the
+       // AWS API would tell us "we have shown you all of the price
+       // changes up to time T", but it doesn't, so we'll just ask
+       // for 3 intervals worth of data on each update, de-duplicate
+       // the data points, and not worry too much about occasionally
+       // missing some data points when our lookups fail twice in a
+       // row.)
+       dsphi := &ec2.DescribeSpotPriceHistoryInput{
+               StartTime: aws.Time(updateTime.Add(-3 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration())),
+               Filters: []*ec2.Filter{
+                       {Name: aws.String("instance-type"), Values: typeFilterValues},
+                       {Name: aws.String("product-description"), Values: []*string{aws.String("Linux/UNIX")}},
+               },
+       }
+       err := instanceSet.client.DescribeSpotPriceHistoryPages(dsphi, func(page *ec2.DescribeSpotPriceHistoryOutput, lastPage bool) bool {
+               for _, ent := range page.SpotPriceHistory {
+                       if ent.InstanceType == nil || ent.AvailabilityZone == nil || ent.SpotPrice == nil || ent.Timestamp == nil {
+                               // incomplete record: skip it rather than dereference nil below
+                               continue
+                       }
+                       price, err := strconv.ParseFloat(*ent.SpotPrice, 64)
+                       if err != nil {
+                               // unparseable price: skip this record
+                               continue
+                       }
+                       pk := priceKey{
+                               instanceType:     *ent.InstanceType,
+                               spot:             true,
+                               availabilityZone: *ent.AvailabilityZone,
+                       }
+                       instanceSet.prices[pk] = append(instanceSet.prices[pk], cloud.InstancePrice{
+                               StartTime: *ent.Timestamp,
+                               Price:     price,
+                       })
+                       instanceSet.pricesUpdated[pk] = updateTime
+               }
+               return true
+       })
+       if err != nil {
+               instanceSet.logger.Warnf("error retrieving spot instance prices: %s", err)
+       }
+
+       expiredTime := updateTime.Add(-64 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration()) // keys with no new data for 64 intervals are dropped
+       for pk, last := range instanceSet.pricesUpdated {
+               if last.Before(expiredTime) {
+                       delete(instanceSet.pricesUpdated, pk)
+                       delete(instanceSet.prices, pk)
+               }
+       }
+       for pk, prices := range instanceSet.prices {
+               instanceSet.prices[pk] = cloud.NormalizePriceHistory(prices)
+       }
 }
 
 func (instanceSet *ec2InstanceSet) Stop() {
 }
 
 type ec2Instance struct {
-       provider *ec2InstanceSet
-       instance *ec2.Instance
+       provider         *ec2InstanceSet // instance set this instance was created or listed by
+       instance         *ec2.Instance   // instance record as returned by the EC2 API
+       availabilityZone string          // sometimes available for spot instances
 }
 
 func (inst *ec2Instance) ID() cloud.InstanceID {
@@ -348,6 +484,53 @@ func (inst *ec2Instance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
        return cloud.ErrNotImplemented
 }
 
+// PriceHistory returns this instance's price history, EBS cost included.
+//
+// AWS documentation is elusive about whether the hourly cost of a
+// given spot instance changes as the current spot price changes for
+// the corresponding instance type and availability zone. Our
+// implementation assumes the answer is yes, based on the following
+// hints.
+//
+// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-requests.html
+// says: "After your Spot Instance is running, if the Spot price rises
+// above your maximum price, Amazon EC2 interrupts your Spot
+// Instance." (This doesn't address what happens when the spot price
+// rises *without* exceeding your maximum price.)
+//
+// https://docs.aws.amazon.com/whitepapers/latest/cost-optimization-leveraging-ec2-spot-instances/how-spot-instances-work.html
+// says: "You pay the Spot price that's in effect, billed to the
+// nearest second." (But it's not explicitly stated whether "the price
+// in effect" changes over time for a given instance.)
+//
+// The same page also says, in a discussion about the effect of
+// specifying a maximum price: "Note that you never pay more than the
+// Spot price that is in effect when your Spot Instance is running."
+// (The use of the phrase "is running", as opposed to "was launched",
+// hints that pricing is dynamic.)
+func (inst *ec2Instance) PriceHistory(instType arvados.InstanceType) []cloud.InstancePrice {
+       inst.provider.pricesLock.Lock()
+       defer inst.provider.pricesLock.Unlock()
+       // Note updateSpotPrices currently populates
+       // inst.provider.prices only for spot instances, so if
+       // spot==false here, we will return no data.
+       pk := priceKey{
+               instanceType:     *inst.instance.InstanceType,
+               spot:             aws.StringValue(inst.instance.InstanceLifecycle) == "spot",
+               availabilityZone: inst.availabilityZone,
+       }
+       // hourly EBS surcharge for ceil(added scratch space in GiB); same for every entry
+       gib := (instType.AddedScratch + 1<<30 - 1) >> 30
+       monthly := inst.provider.ec2config.EBSPrice * float64(gib)
+       hourly := monthly / 30 / 24 // treats a month as 30 days
+       var prices []cloud.InstancePrice
+       for _, price := range inst.provider.prices[pk] {
+               price.Price += hourly // price is a copy; the cached entry is left unchanged
+               prices = append(prices, price)
+       }
+       return prices
+}
+
 type rateLimitError struct {
        error
        earliestRetry time.Time