"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/jmcvetta/randutil"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
logger logrus.FieldLogger
}
-func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
azcfg := azureInstanceSetConfig{}
err = json.Unmarshal(config, &azcfg)
if err != nil {
return nil, cloud.ImageID(""), cluster, err
}
- ap, err := newAzureInstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
+ ap, err := newAzureInstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger(), nil)
return ap.(*azureInstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
}
ap := azureInstanceSet{
failed bool
}
+// Run the test suite once for each applicable permutation of
+// DriverParameters. Return true if everything worked.
+//
+// Currently this means run once for each configured SubnetID.
+func (t *tester) Run() bool {
+ var dp map[string]interface{}
+ if len(t.DriverParameters) > 0 {
+ err := json.Unmarshal(t.DriverParameters, &dp)
+ if err != nil {
+ t.Logger.WithError(err).Error("error decoding configured CloudVMs.DriverParameters")
+ return false
+ }
+ }
+ subnets, ok := dp["SubnetID"].([]interface{})
+ if !ok || len(subnets) <= 1 {
+ // Easy, only one SubnetID to test.
+ return t.runWithDriverParameters(t.DriverParameters)
+ }
+
+ deferredError := false
+ for i, subnet := range subnets {
+ subnet, ok := subnet.(string)
+ if !ok {
+ t.Logger.Errorf("CloudVMs.DriverParameters.SubnetID[%d] is invalid -- must be a string", i)
+ deferredError = true
+ continue
+ }
+ dp["SubnetID"] = subnet
+ t.Logger.Infof("running tests using SubnetID[%d] %q", i, subnet)
+ dpjson, err := json.Marshal(dp)
+ if err != nil {
+ t.Logger.WithError(err).Error("error encoding driver parameters")
+ deferredError = true
+ continue
+ }
+ ok = t.runWithDriverParameters(dpjson)
+ if !ok {
+ t.Logger.Infof("failed tests using SubnetID[%d] %q", i, subnet)
+ deferredError = true
+ }
+ }
+ return !deferredError
+}
+
// Run the test suite as specified, clean up as needed, and return
// true (everything is OK) or false (something went wrong).
-func (t *tester) Run() bool {
+func (t *tester) runWithDriverParameters(driverParameters json.RawMessage) bool {
// This flag gets set when we encounter a non-fatal error, so
// we can continue doing more tests but remember to return
// false (failure) at the end.
deferredError := false
var err error
- t.is, err = t.Driver.InstanceSet(t.DriverParameters, t.SetID, t.Tags, t.Logger)
+ t.is, err = t.Driver.InstanceSet(driverParameters, t.SetID, t.Tags, t.Logger, nil)
if err != nil {
t.Logger.WithError(err).Info("error initializing driver")
return false
"fmt"
"math/big"
"strconv"
+ "strings"
"sync"
"sync/atomic"
"time"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
SecretAccessKey string
Region string
SecurityGroupIDs arvados.StringSet
- SubnetID string
+ SubnetID sliceOrSingleString
AdminUsername string
EBSVolumeType string
EBSPrice float64
SpotPriceUpdateInterval arvados.Duration
}
// sliceOrSingleString is a list of strings that can be written in
// config either as an array of strings or as a single string.
type sliceOrSingleString []string

// UnmarshalJSON decodes an array of strings as-is and a single string
// "foo" as ["foo"]. Empty input, "", [] and null all decode to a nil
// slice. On error the receiver is left unchanged.
func (ss *sliceOrSingleString) UnmarshalJSON(data []byte) error {
	var decoded []string
	switch {
	case len(data) == 0:
		// Nothing to decode; result stays nil.
	case data[0] == '[':
		if err := json.Unmarshal(data, &decoded); err != nil {
			return err
		}
	default:
		var single string
		if err := json.Unmarshal(data, &single); err != nil {
			return err
		}
		if single != "" {
			decoded = []string{single}
		}
	}
	// Normalize an empty (non-nil) array to nil.
	if len(decoded) == 0 {
		decoded = nil
	}
	*ss = decoded
	return nil
}
+
type ec2Interface interface {
DescribeKeyPairs(input *ec2.DescribeKeyPairsInput) (*ec2.DescribeKeyPairsOutput, error)
ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error)
type ec2InstanceSet struct {
ec2config ec2InstanceSetConfig
+ currentSubnetIDIndex int32
instanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
client ec2Interface
prices map[priceKey][]cloud.InstancePrice
pricesLock sync.Mutex
pricesUpdated map[priceKey]time.Time
+
+ mInstances *prometheus.GaugeVec
+ mInstanceStarts *prometheus.CounterVec
}
-func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
instanceSet := &ec2InstanceSet{
instanceSetID: instanceSetID,
logger: logger,
if instanceSet.ec2config.EBSVolumeType == "" {
instanceSet.ec2config.EBSVolumeType = "gp2"
}
+
+ // Set up metrics
+ instanceSet.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instances",
+ Help: "Number of instances running",
+ }, []string{"subnet_id"})
+ instanceSet.mInstanceStarts = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instance_starts_total",
+ Help: "Number of attempts to start a new instance",
+ }, []string{"subnet_id", "success"})
+ // Initialize all of the series we'll be reporting. Otherwise
+ // the {subnet_id=A, success=0} series doesn't appear in metrics
+ // at all until there's a failure in subnet A.
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "1").Add(0)
+ }
+ if len(instanceSet.ec2config.SubnetID) == 0 {
+ instanceSet.mInstanceStarts.WithLabelValues("", "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues("", "1").Add(0)
+ }
+ if reg != nil {
+ reg.MustRegister(instanceSet.mInstances)
+ reg.MustRegister(instanceSet.mInstanceStarts)
+ }
+
return instanceSet, nil
}
DeleteOnTermination: aws.Bool(true),
DeviceIndex: aws.Int64(0),
Groups: aws.StringSlice(groups),
- SubnetId: &instanceSet.ec2config.SubnetID,
}},
DisableApiTermination: aws.Bool(false),
InstanceInitiatedShutdownBehavior: aws.String("terminate"),
}
}
- rsv, err := instanceSet.client.RunInstances(&rii)
+ var rsv *ec2.Reservation
+ var err error
+ subnets := instanceSet.ec2config.SubnetID
+ currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex))
+ for tryOffset := 0; ; tryOffset++ {
+ tryIndex := 0
+ trySubnet := ""
+ if len(subnets) > 0 {
+ tryIndex = (currentSubnetIDIndex + tryOffset) % len(subnets)
+ trySubnet = subnets[tryIndex]
+ rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet)
+ }
+ rsv, err = instanceSet.client.RunInstances(&rii)
+ instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1)
+ if isErrorSubnetSpecific(err) &&
+ tryOffset < len(subnets)-1 {
+ instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]).
+ Warn("RunInstances failed, trying next subnet")
+ continue
+ }
+ // Succeeded, or exhausted all subnets, or got a
+ // non-subnet-related error.
+ //
+ // We intentionally update currentSubnetIDIndex even
+ // in the non-retryable-failure case here to avoid a
+ // situation where successive calls to Create() keep
+ // returning errors for the same subnet (perhaps
+ // "subnet full") and never reveal the errors for the
+ // other configured subnets (perhaps "subnet ID
+ // invalid").
+ atomic.StoreInt32(&instanceSet.currentSubnetIDIndex, int32(tryIndex))
+ break
+ }
err = wrapError(err, &instanceSet.throttleDelayCreate)
if err != nil {
return nil, err
}
instanceSet.updateSpotPrices(instances)
}
+
+ // Count instances in each subnet, and report in metrics.
+ subnetInstances := map[string]int{"": 0}
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ subnetInstances[subnet] = 0
+ }
+ for _, inst := range instances {
+ subnet := inst.(*ec2Instance).instance.SubnetId
+ if subnet != nil {
+ subnetInstances[*subnet]++
+ } else {
+ subnetInstances[""]++
+ }
+ }
+ for subnet, count := range subnetInstances {
+ instanceSet.mInstances.WithLabelValues(subnet).Set(float64(count))
+ }
+
return instances, err
}
}
var isCodeCapacity = map[string]bool{
+ "InstanceLimitExceeded": true,
+ "InsufficientAddressCapacity": true,
"InsufficientFreeAddressesInSubnet": true,
"InsufficientInstanceCapacity": true,
"InsufficientVolumeCapacity": true,
return false
}
+// isErrorSubnetSpecific returns true if the problem encountered by
+// RunInstances might be avoided by trying a different subnet.
+func isErrorSubnetSpecific(err error) bool {
+ aerr, ok := err.(awserr.Error)
+ if !ok {
+ return false
+ }
+ code := aerr.Code()
+ return strings.Contains(code, "Subnet") ||
+ code == "InsufficientInstanceCapacity" ||
+ code == "InsufficientVolumeCapacity"
+}
+
type ec2QuotaError struct {
error
}
throttleValue.Store(time.Duration(0))
return nil
}
+
// boolLabelValue maps a boolean outcome to the "0"/"1" string used as
// a Prometheus label value (e.g. the "success" label on the instance
// start counter).
var boolLabelValue = map[bool]string{
	true:  "1",
	false: "0",
}
import (
"encoding/json"
+ "errors"
"flag"
+ "fmt"
"sync/atomic"
"testing"
"time"
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/config"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
+ "github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
check.TestingT(t)
}
+type sliceOrStringSuite struct{}
+
+var _ = check.Suite(&sliceOrStringSuite{})
+
+func (s *sliceOrStringSuite) TestUnmarshal(c *check.C) {
+ var conf ec2InstanceSetConfig
+ for _, trial := range []struct {
+ input string
+ output sliceOrSingleString
+ }{
+ {``, nil},
+ {`""`, nil},
+ {`[]`, nil},
+ {`"foo"`, sliceOrSingleString{"foo"}},
+ {`["foo"]`, sliceOrSingleString{"foo"}},
+ {`[foo]`, sliceOrSingleString{"foo"}},
+ {`["foo", "bar"]`, sliceOrSingleString{"foo", "bar"}},
+ {`[foo-bar, baz]`, sliceOrSingleString{"foo-bar", "baz"}},
+ } {
+ c.Logf("trial: %+v", trial)
+ err := yaml.Unmarshal([]byte("SubnetID: "+trial.input+"\n"), &conf)
+ if !c.Check(err, check.IsNil) {
+ continue
+ }
+ c.Check(conf.SubnetID, check.DeepEquals, trial.output)
+ }
+}
+
type EC2InstanceSetSuite struct{}
var _ = check.Suite(&EC2InstanceSetSuite{})
reftime time.Time
importKeyPairCalls []*ec2.ImportKeyPairInput
describeKeyPairsCalls []*ec2.DescribeKeyPairsInput
+ runInstancesCalls []*ec2.RunInstancesInput
+ // {subnetID => error}: RunInstances returns error if subnetID
+ // matches.
+ subnetErrorOnRunInstances map[string]error
}
func (e *ec2stub) ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error) {
}
func (e *ec2stub) RunInstances(input *ec2.RunInstancesInput) (*ec2.Reservation, error) {
+ e.runInstancesCalls = append(e.runInstancesCalls, input)
+ if len(input.NetworkInterfaces) > 0 && input.NetworkInterfaces[0].SubnetId != nil {
+ err := e.subnetErrorOnRunInstances[*input.NetworkInterfaces[0].SubnetId]
+ if err != nil {
+ return nil, err
+ }
+ }
return &ec2.Reservation{Instances: []*ec2.Instance{{
InstanceId: aws.String("i-123"),
InstanceType: aws.String("t2.micro"),
return nil, nil
}
-func GetInstanceSet(c *check.C) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster) {
+type ec2stubError struct {
+ code string
+ message string
+}
+
+func (err *ec2stubError) Code() string { return err.code }
+func (err *ec2stubError) Message() string { return err.message }
+func (err *ec2stubError) Error() string { return fmt.Sprintf("%s: %s", err.code, err.message) }
+func (err *ec2stubError) OrigErr() error { return errors.New("stub OrigErr") }
+
+// Ensure ec2stubError satisfies the aws.Error interface
+var _ = awserr.Error(&ec2stubError{})
+
+func GetInstanceSet(c *check.C, conf string) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster, *prometheus.Registry) {
+ reg := prometheus.NewRegistry()
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
err := config.LoadFile(&exampleCfg, *live)
c.Assert(err, check.IsNil)
- ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
+ is, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger(), reg)
c.Assert(err, check.IsNil)
- return ap.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
- }
- ap := ec2InstanceSet{
- instanceSetID: "test123",
- logger: logrus.StandardLogger(),
- client: &ec2stub{c: c, reftime: time.Now().UTC()},
- keys: make(map[string]string),
+ return is.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, reg
+ } else {
+ is, err := newEC2InstanceSet(json.RawMessage(conf), "test123", nil, ctxlog.TestLogger(c), reg)
+ c.Assert(err, check.IsNil)
+ is.(*ec2InstanceSet).client = &ec2stub{c: c, reftime: time.Now().UTC()}
+ return is.(*ec2InstanceSet), cloud.ImageID("blob"), cluster, reg
}
- return &ap, cloud.ImageID("blob"), cluster
}
func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
}
func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
inst, err := ap.Create(cluster.InstanceTypes["tiny-with-extra-scratch"],
img, map[string]string{
"TestTagName": "test tag value",
}
func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
inst, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"],
}
+func (*EC2InstanceSetSuite) TestCreateFailoverSecondSubnet(c *check.C) {
+ if *live != "" {
+ c.Skip("not applicable in live mode")
+ return
+ }
+
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-good"]}`)
+ ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
+ "subnet-full": &ec2stubError{
+ code: "InsufficientFreeAddressesInSubnet",
+ message: "subnet is full",
+ },
+ }
+ inst, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.IsNil)
+ c.Check(inst, check.NotNil)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 2)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="0"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="1"} 1\n`+
+ `.*`)
+
+ // Next RunInstances call should try the working subnet first
+ inst, err = ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.IsNil)
+ c.Check(inst, check.NotNil)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 3)
+ metrics = arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="0"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="1"} 2\n`+
+ `.*`)
+}
+
+func (*EC2InstanceSetSuite) TestCreateAllSubnetsFailing(c *check.C) {
+ if *live != "" {
+ c.Skip("not applicable in live mode")
+ return
+ }
+
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-broken"]}`)
+ ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
+ "subnet-full": &ec2stubError{
+ code: "InsufficientFreeAddressesInSubnet",
+ message: "subnet is full",
+ },
+ "subnet-broken": &ec2stubError{
+ code: "InvalidSubnetId.NotFound",
+ message: "bogus subnet id",
+ },
+ }
+ _, err := ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.ErrorMatches, `.*InvalidSubnetId\.NotFound.*`)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 2)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
+
+ _, err = ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.ErrorMatches, `.*InsufficientFreeAddressesInSubnet.*`)
+ c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 4)
+ metrics = arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 2\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 2\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
+}
+
func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, _ := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
}
func (*EC2InstanceSetSuite) TestListInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, reg := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
tg := i.Tags()
c.Logf("%v %v %v", i.String(), i.Address(), tg)
}
+
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instances{subnet_id="[^"]*"} \d+\n`+
+ `.*`)
}
func (*EC2InstanceSetSuite) TestDestroyInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, _ := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
}
func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
tags := cloud.InstanceTags{"arvados-ec2-driver": "test"}
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
//
// type exampleDriver struct {}
//
-// func (*exampleDriver) InstanceSet(config json.RawMessage, id cloud.InstanceSetID, tags cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+// func (*exampleDriver) InstanceSet(config json.RawMessage, id cloud.InstanceSetID, tags cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
// var is exampleInstanceSet
// if err := json.Unmarshal(config, &is); err != nil {
// return nil, err
// is.ownID = id
// return &is, nil
// }
-//
-// var _ = registerCloudDriver("example", &exampleDriver{})
type Driver interface {
// InstanceSet returns an InstanceSet built from the driver-specific
// config (the dispatcher passes CloudVMs.DriverParameters here, and
// CloudVMs.ResourceTags as tags).
//
// reg is a metrics registry on which the driver may register its own
// collectors. Callers may pass nil (tests do), so implementations
// that register metrics must check for nil first.
- InstanceSet(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error)
+ InstanceSet(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (InstanceSet, error)
}
// DriverFunc makes a Driver using the provided function as its
// InstanceSet method. This is similar to http.HandlerFunc.
//
// fn receives exactly the arguments passed to Driver.InstanceSet,
// including the metrics registry, which may be nil.
-func DriverFunc(fn func(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error)) Driver {
+func DriverFunc(fn func(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (InstanceSet, error)) Driver {
return driverFunc(fn)
}
// driverFunc is the function type DriverFunc converts its argument
// to, so that a plain function satisfies the Driver interface.
-type driverFunc func(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error)
+type driverFunc func(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (InstanceSet, error)
// InstanceSet implements Driver by calling df with its arguments
// unchanged.
-func (df driverFunc) InstanceSet(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error) {
- return df(config, id, tags, logger)
+func (df driverFunc) InstanceSet(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (InstanceSet, error) {
+ return df(config, id, tags, logger, reg)
}
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
mtx sync.Mutex
}
-func newInstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func newInstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
is := &instanceSet{
instanceSetID: instanceSetID,
logger: logger,
func (*suite) TestCreateListExecDestroy(c *check.C) {
logger := ctxlog.TestLogger(c)
- is, err := Driver.InstanceSet(json.RawMessage("{}"), "testInstanceSetID", cloud.SharedResourceTags{"sharedTag": "sharedTagValue"}, logger)
+ is, err := Driver.InstanceSet(json.RawMessage("{}"), "testInstanceSetID", cloud.SharedResourceTags{"sharedTag": "sharedTagValue"}, logger, nil)
c.Assert(err, check.IsNil)
clientRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
SecretAccessKey: ""
# (ec2) Instance configuration.
+
+ # (ec2) Region, like "us-east-1".
+ Region: ""
+
+ # (ec2) Security group IDs. Omit or use {} to use the
+ # default security group.
SecurityGroupIDs:
"SAMPLE": {}
+
+ # (ec2) One or more subnet IDs. Omit or leave empty to let
+ # AWS choose a default subnet from your default VPC. If
+ # multiple subnets are configured here (enclosed in brackets
+ # like [subnet-abc123, subnet-def456]) the cloud dispatcher
+ # will detect subnet-related errors and retry using a
+ # different subnet. Most sites specify one subnet.
SubnetID: ""
- Region: ""
+
EBSVolumeType: gp2
AdminUsername: debian
# (ec2) name of the IAMInstanceProfile for instances started by
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/ghodss/yaml"
"github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/common/expfmt"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
check "gopkg.in/check.v1"
c.Check(int(cfg.SourceTimestamp.Sub(trial.expectTime).Seconds()), check.Equals, 0)
c.Check(int(ldr.loadTimestamp.Sub(time.Now()).Seconds()), check.Equals, 0)
- var buf bytes.Buffer
reg := prometheus.NewRegistry()
ldr.RegisterMetrics(reg)
- enc := expfmt.NewEncoder(&buf, expfmt.FmtText)
- got, _ := reg.Gather()
- for _, mf := range got {
- enc.Encode(mf)
- }
- c.Check(buf.String(), check.Matches, `# HELP .*
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `# HELP .*
# TYPE .*
arvados_config_load_timestamp_seconds{sha256="83aea5d82eb1d53372cd65c936c60acc1c6ef946e61977bbca7cfea709d201a8"} \Q`+fmt.Sprintf("%g", float64(ldr.loadTimestamp.UnixNano())/1e9)+`\E
# HELP .*
return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
}
sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
- is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
+ is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger, reg)
is = newInstrumentedInstanceSet(is, reg)
if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
is = rateLimitedInstanceSet{
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
}
// InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
if sd.holdCloudOps == nil {
sd.holdCloudOps = make(chan bool)
}
driver := &test.StubDriver{}
instanceSetID := cloud.InstanceSetID("test-instance-set-id")
- is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
+ is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger, nil)
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
func (suite *PoolSuite) TestDrain(c *check.C) {
driver := test.StubDriver{}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
c.Assert(err, check.IsNil)
ac := arvados.NewClientFromEnv()
func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
c.Assert(err, check.IsNil)
type1 := test.InstanceType(1)
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
c.Assert(err, check.IsNil)
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
probeTimeout := time.Second
ac := arvados.NewClientFromEnv()
- is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
+ is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger, nil)
c.Assert(err, check.IsNil)
inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+import (
+ "bytes"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/common/expfmt"
+)
+
+func GatherMetricsAsString(reg *prometheus.Registry) string {
+ buf := bytes.NewBuffer(nil)
+ enc := expfmt.NewEncoder(buf, expfmt.FmtText)
+ got, _ := reg.Gather()
+ for _, mf := range got {
+ enc.Encode(mf)
+ }
+ return buf.String()
+}
package keepbalance
import (
- "bytes"
"context"
"encoding/json"
"fmt"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/common/expfmt"
check "gopkg.in/check.v1"
)
c.Assert(err, check.IsNil)
c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
- buf, err := s.getMetrics(c, srv)
- c.Check(err, check.IsNil)
- bufstr := buf.String()
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
+ metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
}
func (s *runSuite) TestChunkPrefix(c *check.C) {
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
- buf, err := s.getMetrics(c, srv)
- c.Check(err, check.IsNil)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
-}
-
-func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
- mfs, err := srv.Metrics.reg.Gather()
- if err != nil {
- return nil, err
- }
-
- var buf bytes.Buffer
- for _, mf := range mfs {
- if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {
- return nil, err
- }
- }
-
- return &buf, nil
+ metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
}
package keepweb
import (
- "bytes"
"net/http"
"net/http/httptest"
"regexp"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
- "github.com/prometheus/common/expfmt"
"gopkg.in/check.v1"
)
func (s *IntegrationSuite) checkCacheMetrics(c *check.C, regs ...string) {
s.handler.Cache.updateGauges()
- reg := s.handler.Cache.registry
- mfs, err := reg.Gather()
- c.Check(err, check.IsNil)
- buf := &bytes.Buffer{}
- enc := expfmt.NewEncoder(buf, expfmt.FmtText)
- for _, mf := range mfs {
- c.Check(enc.Encode(mf), check.IsNil)
- }
- mm := buf.String()
+ mm := arvadostest.GatherMetricsAsString(s.handler.Cache.registry)
// Remove comments to make the "value vs. regexp" failure
// output easier to read.
mm = regexp.MustCompile(`(?m)^#.*\n`).ReplaceAllString(mm, "")