prices map[priceKey][]cloud.InstancePrice
pricesLock sync.Mutex
pricesUpdated map[priceKey]time.Time
+
+ mInstances *prometheus.GaugeVec
+ mInstanceStarts *prometheus.CounterVec
}
+// newEC2InstanceSet creates the EC2 cloud.InstanceSet described by
+// config (presumably the JSON-encoded driver settings -- confirm).
+// If reg is non-nil, the per-subnet metrics set up below are
+// registered with it.
func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
if instanceSet.ec2config.EBSVolumeType == "" {
instanceSet.ec2config.EBSVolumeType = "gp2"
}
+
+ // Set up metrics
+ // mInstances: current number of instances, labelled by subnet_id.
+ instanceSet.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instances",
+ Help: "Number of instances running",
+ }, []string{"subnet_id"})
+ // mInstanceStarts: RunInstances attempts, labelled by subnet_id
+ // and success ("1" if the call returned no error, else "0").
+ instanceSet.mInstanceStarts = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instance_starts_total",
+ Help: "Number of attempts to start a new instance",
+ }, []string{"subnet_id", "success"})
+ // Initialize all of the series we'll be reporting. Otherwise
+ // the {subnet_id=A, success=0} series doesn't appear in metrics
+ // at all until there's a failure in subnet A.
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "1").Add(0)
+ }
+ // With no SubnetID configured, start attempts are reported with
+ // subnet_id="", so pre-create those two series instead.
+ if len(instanceSet.ec2config.SubnetID) == 0 {
+ instanceSet.mInstanceStarts.WithLabelValues("", "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues("", "1").Add(0)
+ }
+ // NOTE: MustRegister panics on a registration error, e.g. if
+ // collectors with these fully-qualified names are already
+ // registered with reg.
+ if reg != nil {
+ reg.MustRegister(instanceSet.mInstances)
+ reg.MustRegister(instanceSet.mInstanceStarts)
+ }
+
return instanceSet, nil
}
currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex))
for tryOffset := 0; ; tryOffset++ {
tryIndex := 0
+ trySubnet := ""
if len(subnets) > 0 {
tryIndex = (currentSubnetIDIndex + tryOffset) % len(subnets)
- rii.NetworkInterfaces[0].SubnetId = aws.String(subnets[tryIndex])
+ trySubnet = subnets[tryIndex]
+ rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet)
}
rsv, err = instanceSet.client.RunInstances(&rii)
+ instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1)
if isErrorSubnetSpecific(err) &&
tryOffset < len(subnets)-1 {
instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]).
}
instanceSet.updateSpotPrices(instances)
}
+
+ // Count instances in each subnet, and report in metrics.
+ subnetInstances := map[string]int{"": 0}
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ subnetInstances[subnet] = 0
+ }
+ for _, inst := range instances {
+ subnet := inst.(*ec2Instance).instance.SubnetId
+ if subnet != nil {
+ subnetInstances[*subnet]++
+ } else {
+ subnetInstances[""]++
+ }
+ }
+ for subnet, count := range subnetInstances {
+ instanceSet.mInstances.WithLabelValues(subnet).Set(float64(count))
+ }
+
return instances, err
}
return err.earliestRetry
}
-var isCodeCapacity = map[string]bool{
+// capacityError wraps an EC2 API error that signals insufficient
+// capacity rather than a quota/limit. wrapError creates one for the
+// InsufficientInstanceCapacity error code.
+type capacityError struct {
+ error
+ // isInstanceTypeSpecific is returned by IsInstanceTypeSpecific;
+ // wrapError sets it to true.
+ isInstanceTypeSpecific bool
+}
+
+// IsCapacityError always returns true. (Presumably this lets callers
+// detect capacity errors through an interface check -- confirm.)
+func (er *capacityError) IsCapacityError() bool {
+ return true
+}
+
+// IsInstanceTypeSpecific returns the isInstanceTypeSpecific flag:
+// whether the shortage is tied to the requested instance type, so a
+// different type might still succeed.
+func (er *capacityError) IsInstanceTypeSpecific() bool {
+ return er.isInstanceTypeSpecific
+}
+
+// isCodeQuota is the set of AWS error codes that isErrorQuota
+// classifies as quota/limit errors.
+var isCodeQuota = map[string]bool{
"InstanceLimitExceeded": true,
"InsufficientAddressCapacity": true,
"InsufficientFreeAddressesInSubnet": true,
- "InsufficientInstanceCapacity": true,
+ // InsufficientInstanceCapacity is intentionally not listed here:
+ // wrapError maps it to a capacityError instead of a quota error.
"InsufficientVolumeCapacity": true,
"MaxSpotInstanceCountExceeded": true,
"VcpuLimitExceeded": true,
}
-// isErrorCapacity returns whether the error is to be throttled based on its code.
+// isErrorQuota returns whether the error indicates we have reached
+// some usage quota/limit -- i.e., immediately retrying with an equal
+// or larger instance type will probably not work.
+//
// Returns false if error is nil.
-func isErrorCapacity(err error) bool {
+func isErrorQuota(err error) bool {
if aerr, ok := err.(awserr.Error); ok && aerr != nil {
- if _, ok := isCodeCapacity[aerr.Code()]; ok {
+ if _, ok := isCodeQuota[aerr.Code()]; ok {
return true
}
}
}
throttleValue.Store(d)
return rateLimitError{error: err, earliestRetry: time.Now().Add(d)}
- } else if isErrorCapacity(err) {
+ } else if isErrorQuota(err) {
return &ec2QuotaError{err}
+ } else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" {
+ return &capacityError{err, true}
} else if err != nil {
throttleValue.Store(time.Duration(0))
return err
throttleValue.Store(time.Duration(0))
return nil
}
+
+var boolLabelValue = map[bool]string{false: "0", true: "1"}