import (
"fmt"
+ "time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/cloud/azure"
"git.curoverse.com/arvados.git/lib/cloud/ec2"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
-var drivers = map[string]cloud.Driver{
+// Drivers is the map of available cloud drivers, keyed by driver
+// name. Clusters.*.Containers.CloudVMs.Driver configuration values
+// correspond to keys in this map.
+var Drivers = map[string]cloud.Driver{
"azure": azure.Driver,
"ec2": ec2.Driver,
}
-func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
- driver, ok := drivers[cluster.CloudVMs.Driver]
+// newInstanceSet looks up the driver named by
+// cluster.Containers.CloudVMs.Driver in Drivers, asks it for an
+// InstanceSet, and wraps the result in (innermost first) an
+// instrumentedInstanceSet that counts operations in reg, a
+// rateLimitedInstanceSet if MaxCloudOpsPerSecond is positive, a
+// defaultTaggingInstanceSet that adds the configured ResourceTags,
+// and a filteringInstanceSet.
+//
+// It returns a nil InstanceSet and a non-nil error if the driver
+// name is not a key in Drivers, or if the driver itself returns an
+// error.
+func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
+	driver, ok := Drivers[cluster.Containers.CloudVMs.Driver]
if !ok {
- return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
+		return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
+	}
+	sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
+	is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
+	if err != nil {
+		// Bail out before wrapping: otherwise we would register
+		// metrics and start a ticker for an InstanceSet that was
+		// never created, and hand the caller a non-nil wrapper
+		// around it.
+		return nil, err
+	}
+	is = newInstrumentedInstanceSet(is, reg)
+	if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+		is = rateLimitedInstanceSet{
+			InstanceSet: is,
+			// One tick per permitted operation.
+			ticker: time.NewTicker(time.Second / time.Duration(maxops)),
+		}
+	}
+	is = defaultTaggingInstanceSet{
+		InstanceSet: is,
+		defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
+	}
+	is = filteringInstanceSet{
+		InstanceSet: is,
+		logger:      logger,
+	}
+	return is, nil
+}
+
+// rateLimitedInstanceSet wraps a cloud.InstanceSet so that each
+// Create call first waits for a tick from ticker.
+type rateLimitedInstanceSet struct {
+	cloud.InstanceSet
+	ticker *time.Ticker
+}
+
+// Create blocks until the next tick, then calls the wrapped
+// InstanceSet's Create with the same arguments. On success the new
+// instance is wrapped in a rateLimitedInstance sharing the same
+// ticker. On failure it returns a nil Instance and the error, rather
+// than a non-nil wrapper around a failed result.
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+	<-is.ticker.C
+	inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+	if err != nil {
+		return nil, err
+	}
+	return &rateLimitedInstance{Instance: inst, ticker: is.ticker}, nil
+}
+
+// rateLimitedInstance wraps a cloud.Instance so that Destroy waits
+// for a tick from ticker before proceeding.
+type rateLimitedInstance struct {
+	cloud.Instance
+	ticker *time.Ticker
+}
+
+// Destroy blocks until the next tick arrives on the ticker's
+// channel, then destroys the wrapped instance and returns the
+// result.
+func (rli *rateLimitedInstance) Destroy() error {
+	<-rli.ticker.C
+	err := rli.Instance.Destroy()
+	return err
+}
+
+// defaultTaggingInstanceSet wraps a cloud.InstanceSet, adding
+// defaultTags to the tags passed to every Create() call.
+type defaultTaggingInstanceSet struct {
+	cloud.InstanceSet
+	defaultTags cloud.InstanceTags
+}
+
+// Create calls the wrapped InstanceSet's Create with a freshly built
+// tag set: defaultTags first, then the caller's tags on top, so a
+// caller-supplied value wins when both define the same key. Neither
+// input is modified.
+func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+	merged := cloud.InstanceTags{}
+	// Later sources overwrite earlier ones.
+	for _, src := range []cloud.InstanceTags{is.defaultTags, tags} {
+		for name, value := range src {
+			merged[name] = value
+		}
+	}
+	return is.InstanceSet.Create(it, image, merged, init, pk)
+}
+
+// filteringInstanceSet wraps a cloud.InstanceSet and filters the
+// instances returned by its Instances() method, in case the wrapped
+// InstanceSet didn't do this itself.
+type filteringInstanceSet struct {
+	cloud.InstanceSet
+	logger logrus.FieldLogger
+}
+
+// Instances calls the wrapped InstanceSet's Instances and returns
+// only those instances whose Tags() contain every key/value pair in
+// tags. The wrapped call's error is returned unchanged alongside the
+// filtered list, and the returned/skipped counts are logged at debug
+// level.
+func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+	all, err := is.InstanceSet.Instances(tags)
+
+	var kept []cloud.Instance
+	for _, candidate := range all {
+		have := candidate.Tags()
+		matched := true
+		for name, want := range tags {
+			if have[name] != want {
+				matched = false
+				break
+			}
+		}
+		if matched {
+			kept = append(kept, candidate)
+		}
+	}
+	// Every instance is either kept or skipped, so the skipped count
+	// is the difference.
+	is.logger.WithFields(logrus.Fields{
+		"returning": len(kept),
+		"skipped":   len(all) - len(kept),
+	}).WithError(err).Debugf("filteringInstanceSet returning instances")
+	return kept, err
+}
+
+// newInstrumentedInstanceSet returns an InstanceSet that wraps is
+// and counts driver operations in a counter vector named
+// "driver_operations" (namespace "arvados", subsystem
+// "dispatchcloud"), labeled by operation name and by whether the
+// operation returned an error ("0" or "1").
+//
+// The counter vector is registered with reg using MustRegister,
+// which panics if registration fails.
+func newInstrumentedInstanceSet(is cloud.InstanceSet, reg *prometheus.Registry) cloud.InstanceSet {
+	cv := prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "driver_operations",
+		Help:      "Number of instance-create/destroy/list operations performed via cloud driver.",
+	}, []string{"operation", "error"})
+
+	// Create all counters, so they are reported with zero values
+	// (instead of being missing) until they are incremented. Label
+	// values come from boolLabelValue so they always match the ones
+	// used when incrementing.
+	for _, op := range []string{"Create", "List", "Destroy", "SetTags"} {
+		for _, failed := range []bool{false, true} {
+			cv.WithLabelValues(op, boolLabelValue(failed)).Add(0)
+		}
+	}
+
+	reg.MustRegister(cv)
+	return instrumentedInstanceSet{is, cv}
+}
+
+// instrumentedInstanceSet wraps a cloud.InstanceSet and counts the
+// operations performed through it in cv.
+type instrumentedInstanceSet struct {
+	cloud.InstanceSet
+	cv *prometheus.CounterVec
+}
+
+// Create calls the wrapped InstanceSet's Create and increments the
+// "Create" counter, labeled by whether the call failed. On success
+// the new instance is wrapped in an instrumentedInstance sharing the
+// same counter vector. On failure it returns a nil Instance and the
+// error, rather than a non-nil wrapper around a failed result.
+func (is instrumentedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+	inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+	is.cv.WithLabelValues("Create", boolLabelValue(err != nil)).Inc()
+	if err != nil {
+		return nil, err
+	}
+	return instrumentedInstance{inst, is.cv}, nil
+}
+
+// Instances calls the wrapped InstanceSet's Instances, increments
+// the "List" counter (labeled by whether the call failed), and
+// returns each listed instance wrapped in an instrumentedInstance
+// that shares this set's counter vector. The wrapped call's error is
+// returned unchanged.
+func (is instrumentedInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+	listed, err := is.InstanceSet.Instances(tags)
+	failed := boolLabelValue(err != nil)
+	is.cv.WithLabelValues("List", failed).Inc()
+
+	var wrapped []cloud.Instance
+	for idx := range listed {
+		wrapped = append(wrapped, instrumentedInstance{Instance: listed[idx], cv: is.cv})
+	}
+	return wrapped, err
+}
+
+// instrumentedInstance wraps a cloud.Instance and counts its Destroy
+// and SetTags calls in cv.
+type instrumentedInstance struct {
+	cloud.Instance
+	cv *prometheus.CounterVec
+}
+
+// count increments the counter for operation op, labeled by whether
+// err is non-nil, and returns err unchanged.
+func (inst instrumentedInstance) count(op string, err error) error {
+	inst.cv.WithLabelValues(op, boolLabelValue(err != nil)).Inc()
+	return err
+}
+
+// Destroy destroys the wrapped instance, then records the outcome
+// under the "Destroy" operation.
+func (inst instrumentedInstance) Destroy() error {
+	return inst.count("Destroy", inst.Instance.Destroy())
+}
+
+// SetTags sets tags on the wrapped instance, then records the
+// outcome under the "SetTags" operation.
+func (inst instrumentedInstance) SetTags(tags cloud.InstanceTags) error {
+	return inst.count("SetTags", inst.Instance.SetTags(tags))
+}
+
+// boolLabelValue returns "1" if v is true and "0" otherwise, for use
+// as a metric label value.
+func boolLabelValue(v bool) string {
+ if v {
+ return "1"
+ } else {
+ return "0"
}
- return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
}