import (
"encoding/json"
+ "errors"
"flag"
+ "fmt"
"sync/atomic"
"testing"
"time"
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/config"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
+ "github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
check.TestingT(t)
}
+// sliceOrStringSuite exercises YAML decoding of the sliceOrSingleString
+// config type (used for fields like SubnetID that accept either one
+// string or a list of strings).
+type sliceOrStringSuite struct{}
+
+var _ = check.Suite(&sliceOrStringSuite{})
+
+// TestUnmarshal checks that a "SubnetID:" YAML value decodes to nil for
+// empty inputs, and to the expected slice for single-string, quoted,
+// unquoted, and multi-element list inputs.
+func (s *sliceOrStringSuite) TestUnmarshal(c *check.C) {
+ var conf ec2InstanceSetConfig
+ for _, trial := range []struct {
+ input string
+ output sliceOrSingleString
+ }{
+ {``, nil},
+ {`""`, nil},
+ {`[]`, nil},
+ {`"foo"`, sliceOrSingleString{"foo"}},
+ {`["foo"]`, sliceOrSingleString{"foo"}},
+ {`[foo]`, sliceOrSingleString{"foo"}},
+ {`["foo", "bar"]`, sliceOrSingleString{"foo", "bar"}},
+ {`[foo-bar, baz]`, sliceOrSingleString{"foo-bar", "baz"}},
+ } {
+ c.Logf("trial: %+v", trial)
+ // Wrap each trial value in a minimal YAML document so the
+ // full ec2InstanceSetConfig decode path is exercised.
+ err := yaml.Unmarshal([]byte("SubnetID: "+trial.input+"\n"), &conf)
+ if !c.Check(err, check.IsNil) {
+ continue
+ }
+ c.Check(conf.SubnetID, check.DeepEquals, trial.output)
+ }
+}
+
type EC2InstanceSetSuite struct{}
var _ = check.Suite(&EC2InstanceSetSuite{})
}
+// ec2stub is a test double for the subset of the EC2 API used by the
+// driver. It records every incoming request so tests can assert on the
+// calls made, and can be told to fail RunInstances per subnet.
type ec2stub struct {
- c *check.C
- reftime time.Time
+ c *check.C
+ reftime time.Time
+ // Recorded request arguments, in call order.
+ importKeyPairCalls []*ec2.ImportKeyPairInput
+ describeKeyPairsCalls []*ec2.DescribeKeyPairsInput
+ runInstancesCalls []*ec2.RunInstancesInput
+ // {subnetID => error}: RunInstances returns error if subnetID
+ // matches.
+ subnetErrorOnRunInstances map[string]error
}
+// ImportKeyPair is a stub: it records the request and reports success
+// without creating anything. (The nil output is tolerated by callers.)
func (e *ec2stub) ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error) {
+ e.importKeyPairCalls = append(e.importKeyPairCalls, input)
return nil, nil
}
+// DescribeKeyPairs is a stub: it records the request and reports that
+// no key pairs exist — presumably prompting the driver to call
+// ImportKeyPair (see TestCreate's call-count checks).
func (e *ec2stub) DescribeKeyPairs(input *ec2.DescribeKeyPairsInput) (*ec2.DescribeKeyPairsOutput, error) {
+ e.describeKeyPairsCalls = append(e.describeKeyPairsCalls, input)
return &ec2.DescribeKeyPairsOutput{}, nil
}
+// RunInstances is a stub: it records the request, returns the injected
+// error (if any) for the requested subnet, and otherwise reports two
+// running spot instances.
func (e *ec2stub) RunInstances(input *ec2.RunInstancesInput) (*ec2.Reservation, error) {
+ e.runInstancesCalls = append(e.runInstancesCalls, input)
+ // Per-subnet failure injection, used by the subnet-failover tests.
+ if len(input.NetworkInterfaces) > 0 && input.NetworkInterfaces[0].SubnetId != nil {
+ err := e.subnetErrorOnRunInstances[*input.NetworkInterfaces[0].SubnetId]
+ if err != nil {
+ return nil, err
+ }
+ }
+ // NOTE(review): InstanceType appears twice in this literal (patch
+ // artifact? duplicate composite-literal keys do not compile) and the
+ // closing braces below look unbalanced — confirm against the full file.
return &ec2.Reservation{Instances: []*ec2.Instance{{
InstanceId: aws.String("i-123"),
InstanceType: aws.String("t2.micro"),
InstanceLifecycle: aws.String("spot"),
InstanceType: aws.String("t2.micro"),
PrivateIpAddress: aws.String("10.1.2.3"),
- State: &ec2.InstanceState{Name: aws.String("running")},
+ State: &ec2.InstanceState{Name: aws.String("running"), Code: aws.Int64(16)},
}, {
InstanceId: aws.String("i-124"),
InstanceLifecycle: aws.String("spot"),
InstanceType: aws.String("t2.micro"),
PrivateIpAddress: aws.String("10.1.2.4"),
- State: &ec2.InstanceState{Name: aws.String("running")},
+ State: &ec2.InstanceState{Name: aws.String("running"), Code: aws.Int64(16)},
}},
}},
}, nil
+ // NOTE(review): unreachable after the return above — likely elided
+ // patch context rather than intentional dead code.
return nil, nil
}
-func GetInstanceSet(c *check.C) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster) {
+// ec2stubError is a minimal awserr.Error implementation, letting tests
+// inject AWS-style coded errors (e.g. InsufficientInstanceCapacity)
+// into the stubbed API calls.
+type ec2stubError struct {
+ code string
+ message string
+}
+
+func (err *ec2stubError) Code() string { return err.code }
+func (err *ec2stubError) Message() string { return err.message }
+func (err *ec2stubError) Error() string { return fmt.Sprintf("%s: %s", err.code, err.message) }
+func (err *ec2stubError) OrigErr() error { return errors.New("stub OrigErr") }
+
+// Ensure ec2stubError satisfies the awserr.Error interface
+var _ = awserr.Error(&ec2stubError{})
+
+// GetInstanceSet returns an ec2InstanceSet ready for testing, plus the
+// image ID, cluster config, and metrics registry to use with it. In
+// live mode it loads the real driver config from the file named by the
+// -live flag; otherwise it builds a driver from the given JSON conf and
+// swaps in an ec2stub client. (The "if live" branch header is elided
+// from this patch view — see the `} else {` below.)
+func GetInstanceSet(c *check.C, conf string) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster, *prometheus.Registry) {
+ reg := prometheus.NewRegistry()
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
err := config.LoadFile(&exampleCfg, *live)
c.Assert(err, check.IsNil)
- ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
+ is, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger(), reg)
c.Assert(err, check.IsNil)
- return ap.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
- }
- ap := ec2InstanceSet{
- ec2config: ec2InstanceSetConfig{
- SpotPriceUpdateInterval: arvados.Duration(time.Hour),
- },
- instanceSetID: "test123",
- logger: logrus.StandardLogger(),
- client: &ec2stub{c: c, reftime: time.Now().UTC()},
- keys: make(map[string]string),
+ return is.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, reg
+ } else {
+ // Stub mode: build the driver through the normal constructor
+ // (so config parsing is exercised), then replace its EC2
+ // client with the recording stub.
+ is, err := newEC2InstanceSet(json.RawMessage(conf), "test123", nil, ctxlog.TestLogger(c), reg)
+ c.Assert(err, check.IsNil)
+ is.(*ec2InstanceSet).client = &ec2stub{c: c, reftime: time.Now().UTC()}
+ return is.(*ec2InstanceSet), cloud.ImageID("blob"), cluster, reg
+ }
- return &ap, cloud.ImageID("blob"), cluster
}
func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ // In stub mode, supplying a public key should cause exactly one
+ // DescribeKeyPairs lookup and one ImportKeyPair call.
+ if *live == "" {
+ c.Check(ap.client.(*ec2stub).describeKeyPairsCalls, check.HasLen, 1)
+ c.Check(ap.client.(*ec2stub).importKeyPairCalls, check.HasLen, 1)
+ }
}
+// TestCreateWithExtraScratch creates an instance type with extra
+// scratch space, passing a nil public key to verify the key-pair APIs
+// are skipped in that case.
func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
- pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
-
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
inst, err := ap.Create(cluster.InstanceTypes["tiny-with-extra-scratch"],
img, map[string]string{
"TestTagName": "test tag value",
- }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", nil)
c.Assert(err, check.IsNil)
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ if *live == "" {
+ // Should not have called key pair APIs, because
+ // publickey arg was nil
+ c.Check(ap.client.(*ec2stub).describeKeyPairsCalls, check.HasLen, 0)
+ c.Check(ap.client.(*ec2stub).importKeyPairCalls, check.HasLen, 0)
+ }
}
+// TestCreatePreemptible creates a spot ("tiny-preemptible") instance.
+// (Most of the body is elided from this patch view.)
func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
inst, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"],
}
+// TestCreateFailoverSecondSubnet checks that when the first configured
+// subnet fails with InsufficientFreeAddressesInSubnet, Create retries
+// and succeeds on the second subnet; that per-subnet success/failure
+// counters are reported; and that the next Create tries the
+// previously-working subnet first (one RunInstances call, not two).
+func (*EC2InstanceSetSuite) TestCreateFailoverSecondSubnet(c *check.C) {
+ if *live != "" {
+ c.Skip("not applicable in live mode")
+ return
+ }
+
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-good"]}`)
+ ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
+ "subnet-full": &ec2stubError{
+ code: "InsufficientFreeAddressesInSubnet",
+ message: "subnet is full",
+ },
+ }
+ inst, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.IsNil)
+ c.Check(inst, check.NotNil)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 2)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="0"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="1"} 1\n`+
+ `.*`)
+
+ // Next RunInstances call should try the working subnet first
+ inst, err = ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.IsNil)
+ c.Check(inst, check.NotNil)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 3)
+ metrics = arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="0"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="1"} 2\n`+
+ `.*`)
+}
+
+// TestCreateAllSubnetsFailing checks that when every configured subnet
+// fails, Create tries each one (2 calls per attempt), returns an error,
+// and counts a failure per subnet. The second attempt surfaces the
+// other subnet's error — evidently the driver rotates which subnet it
+// tries first after a failure.
+func (*EC2InstanceSetSuite) TestCreateAllSubnetsFailing(c *check.C) {
+ if *live != "" {
+ c.Skip("not applicable in live mode")
+ return
+ }
+
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-broken"]}`)
+ ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
+ "subnet-full": &ec2stubError{
+ code: "InsufficientFreeAddressesInSubnet",
+ message: "subnet is full",
+ },
+ "subnet-broken": &ec2stubError{
+ code: "InvalidSubnetId.NotFound",
+ message: "bogus subnet id",
+ },
+ }
+ _, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.ErrorMatches, `.*InvalidSubnetId\.NotFound.*`)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 2)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
+
+ _, err = ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.ErrorMatches, `.*InsufficientFreeAddressesInSubnet.*`)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 4)
+ metrics = arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 2\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 2\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
+}
+
+// TestCreateOneSubnetFailingCapacity checks behavior when one subnet is
+// full and the other reports InsufficientInstanceCapacity: every
+// attempt tries both subnets (3 attempts => 6 calls) and consistently
+// reports the capacity error — apparently capacity errors take
+// precedence and do not change subnet rotation; confirm against the
+// driver's error-priority logic.
+func (*EC2InstanceSetSuite) TestCreateOneSubnetFailingCapacity(c *check.C) {
+ if *live != "" {
+ c.Skip("not applicable in live mode")
+ return
+ }
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-broken"]}`)
+ ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
+ "subnet-full": &ec2stubError{
+ code: "InsufficientFreeAddressesInSubnet",
+ message: "subnet is full",
+ },
+ "subnet-broken": &ec2stubError{
+ code: "InsufficientInstanceCapacity",
+ message: "insufficient capacity",
+ },
+ }
+ for i := 0; i < 3; i++ {
+ _, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.ErrorMatches, `.*InsufficientInstanceCapacity.*`)
+ }
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 6)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 3\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 3\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
+}
+
+// TestTagInstances lists instances and applies tags. (Body partly
+// elided from this patch view.)
func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, _ := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
}
func (*EC2InstanceSetSuite) TestListInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, reg := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
tg := i.Tags()
c.Logf("%v %v %v", i.String(), i.Address(), tg)
}
+
+ // Instance counts should be exported per subnet via the registry
+ // passed to the driver by GetInstanceSet.
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instances{subnet_id="[^"]*"} \d+\n`+
+ `.*`)
}
+// TestDestroyInstances destroys the listed instances. (Body partly
+// elided from this patch view.)
func (*EC2InstanceSetSuite) TestDestroyInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, _ := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
}
+// TestInstancePriceHistory creates spot instances and waits for them to
+// reach running state before checking price history. (Body partly
+// elided from this patch view.)
func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
tags := cloud.InstanceTags{"arvados-ec2-driver": "test"}
}
}()
+ ap.ec2config.SpotPriceUpdateInterval = arvados.Duration(time.Hour)
ap.ec2config.EBSPrice = 0.1 // $/GiB/month
inst1, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, tags, "true", pk)
c.Assert(err, check.IsNil)
instances, err = ap.Instances(tags)
running := 0
for _, inst := range instances {
- if *inst.(*ec2Instance).instance.InstanceLifecycle == "spot" {
+ ec2i := inst.(*ec2Instance).instance
+ // State.Code bit check: EC2 state code 16 is "running" — the
+ // stub sets Code 16 on its fake instances to match.
+ if *ec2i.InstanceLifecycle == "spot" && *ec2i.State.Code&16 != 0 {
running++
}
}
c.Logf("instances are running, and identifiable as spot instances")
break
}
- c.Logf("waiting for instances to be identifiable as spot instances...")
+ c.Logf("waiting for instances to reach running state so their availability zone becomes visible...")
time.Sleep(10 * time.Second)
}
_, ok := wrapped.(cloud.RateLimitError)
c.Check(ok, check.Equals, true)
+ // InstanceLimitExceeded is an account quota problem, so it should
+ // wrap to cloud.QuotaError (capacity errors are tested separately
+ // below).
- quotaError := awserr.New("InsufficientInstanceCapacity", "", nil)
+ quotaError := awserr.New("InstanceLimitExceeded", "", nil)
wrapped = wrapError(quotaError, nil)
_, ok = wrapped.(cloud.QuotaError)
c.Check(ok, check.Equals, true)
+
+ // Capacity-style failures — including the "Unsupported in this AZ"
+ // message — should wrap to cloud.CapacityError and be treated as
+ // specific to the requested instance type.
+ for _, trial := range []struct {
+ code string
+ msg string
+ }{
+ {"InsufficientInstanceCapacity", ""},
+ {"Unsupported", "Your requested instance type (t3.micro) is not supported in your requested Availability Zone (us-east-1e). Please retry your request by not specifying an Availability Zone or choosing us-east-1a, us-east-1b, us-east-1c, us-east-1d, us-east-1f."},
+ } {
+ capacityError := awserr.New(trial.code, trial.msg, nil)
+ wrapped = wrapError(capacityError, nil)
+ caperr, ok := wrapped.(cloud.CapacityError)
+ c.Check(ok, check.Equals, true)
+ c.Check(caperr.IsCapacityError(), check.Equals, true)
+ c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
+ }
}