prices map[priceKey][]cloud.InstancePrice
pricesLock sync.Mutex
pricesUpdated map[priceKey]time.Time
+
+ mInstances *prometheus.GaugeVec
+ mInstanceStarts *prometheus.CounterVec
}
func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
if instanceSet.ec2config.EBSVolumeType == "" {
instanceSet.ec2config.EBSVolumeType = "gp2"
}
+
+ // Set up metrics
+ instanceSet.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instances",
+ Help: "Number of instances running",
+ }, []string{"subnet_id"})
+ instanceSet.mInstanceStarts = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "ec2_instance_starts_total",
+ Help: "Number of attempts to start a new instance",
+ }, []string{"subnet_id", "success"})
+ // Initialize all of the series we'll be reporting. Otherwise
+ // the {subnet=A, success=0} series doesn't appear in metrics
+ // at all until there's a failure in subnet A.
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues(subnet, "1").Add(0)
+ }
+ if len(instanceSet.ec2config.SubnetID) == 0 {
+ instanceSet.mInstanceStarts.WithLabelValues("", "0").Add(0)
+ instanceSet.mInstanceStarts.WithLabelValues("", "1").Add(0)
+ }
+ if reg != nil {
+ reg.MustRegister(instanceSet.mInstances)
+ reg.MustRegister(instanceSet.mInstanceStarts)
+ }
+
return instanceSet, nil
}
currentSubnetIDIndex := int(atomic.LoadInt32(&instanceSet.currentSubnetIDIndex))
for tryOffset := 0; ; tryOffset++ {
tryIndex := 0
+ trySubnet := ""
if len(subnets) > 0 {
tryIndex = (currentSubnetIDIndex + tryOffset) % len(subnets)
- rii.NetworkInterfaces[0].SubnetId = aws.String(subnets[tryIndex])
+ trySubnet = subnets[tryIndex]
+ rii.NetworkInterfaces[0].SubnetId = aws.String(trySubnet)
}
rsv, err = instanceSet.client.RunInstances(&rii)
+ instanceSet.mInstanceStarts.WithLabelValues(trySubnet, boolLabelValue[err == nil]).Add(1)
if isErrorSubnetSpecific(err) &&
tryOffset < len(subnets)-1 {
instanceSet.logger.WithError(err).WithField("SubnetID", subnets[tryIndex]).
}
instanceSet.updateSpotPrices(instances)
}
+
+ // Count instances in each subnet, and report in metrics.
+ subnetInstances := map[string]int{"": 0}
+ for _, subnet := range instanceSet.ec2config.SubnetID {
+ subnetInstances[subnet] = 0
+ }
+ for _, inst := range instances {
+ subnet := inst.(*ec2Instance).instance.SubnetId
+ if subnet != nil {
+ subnetInstances[*subnet]++
+ } else {
+ subnetInstances[""]++
+ }
+ }
+ for subnet, count := range subnetInstances {
+ instanceSet.mInstances.WithLabelValues(subnet).Set(float64(count))
+ }
+
return instances, err
}
throttleValue.Store(time.Duration(0))
return nil
}
+
+// boolLabelValue maps a boolean to the "0"/"1" strings used as
+// Prometheus label values, e.g. the "success" label on the
+// ec2_instance_starts_total counter.
+var boolLabelValue = map[bool]string{false: "0", true: "1"}
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/config"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
// Ensure ec2stubError satisfies the aws.Error interface
var _ = awserr.Error(&ec2stubError{})
-func GetInstanceSet(c *check.C) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster) {
+func GetInstanceSet(c *check.C, conf string) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster, *prometheus.Registry) {
+ reg := prometheus.NewRegistry()
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
err := config.LoadFile(&exampleCfg, *live)
c.Assert(err, check.IsNil)
- ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger(), nil)
+ is, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger(), reg)
c.Assert(err, check.IsNil)
- return ap.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
- }
- ap := ec2InstanceSet{
- instanceSetID: "test123",
- logger: ctxlog.TestLogger(c),
- client: &ec2stub{c: c, reftime: time.Now().UTC()},
- keys: make(map[string]string),
+ return is.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, reg
+ } else {
+ is, err := newEC2InstanceSet(json.RawMessage(conf), "test123", nil, ctxlog.TestLogger(c), reg)
+ c.Assert(err, check.IsNil)
+ is.(*ec2InstanceSet).client = &ec2stub{c: c, reftime: time.Now().UTC()}
+ return is.(*ec2InstanceSet), cloud.ImageID("blob"), cluster, reg
}
- return &ap, cloud.ImageID("blob"), cluster
}
func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
}
func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
inst, err := ap.Create(cluster.InstanceTypes["tiny-with-extra-scratch"],
img, map[string]string{
"TestTagName": "test tag value",
}
func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
inst, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"],
return
}
- ap, img, cluster := GetInstanceSet(c)
- ap.ec2config.SubnetID = sliceOrSingleString{"subnet-full", "subnet-good"}
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-good"]}`)
ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
"subnet-full": &ec2stubError{
code: "InsufficientFreeAddressesInSubnet",
c.Check(err, check.IsNil)
c.Check(inst, check.NotNil)
c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 2)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="0"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="1"} 1\n`+
+ `.*`)
// Next RunInstances call should try the working subnet first
inst, err = ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
c.Check(err, check.IsNil)
c.Check(inst, check.NotNil)
c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 3)
+ metrics = arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="0"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-good",success="1"} 2\n`+
+ `.*`)
}
func (*EC2InstanceSetSuite) TestCreateAllSubnetsFailing(c *check.C) {
return
}
- ap, img, cluster := GetInstanceSet(c)
- ap.ec2config.SubnetID = sliceOrSingleString{"subnet-full", "subnet-broken"}
+ ap, img, cluster, reg := GetInstanceSet(c, `{"SubnetID":["subnet-full","subnet-broken"]}`)
ap.client.(*ec2stub).subnetErrorOnRunInstances = map[string]error{
"subnet-full": &ec2stubError{
code: "InsufficientFreeAddressesInSubnet",
c.Check(err, check.NotNil)
c.Check(err, check.ErrorMatches, `.*InvalidSubnetId\.NotFound.*`)
c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 2)
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 1\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
_, err = ap.Create(cluster.InstanceTypes["tiny"], img, nil, "", nil)
c.Check(err, check.NotNil)
c.Check(err, check.ErrorMatches, `.*InsufficientFreeAddressesInSubnet.*`)
c.Check(ap.client.(*ec2stub).runInstancesCalls, check.HasLen, 4)
+ metrics = arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="0"} 2\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-broken",success="1"} 0\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="0"} 2\n`+
+ `arvados_dispatchcloud_ec2_instance_starts_total{subnet_id="subnet-full",success="1"} 0\n`+
+ `.*`)
}
func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, _ := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
}
func (*EC2InstanceSetSuite) TestListInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, reg := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
tg := i.Tags()
c.Logf("%v %v %v", i.String(), i.Address(), tg)
}
+
+ metrics := arvadostest.GatherMetricsAsString(reg)
+ c.Check(metrics, check.Matches, `(?ms).*`+
+ `arvados_dispatchcloud_ec2_instances{subnet_id="[^"]*"} \d+\n`+
+ `.*`)
}
func (*EC2InstanceSetSuite) TestDestroyInstances(c *check.C) {
- ap, _, _ := GetInstanceSet(c)
+ ap, _, _, _ := GetInstanceSet(c, "{}")
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
}
func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
- ap, img, cluster := GetInstanceSet(c)
+ ap, img, cluster, _ := GetInstanceSet(c, "{}")
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
tags := cloud.InstanceTags{"arvados-ec2-driver": "test"}
package keepbalance
import (
- "bytes"
"context"
"encoding/json"
"fmt"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/common/expfmt"
check "gopkg.in/check.v1"
)
c.Assert(err, check.IsNil)
c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
- buf, err := s.getMetrics(c, srv)
- c.Check(err, check.IsNil)
- bufstr := buf.String()
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
- c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
+ metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
}
func (s *runSuite) TestChunkPrefix(c *check.C) {
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
- buf, err := s.getMetrics(c, srv)
- c.Check(err, check.IsNil)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
-}
-
-func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
- mfs, err := srv.Metrics.reg.Gather()
- if err != nil {
- return nil, err
- }
-
- var buf bytes.Buffer
- for _, mf := range mfs {
- if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {
- return nil, err
- }
- }
-
- return &buf, nil
+ metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
}