Merge branch 'master' into 15106-trgm-text-search
[arvados.git] / lib / dispatchcloud / driver.go
index 2ac69e04c17bb94ce706979547c8501b3f80b609..b67b5d054b57d172b940255a8318b76dd21af3b8 100644 (file)
@@ -6,21 +6,110 @@ package dispatchcloud
 
 import (
        "fmt"
+       "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/cloud/azure"
+       "git.curoverse.com/arvados.git/lib/cloud/ec2"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 var drivers = map[string]cloud.Driver{
        "azure": azure.Driver,
+       "ec2":   ec2.Driver,
 }
 
 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
-       driver, ok := drivers[cluster.CloudVMs.Driver]
+       driver, ok := drivers[cluster.Containers.CloudVMs.Driver]
        if !ok {
-               return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
+               return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
        }
-       return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+       sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
+       is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
+       if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+               is = rateLimitedInstanceSet{
+                       InstanceSet: is,
+                       ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
+               }
+       }
+       is = defaultTaggingInstanceSet{
+               InstanceSet: is,
+               defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
+       }
+       is = filteringInstanceSet{
+               InstanceSet: is,
+               logger:      logger,
+       }
+       return is, err
+}
+
+type rateLimitedInstanceSet struct {
+       cloud.InstanceSet
+       ticker *time.Ticker
+}
+
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+       <-is.ticker.C
+       inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+       return &rateLimitedInstance{inst, is.ticker}, err
+}
+
+type rateLimitedInstance struct {
+       cloud.Instance
+       ticker *time.Ticker
+}
+
+func (inst *rateLimitedInstance) Destroy() error {
+       <-inst.ticker.C
+       return inst.Instance.Destroy()
+}
+
+// Adds the specified defaultTags to every Create() call.
+type defaultTaggingInstanceSet struct {
+       cloud.InstanceSet
+       defaultTags cloud.InstanceTags
+}
+
+func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+       allTags := cloud.InstanceTags{}
+       for k, v := range is.defaultTags {
+               allTags[k] = v
+       }
+       for k, v := range tags {
+               allTags[k] = v
+       }
+       return is.InstanceSet.Create(it, image, allTags, init, pk)
+}
+
+// Filters the instances returned by the wrapped InstanceSet's
+// Instances() method (in case the wrapped InstanceSet didn't do this
+// itself).
+type filteringInstanceSet struct {
+       cloud.InstanceSet
+       logger logrus.FieldLogger
+}
+
+func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+       instances, err := is.InstanceSet.Instances(tags)
+
+       skipped := 0
+       var returning []cloud.Instance
+nextInstance:
+       for _, inst := range instances {
+               instTags := inst.Tags()
+               for k, v := range tags {
+                       if instTags[k] != v {
+                               skipped++
+                               continue nextInstance
+                       }
+               }
+               returning = append(returning, inst)
+       }
+       is.logger.WithFields(logrus.Fields{
+               "returning": len(returning),
+               "skipped":   skipped,
+       }).WithError(err).Debugf("filteringInstanceSet returning instances")
+       return returning, err
 }