14931: Tag and filter instances by SetID, so driver doesn't need to.
authorTom Clegg <tclegg@veritasgenetics.com>
Wed, 29 May 2019 20:01:36 +0000 (16:01 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Fri, 31 May 2019 17:57:32 +0000 (13:57 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/cloud/azure/azure.go
lib/cloud/azure/azure_test.go
lib/cloud/ec2/ec2.go
lib/cloud/interfaces.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go

index 03d2550bb693665e5fc246a13b169cb73c63cba7..dce8b61b71d7e2d52c4865af7df218bf4c3e57c7 100644 (file)
@@ -477,18 +477,16 @@ func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err
                return nil, wrapAzureError(err)
        }
 
-       instances := make([]cloud.Instance, 0)
-
+       var instances []cloud.Instance
        for ; result.NotDone(); err = result.Next() {
                if err != nil {
                        return nil, wrapAzureError(err)
                }
-               if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
-                       instances = append(instances, &azureInstance{
-                               provider: az,
-                               vm:       result.Value(),
-                               nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
-               }
+               instances = append(instances, &azureInstance{
+                       provider: az,
+                       vm:       result.Value(),
+                       nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
+               })
        }
        return instances, nil
 }
index 96bfb4fefbfd8c8c13c199a5621977776f762505..8cedca295a7a9a2e4deab2b9627fb97b101ce1a9 100644 (file)
@@ -39,6 +39,7 @@ import (
        "net"
        "net/http"
        "os"
+       "strings"
        "testing"
        "time"
 
@@ -66,6 +67,8 @@ type AzureInstanceSetSuite struct{}
 
 var _ = check.Suite(&AzureInstanceSetSuite{})
 
+const testNamePrefix = "compute-test123-"
+
 type VirtualMachinesClientStub struct{}
 
 func (*VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
@@ -149,7 +152,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
                        BlobContainer: "vhds",
                },
                dispatcherID: "test123",
-               namePrefix:   "compute-test123-",
+               namePrefix:   testNamePrefix,
                logger:       logrus.StandardLogger(),
                deleteNIC:    make(chan string),
                deleteBlob:   make(chan storage.Blob),
@@ -228,7 +231,7 @@ func (*AzureInstanceSetSuite) TestDestroyInstances(c *check.C) {
        l, err := ap.Instances(nil)
        c.Assert(err, check.IsNil)
 
-       for _, i := range l {
+       for _, i := range filterInstances(c, l) {
                c.Check(i.Destroy(), check.IsNil)
        }
 }
@@ -287,17 +290,20 @@ func (*AzureInstanceSetSuite) TestSetTags(c *check.C) {
        if err != nil {
                c.Fatal("Error making provider", err)
        }
+
        l, err := ap.Instances(nil)
        c.Assert(err, check.IsNil)
-
+       l = filterInstances(c, l)
        if len(l) > 0 {
                err = l[0].SetTags(map[string]string{"foo": "bar"})
                if err != nil {
                        c.Fatal("Error setting tags", err)
                }
        }
+
        l, err = ap.Instances(nil)
        c.Assert(err, check.IsNil)
+       l = filterInstances(c, l)
 
        if len(l) > 0 {
                tg := l[0].Tags()
@@ -312,6 +318,7 @@ func (*AzureInstanceSetSuite) TestSSH(c *check.C) {
        }
        l, err := ap.Instances(nil)
        c.Assert(err, check.IsNil)
+       l = filterInstances(c, l)
 
        if len(l) > 0 {
                sshclient, err := SetupSSHClient(c, l[0])
@@ -372,3 +379,15 @@ func SetupSSHClient(c *check.C, inst cloud.Instance) (*ssh.Client, error) {
 
        return client, nil
 }
+
+func filterInstances(c *check.C, instances []cloud.Instance) []cloud.Instance {
+       var r []cloud.Instance
+       for _, i := range instances {
+               if !strings.HasPrefix(i.String(), testNamePrefix) {
+                       c.Logf("ignoring instance %s", i)
+                       continue
+               }
+               r = append(r, i)
+       }
+       return r
+}
index e2ad6b42b287e9470161f08d1401992c2e54fced..c630e95418f03e590dc052ea0880104ea845b85c 100644 (file)
@@ -25,8 +25,6 @@ import (
        "golang.org/x/crypto/ssh"
 )
 
-const tagKeyInstanceSetID = "arvados-dispatch-id"
-
 // Driver is the ec2 implementation of the cloud.Driver interface.
 var Driver = cloud.DriverFunc(newEC2InstanceSet)
 
@@ -155,12 +153,7 @@ func (instanceSet *ec2InstanceSet) Create(
        }
        instanceSet.keysMtx.Unlock()
 
-       ec2tags := []*ec2.Tag{
-               &ec2.Tag{
-                       Key:   aws.String(tagKeyInstanceSetID),
-                       Value: aws.String(string(instanceSet.instanceSetID)),
-               },
-       }
+       ec2tags := []*ec2.Tag{}
        for k, v := range newTags {
                ec2tags = append(ec2tags, &ec2.Tag{
                        Key:   aws.String(k),
@@ -224,13 +217,15 @@ func (instanceSet *ec2InstanceSet) Create(
        }, nil
 }
 
-func (instanceSet *ec2InstanceSet) Instances(cloud.InstanceTags) (instances []cloud.Instance, err error) {
-       dii := &ec2.DescribeInstancesInput{
-               Filters: []*ec2.Filter{&ec2.Filter{
-                       Name:   aws.String("tag:" + tagKeyInstanceSetID),
-                       Values: []*string{aws.String(string(instanceSet.instanceSetID))},
-               }}}
-
+func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances []cloud.Instance, err error) {
+       var filters []*ec2.Filter
+       for k, v := range tags {
+               filters = append(filters, &ec2.Filter{
+                       Name:   aws.String("tag:" + k),
+                       Values: []*string{aws.String(v)},
+               })
+       }
+       dii := &ec2.DescribeInstancesInput{Filters: filters}
        for {
                dio, err := instanceSet.client.DescribeInstances(dii)
                if err != nil {
@@ -272,12 +267,7 @@ func (inst *ec2Instance) ProviderType() string {
 }
 
 func (inst *ec2Instance) SetTags(newTags cloud.InstanceTags) error {
-       ec2tags := []*ec2.Tag{
-               &ec2.Tag{
-                       Key:   aws.String(tagKeyInstanceSetID),
-                       Value: aws.String(string(inst.provider.instanceSetID)),
-               },
-       }
+       var ec2tags []*ec2.Tag
        for k, v := range newTags {
                ec2tags = append(ec2tags, &ec2.Tag{
                        Key:   aws.String(k),
index 792e737a914a1ce7d39d98c05c1a9428e77fb1ff..804de667ee010e81b5710ff140a3fa2f988e596f 100644 (file)
@@ -154,13 +154,17 @@ type InitCommand string
 // other mechanism. The tags must be visible to another instance of
 // the same driver running on a different host.
 //
-// The returned InstanceSet must ignore existing resources that are
-// visible but not tagged with the given id, except that it should log
-// a summary of such resources -- only once -- when it starts
-// up. Thus, two identically configured InstanceSets running on
-// different hosts with different ids should log about the existence
-// of each other's resources at startup, but will not interfere with
-// each other.
+// The returned InstanceSet must not modify or delete cloud resources
+// unless they are tagged with the given InstanceSetID or the caller
+// (dispatcher) calls Destroy() on them. It may log a summary of
+// untagged resources once at startup, though. Thus, two identically
+// configured InstanceSets running on different hosts with different
+// ids should log about the existence of each other's resources at
+// startup, but will not interfere with each other.
+//
+// The dispatcher always passes the InstanceSetID as a tag when
+// calling Create() and Instances(), so the driver does not need to
+// tag/filter VMs by InstanceSetID itself.
 //
 // Example:
 //
index 3bf0ee9bd558eeecc27919388874af786baca6c9..bc699d92804092d8dbbc37bdcd3d8180b67e70c1 100644 (file)
@@ -138,7 +138,7 @@ func (disp *dispatcher) initialize() {
        }
        disp.instanceSet = instanceSet
        disp.reg = prometheus.NewRegistry()
-       disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+       disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient)
 
        if disp.Cluster.ManagementToken == "" {
index 3f1601188062b9e99703412d4a737012ccef762f..36b8e8008249e0a37910642ff286c88309204c4c 100644 (file)
@@ -28,7 +28,7 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger
        }
        is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, logger)
        if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
-               is = &rateLimitedInstanceSet{
+               is = rateLimitedInstanceSet{
                        InstanceSet: is,
                        ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
                }
@@ -37,6 +37,10 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger
                InstanceSet: is,
                defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
        }
+       is = filteringInstanceSet{
+               InstanceSet: is,
+               logger:      logger,
+       }
        return is, err
 }
 
@@ -77,3 +81,34 @@ func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.
        }
        return is.InstanceSet.Create(it, image, allTags, init, pk)
 }
+
+// Filters the instances returned by the wrapped InstanceSet's
+// Instances() method (in case the wrapped InstanceSet didn't do this
+// itself).
+type filteringInstanceSet struct {
+       cloud.InstanceSet
+       logger logrus.FieldLogger
+}
+
+func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+       instances, err := is.InstanceSet.Instances(tags)
+
+       skipped := 0
+       var returning []cloud.Instance
+nextInstance:
+       for _, inst := range instances {
+               instTags := inst.Tags()
+               for k, v := range tags {
+                       if instTags[k] != v {
+                               skipped++
+                               continue nextInstance
+                       }
+               }
+               returning = append(returning, inst)
+       }
+       is.logger.WithFields(logrus.Fields{
+               "returning": len(returning),
+               "skipped":   skipped,
+       }).WithError(err).Debugf("filteringInstanceSet returning instances")
+       return returning, err
+}
index 84b61fc006c239e3cc67e9a7828e6ebe29b6546e..8af1037125f7d31b08855971e032f8e50e8b7731 100644 (file)
@@ -25,6 +25,7 @@ const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
+       tagKeyInstanceSetID  = "InstanceSetID"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
@@ -91,10 +92,11 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
+               instanceSetID:      instanceSetID,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
@@ -128,6 +130,7 @@ type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
+       instanceSetID      cloud.InstanceSetID
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
@@ -281,9 +284,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
-                       tagKeyInstanceType:   it.Name,
-                       tagKeyIdleBehavior:   string(IdleBehaviorRun),
-                       tagKeyInstanceSecret: secret,
+                       wp.tagPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+                       wp.tagPrefix + tagKeyInstanceType:   it.Name,
+                       wp.tagPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       wp.tagPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
@@ -728,7 +732,7 @@ func (wp *Pool) getInstancesAndSync() error {
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
index 69395366895d3eb515c45705032dd722894f083e..8ab4c987544e06e8535a9de2ca12df7052ff8edb 100644 (file)
@@ -65,7 +65,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
-       is, err := driver.InstanceSet(nil, "", logger)
+       instanceSetID := cloud.InstanceSetID("test-instance-set-id")
+       is, err := driver.InstanceSet(nil, instanceSetID, logger)
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
@@ -91,7 +92,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                },
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
+       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -126,7 +127,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
+       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)