Merge branch '20953-installer-tls-cert-monitoring'. Closes #20953
[arvados.git] / lib / cloud / ec2 / ec2.go
index 526fc1307dc29e8d77d588cf6dae4c9489c0464d..816df48d90c7dced2bacc1c864ea71da8dccd3d2 100644 (file)
@@ -29,6 +29,7 @@ import (
        "github.com/aws/aws-sdk-go/aws/request"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/ec2"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
 )
@@ -112,9 +113,12 @@ type ec2InstanceSet struct {
        prices        map[priceKey][]cloud.InstancePrice
        pricesLock    sync.Mutex
        pricesUpdated map[priceKey]time.Time
+
+       mInstances      *prometheus.GaugeVec
+       mInstanceStarts *prometheus.CounterVec
 }
 
-func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
        instanceSet := &ec2InstanceSet{
                instanceSetID: instanceSetID,
                logger:        logger,
@@ -141,6 +145,36 @@ func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID
        if instanceSet.ec2config.EBSVolumeType == "" {
                instanceSet.ec2config.EBSVolumeType = "gp2"
        }
+
+       // Set up metrics
+       instanceSet.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "ec2_instances",
+               Help:      "Number of instances running",
+       }, []string{"subnet_id"})
+       instanceSet.mInstanceStarts = prometheus.NewCounterVec(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "ec2_instance_starts_total",
+               Help:      "Number of attempts to start a new instance",
+       }, []string{"subnet_id", "success"})
+       // Initialize all of the series we'll be reporting.  Otherwise
+       // the {subnet_id=A, success=0} series doesn't appear in metrics
+       // at all until there's a failure in subnet A.
+       for _, subnet := range instanceSet.ec2config.SubnetID {
+               instanceSet.mInstanceStarts.WithLabelValues(subnet, "0").Add(0)
+               instanceSet.mInstanceStarts.WithLabelValues(subnet, "1").Add(0)
+       }
+       if len(instanceSet.ec2config.SubnetID) == 0 {
+               instanceSet.mInstanceStarts.WithLabelValues("", "0").Add(0)
+               instanceSet.mInstanceStarts.WithLabelValues("", "1").Add(0)
+       }
+       if reg != nil {
+               reg.MustRegister(instanceSet.mInstances)
+               reg.MustRegister(instanceSet.mInstanceStarts)
+       }
+
        return instanceSet, nil
 }
 
@@ -259,11 +293,14 @@ func (instanceSet *ec2InstanceSet) Create(
        currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex))
        for tryOffset := 0; ; tryOffset++ {
                tryIndex := 0
+               trySubnet := ""
                if len(subnets) > 0 {
                        tryIndex = (currentSubnetIDIndex + tryOffset) % len(subnets)
-                       rii.NetworkInterfaces[0].SubnetId = aws.String(subnets[tryIndex])
+                       trySubnet = subnets[tryIndex]
+                       rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet)
                }
                rsv, err = instanceSet.client.RunInstances(&rii)
+               instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1)
                if isErrorSubnetSpecific(err) &&
                        tryOffset < len(subnets)-1 {
                        instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]).
@@ -381,6 +418,24 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
                }
                instanceSet.updateSpotPrices(instances)
        }
+
+       // Count instances in each subnet, and report in metrics.
+       subnetInstances := map[string]int{"": 0}
+       for _, subnet := range instanceSet.ec2config.SubnetID {
+               subnetInstances[subnet] = 0
+       }
+       for _, inst := range instances {
+               subnet := inst.(*ec2Instance).instance.SubnetId
+               if subnet != nil {
+                       subnetInstances[*subnet]++
+               } else {
+                       subnetInstances[""]++
+               }
+       }
+       for subnet, count := range subnetInstances {
+               instanceSet.mInstances.WithLabelValues(subnet).Set(float64(count))
+       }
+
        return instances, err
 }
 
@@ -610,21 +665,36 @@ func (err rateLimitError) EarliestRetry() time.Time {
        return err.earliestRetry
 }
 
-var isCodeCapacity = map[string]bool{
+type capacityError struct {
+       error
+       isInstanceTypeSpecific bool
+}
+
+func (er *capacityError) IsCapacityError() bool {
+       return true
+}
+
+func (er *capacityError) IsInstanceTypeSpecific() bool {
+       return er.isInstanceTypeSpecific
+}
+
+var isCodeQuota = map[string]bool{
        "InstanceLimitExceeded":             true,
        "InsufficientAddressCapacity":       true,
        "InsufficientFreeAddressesInSubnet": true,
-       "InsufficientInstanceCapacity":      true,
        "InsufficientVolumeCapacity":        true,
        "MaxSpotInstanceCountExceeded":      true,
        "VcpuLimitExceeded":                 true,
 }
 
-// isErrorCapacity returns whether the error is to be throttled based on its code.
+// isErrorQuota returns whether the error indicates we have reached
+// some usage quota/limit -- i.e., immediately retrying with an equal
+// or larger instance type will probably not work.
+//
 // Returns false if error is nil.
-func isErrorCapacity(err error) bool {
+func isErrorQuota(err error) bool {
        if aerr, ok := err.(awserr.Error); ok && aerr != nil {
-               if _, ok := isCodeCapacity[aerr.Code()]; ok {
+               if _, ok := isCodeQuota[aerr.Code()]; ok {
                        return true
                }
        }
@@ -665,8 +735,10 @@ func wrapError(err error, throttleValue *atomic.Value) error {
                }
                throttleValue.Store(d)
                return rateLimitError{error: err, earliestRetry: time.Now().Add(d)}
-       } else if isErrorCapacity(err) {
+       } else if isErrorQuota(err) {
                return &ec2QuotaError{err}
+       } else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" {
+               return &capacityError{err, true}
        } else if err != nil {
                throttleValue.Store(time.Duration(0))
                return err
@@ -674,3 +746,5 @@ func wrapError(err error, throttleValue *atomic.Value) error {
        throttleValue.Store(time.Duration(0))
        return nil
 }
+
+var boolLabelValue = map[bool]string{false: "0", true: "1"}