"fmt"
"math/big"
"strconv"
+ "strings"
"sync"
"sync/atomic"
"time"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
SecretAccessKey string
Region string
SecurityGroupIDs arvados.StringSet
- SubnetID string
+ SubnetID sliceOrSingleString
AdminUsername string
EBSVolumeType string
EBSPrice float64
SpotPriceUpdateInterval arvados.Duration
}
+// sliceOrSingleString is a list of strings that may be written in
+// JSON either as an array of strings or as one bare string.
+type sliceOrSingleString []string
+
+// UnmarshalJSON decodes data into ss. A JSON array of strings is
+// taken as-is and a single JSON string "foo" becomes the one-element
+// list ["foo"]. Empty input, an empty array, and the empty string ""
+// all leave ss nil. On a decoding error ss is left untouched.
+func (ss *sliceOrSingleString) UnmarshalJSON(data []byte) error {
+	var decoded []string
+	switch {
+	case len(data) == 0:
+		// Nothing to decode; the result stays nil.
+	case data[0] == '[':
+		if err := json.Unmarshal(data, &decoded); err != nil {
+			return err
+		}
+	default:
+		var single string
+		if err := json.Unmarshal(data, &single); err != nil {
+			return err
+		}
+		if single != "" {
+			decoded = []string{single}
+		}
+	}
+	// Normalize "no entries" to nil rather than an empty slice.
+	if len(decoded) == 0 {
+		decoded = nil
+	}
+	*ss = decoded
+	return nil
+}
+
type ec2Interface interface {
DescribeKeyPairs(input *ec2.DescribeKeyPairsInput) (*ec2.DescribeKeyPairsOutput, error)
ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error)
+// ec2InstanceSet is the state behind the value newEC2InstanceSet
+// returns as a cloud.InstanceSet: driver configuration, the EC2
+// client, cached prices, and Prometheus metrics.
type ec2InstanceSet struct {
ec2config ec2InstanceSetConfig
+ // currentSubnetIDIndex is the index into ec2config.SubnetID of
+ // the subnet tried first when starting an instance. It is read
+ // and written with sync/atomic.
+ currentSubnetIDIndex int32
instanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
client ec2Interface
prices map[priceKey][]cloud.InstancePrice
pricesLock sync.Mutex
pricesUpdated map[priceKey]time.Time
+
+ // mInstances is a gauge of instance counts, labelled by subnet_id.
+ mInstances *prometheus.GaugeVec
+ // mInstanceStarts counts RunInstances attempts, labelled by
+ // subnet_id and success ("0" or "1").
+ mInstanceStarts *prometheus.CounterVec
}
-func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
instanceSet := &ec2InstanceSet{
instanceSetID: instanceSetID,
logger: logger,
if instanceSet.ec2config.EBSVolumeType == "" {
instanceSet.ec2config.EBSVolumeType = "gp2"
}
+
+ // Set up metrics
+ instanceSet.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instances",
+ Help: "Number of instances running",
+ }, []string{"subnet_id"})
+ instanceSet.mInstanceStarts = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instance_starts_total",
+ Help: "Number of attempts to start a new instance",
+ }, []string{"subnet_id", "success"})
+ // Initialize all of the series we'll be reporting. Otherwise
+ // the {subnet_id=A, success=0} series doesn't appear in metrics
+ // at all until there's a failure in subnet A.
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "1").Add(0)
+ }
+ if len(instanceSet.ec2config.SubnetID) == 0 {
+ instanceSet.mInstanceStarts.WithLabelValues("", "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues("", "1").Add(0)
+ }
+ if reg != nil {
+ reg.MustRegister(instanceSet.mInstances)
+ reg.MustRegister(instanceSet.mInstanceStarts)
+ }
+
return instanceSet, nil
}
initCommand cloud.InitCommand,
publicKey ssh.PublicKey) (cloud.Instance, error) {
- md5keyFingerprint, sha1keyFingerprint, err := awsKeyFingerprint(publicKey)
- if err != nil {
- return nil, fmt.Errorf("Could not make key fingerprint: %v", err)
- }
- instanceSet.keysMtx.Lock()
- var keyname string
- var ok bool
- if keyname, ok = instanceSet.keys[md5keyFingerprint]; !ok {
- keyout, err := instanceSet.client.DescribeKeyPairs(&ec2.DescribeKeyPairsInput{
- Filters: []*ec2.Filter{{
- Name: aws.String("fingerprint"),
- Values: []*string{&md5keyFingerprint, &sha1keyFingerprint},
- }},
- })
- if err != nil {
- return nil, fmt.Errorf("Could not search for keypair: %v", err)
- }
-
- if len(keyout.KeyPairs) > 0 {
- keyname = *(keyout.KeyPairs[0].KeyName)
- } else {
- keyname = "arvados-dispatch-keypair-" + md5keyFingerprint
- _, err := instanceSet.client.ImportKeyPair(&ec2.ImportKeyPairInput{
- KeyName: &keyname,
- PublicKeyMaterial: ssh.MarshalAuthorizedKey(publicKey),
- })
- if err != nil {
- return nil, fmt.Errorf("Could not import keypair: %v", err)
- }
- }
- instanceSet.keys[md5keyFingerprint] = keyname
- }
- instanceSet.keysMtx.Unlock()
-
ec2tags := []*ec2.Tag{}
for k, v := range newTags {
ec2tags = append(ec2tags, &ec2.Tag{
InstanceType: &instanceType.ProviderType,
MaxCount: aws.Int64(1),
MinCount: aws.Int64(1),
- KeyName: &keyname,
NetworkInterfaces: []*ec2.InstanceNetworkInterfaceSpecification{
{
DeleteOnTermination: aws.Bool(true),
DeviceIndex: aws.Int64(0),
Groups: aws.StringSlice(groups),
- SubnetId: &instanceSet.ec2config.SubnetID,
}},
DisableApiTermination: aws.Bool(false),
InstanceInitiatedShutdownBehavior: aws.String("terminate"),
UserData: aws.String(base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))),
}
+ if publicKey != nil {
+ keyname, err := instanceSet.getKeyName(publicKey)
+ if err != nil {
+ return nil, err
+ }
+ rii.KeyName = &keyname
+ }
+
if instanceType.AddedScratch > 0 {
rii.BlockDeviceMappings = []*ec2.BlockDeviceMapping{{
DeviceName: aws.String("/dev/xvdt"),
}
}
- rsv, err := instanceSet.client.RunInstances(&rii)
+ var rsv *ec2.Reservation
+ var err error
+ subnets := instanceSet.ec2config.SubnetID
+ currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex))
+ for tryOffset := 0; ; tryOffset++ {
+ tryIndex := 0
+ trySubnet := ""
+ if len(subnets) > 0 {
+ tryIndex = (currentSubnetIDIndex + tryOffset) % len(subnets)
+ trySubnet = subnets[tryIndex]
+ rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet)
+ }
+ rsv, err = instanceSet.client.RunInstances(&rii)
+ instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1)
+ if isErrorSubnetSpecific(err) &&
+ tryOffset < len(subnets)-1 {
+ instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]).
+ Warn("RunInstances failed, trying next subnet")
+ continue
+ }
+ // Succeeded, or exhausted all subnets, or got a
+ // non-subnet-related error.
+ //
+ // We intentionally update currentSubnetIDIndex even
+ // in the non-retryable-failure case here to avoid a
+ // situation where successive calls to Create() keep
+ // returning errors for the same subnet (perhaps
+ // "subnet full") and never reveal the errors for the
+ // other configured subnets (perhaps "subnet ID
+ // invalid").
+ atomic.StoreInt32(&instanceSet.currentSubnetIDIndex, int32(tryIndex))
+ break
+ }
err = wrapError(err, &instanceSet.throttleDelayCreate)
if err != nil {
return nil, err
}, nil
}
+// getKeyName returns the name of the EC2 key pair that matches
+// publicKey, importing publicKey as a new key pair when the
+// DescribeKeyPairs lookup finds none.
+//
+// Resolved names are cached in instanceSet.keys, indexed by the key's
+// MD5 fingerprint, whether the pair already existed or was just
+// imported, so later calls with the same key make no EC2 API calls.
+// keysMtx is held for the whole call.
+//
+// A non-nil error means the fingerprint could not be computed, or the
+// DescribeKeyPairs or ImportKeyPair call failed; the returned name is
+// then empty and nothing is cached.
+func (instanceSet *ec2InstanceSet) getKeyName(publicKey ssh.PublicKey) (string, error) {
+	instanceSet.keysMtx.Lock()
+	defer instanceSet.keysMtx.Unlock()
+	md5keyFingerprint, sha1keyFingerprint, err := awsKeyFingerprint(publicKey)
+	if err != nil {
+		return "", fmt.Errorf("Could not make key fingerprint: %v", err)
+	}
+	if keyname, ok := instanceSet.keys[md5keyFingerprint]; ok {
+		return keyname, nil
+	}
+	// Search by both fingerprint values in a single request.
+	keyout, err := instanceSet.client.DescribeKeyPairs(&ec2.DescribeKeyPairsInput{
+		Filters: []*ec2.Filter{{
+			Name:   aws.String("fingerprint"),
+			Values: []*string{&md5keyFingerprint, &sha1keyFingerprint},
+		}},
+	})
+	if err != nil {
+		return "", fmt.Errorf("Could not search for keypair: %v", err)
+	}
+	if len(keyout.KeyPairs) > 0 {
+		// Cache the existing pair's name too; otherwise every call
+		// for a pre-existing key repeats the DescribeKeyPairs
+		// request.
+		keyname := *(keyout.KeyPairs[0].KeyName)
+		instanceSet.keys[md5keyFingerprint] = keyname
+		return keyname, nil
+	}
+	keyname := "arvados-dispatch-keypair-" + md5keyFingerprint
+	_, err = instanceSet.client.ImportKeyPair(&ec2.ImportKeyPairInput{
+		KeyName:           &keyname,
+		PublicKeyMaterial: ssh.MarshalAuthorizedKey(publicKey),
+	})
+	if err != nil {
+		return "", fmt.Errorf("Could not import keypair: %v", err)
+	}
+	instanceSet.keys[md5keyFingerprint] = keyname
+	return keyname, nil
+}
+
func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances []cloud.Instance, err error) {
var filters []*ec2.Filter
for k, v := range tags {
}
instanceSet.updateSpotPrices(instances)
}
+
+ // Count instances in each subnet, and report in metrics.
+ subnetInstances := map[string]int{"": 0}
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ subnetInstances[subnet] = 0
+ }
+ for _, inst := range instances {
+ subnet := inst.(*ec2Instance).instance.SubnetId
+ if subnet != nil {
+ subnetInstances[*subnet]++
+ } else {
+ subnetInstances[""]++
+ }
+ }
+ for subnet, count := range subnetInstances {
+ instanceSet.mInstances.WithLabelValues(subnet).Set(float64(count))
+ }
+
return instances, err
}
}
+// isCodeCapacity is a set of AWS error codes, keyed by code string.
+// NOTE(review): presumably consulted by isErrorCapacity to decide
+// which errors count as capacity/quota problems -- confirm there.
var isCodeCapacity = map[string]bool{
- "InsufficientInstanceCapacity": true,
- "VcpuLimitExceeded": true,
- "MaxSpotInstanceCountExceeded": true,
+ "InstanceLimitExceeded": true,
+ "InsufficientAddressCapacity": true,
+ "InsufficientFreeAddressesInSubnet": true,
+ "InsufficientInstanceCapacity": true,
+ "InsufficientVolumeCapacity": true,
+ "MaxSpotInstanceCountExceeded": true,
+ "VcpuLimitExceeded": true,
}
// isErrorCapacity returns whether the error is to be throttled based on its code.
return false
}
+// isErrorSubnetSpecific reports whether err, as returned by
+// RunInstances, is a kind of failure that might not occur if the
+// request were retried in a different subnet: any AWS error whose
+// code mentions "Subnet", or an instance/volume capacity shortage.
+// Errors that are not awserr.Error values (including nil) yield
+// false.
+func isErrorSubnetSpecific(err error) bool {
+	if aerr, ok := err.(awserr.Error); ok {
+		switch code := aerr.Code(); code {
+		case "InsufficientInstanceCapacity", "InsufficientVolumeCapacity":
+			return true
+		default:
+			return strings.Contains(code, "Subnet")
+		}
+	}
+	return false
+}
+
+// ec2QuotaError wraps an error in a distinct type by embedding it.
+// NOTE(review): presumably marks quota/capacity failures for callers;
+// confirm against its methods and users elsewhere in the file.
type ec2QuotaError struct {
error
}
throttleValue.Store(time.Duration(0))
return nil
}
+
+// boolLabelValue converts a boolean into the "0"/"1" string used as
+// a metric label value.
+var boolLabelValue = map[bool]string{
+	true:  "1",
+	false: "0",
+}