From 7109ea1b2a49bc7fdbdbfd2302eb2457750ce5cd Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 29 May 2019 16:01:36 -0400 Subject: [PATCH] 14931: Tag and filter instances by SetID, so driver doesn't need to. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/cloud/azure/azure.go | 14 +++++----- lib/cloud/azure/azure_test.go | 25 +++++++++++++++--- lib/cloud/ec2/ec2.go | 32 ++++++++--------------- lib/cloud/interfaces.go | 18 ++++++++----- lib/dispatchcloud/dispatcher.go | 2 +- lib/dispatchcloud/driver.go | 37 ++++++++++++++++++++++++++- lib/dispatchcloud/worker/pool.go | 14 ++++++---- lib/dispatchcloud/worker/pool_test.go | 7 ++--- 8 files changed, 100 insertions(+), 49 deletions(-) diff --git a/lib/cloud/azure/azure.go b/lib/cloud/azure/azure.go index 03d2550bb6..dce8b61b71 100644 --- a/lib/cloud/azure/azure.go +++ b/lib/cloud/azure/azure.go @@ -477,18 +477,16 @@ func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err return nil, wrapAzureError(err) } - instances := make([]cloud.Instance, 0) - + var instances []cloud.Instance for ; result.NotDone(); err = result.Next() { if err != nil { return nil, wrapAzureError(err) } - if strings.HasPrefix(*result.Value().Name, az.namePrefix) { - instances = append(instances, &azureInstance{ - provider: az, - vm: result.Value(), - nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]}) - } + instances = append(instances, &azureInstance{ + provider: az, + vm: result.Value(), + nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID], + }) } return instances, nil } diff --git a/lib/cloud/azure/azure_test.go b/lib/cloud/azure/azure_test.go index 96bfb4fefb..8cedca295a 100644 --- a/lib/cloud/azure/azure_test.go +++ b/lib/cloud/azure/azure_test.go @@ -39,6 +39,7 @@ import ( "net" "net/http" "os" + "strings" "testing" "time" @@ -66,6 +67,8 @@ type AzureInstanceSetSuite struct{} var _ = check.Suite(&AzureInstanceSetSuite{}) +const testNamePrefix = "compute-test123-" + 
type VirtualMachinesClientStub struct{} func (*VirtualMachinesClientStub) createOrUpdate(ctx context.Context, @@ -149,7 +152,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error) BlobContainer: "vhds", }, dispatcherID: "test123", - namePrefix: "compute-test123-", + namePrefix: testNamePrefix, logger: logrus.StandardLogger(), deleteNIC: make(chan string), deleteBlob: make(chan storage.Blob), @@ -228,7 +231,7 @@ func (*AzureInstanceSetSuite) TestDestroyInstances(c *check.C) { l, err := ap.Instances(nil) c.Assert(err, check.IsNil) - for _, i := range l { + for _, i := range filterInstances(c, l) { c.Check(i.Destroy(), check.IsNil) } } @@ -287,17 +290,20 @@ func (*AzureInstanceSetSuite) TestSetTags(c *check.C) { if err != nil { c.Fatal("Error making provider", err) } + l, err := ap.Instances(nil) c.Assert(err, check.IsNil) - + l = filterInstances(c, l) if len(l) > 0 { err = l[0].SetTags(map[string]string{"foo": "bar"}) if err != nil { c.Fatal("Error setting tags", err) } } + l, err = ap.Instances(nil) c.Assert(err, check.IsNil) + l = filterInstances(c, l) if len(l) > 0 { tg := l[0].Tags() @@ -312,6 +318,7 @@ func (*AzureInstanceSetSuite) TestSSH(c *check.C) { } l, err := ap.Instances(nil) c.Assert(err, check.IsNil) + l = filterInstances(c, l) if len(l) > 0 { sshclient, err := SetupSSHClient(c, l[0]) @@ -372,3 +379,15 @@ func SetupSSHClient(c *check.C, inst cloud.Instance) (*ssh.Client, error) { return client, nil } + +func filterInstances(c *check.C, instances []cloud.Instance) []cloud.Instance { + var r []cloud.Instance + for _, i := range instances { + if !strings.HasPrefix(i.String(), testNamePrefix) { + c.Logf("ignoring instance %s", i) + continue + } + r = append(r, i) + } + return r +} diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go index e2ad6b42b2..c630e95418 100644 --- a/lib/cloud/ec2/ec2.go +++ b/lib/cloud/ec2/ec2.go @@ -25,8 +25,6 @@ import ( "golang.org/x/crypto/ssh" ) -const tagKeyInstanceSetID = 
"arvados-dispatch-id" - // Driver is the ec2 implementation of the cloud.Driver interface. var Driver = cloud.DriverFunc(newEC2InstanceSet) @@ -155,12 +153,7 @@ func (instanceSet *ec2InstanceSet) Create( } instanceSet.keysMtx.Unlock() - ec2tags := []*ec2.Tag{ - &ec2.Tag{ - Key: aws.String(tagKeyInstanceSetID), - Value: aws.String(string(instanceSet.instanceSetID)), - }, - } + ec2tags := []*ec2.Tag{} for k, v := range newTags { ec2tags = append(ec2tags, &ec2.Tag{ Key: aws.String(k), @@ -224,13 +217,15 @@ func (instanceSet *ec2InstanceSet) Create( }, nil } -func (instanceSet *ec2InstanceSet) Instances(cloud.InstanceTags) (instances []cloud.Instance, err error) { - dii := &ec2.DescribeInstancesInput{ - Filters: []*ec2.Filter{&ec2.Filter{ - Name: aws.String("tag:" + tagKeyInstanceSetID), - Values: []*string{aws.String(string(instanceSet.instanceSetID))}, - }}} - +func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances []cloud.Instance, err error) { + var filters []*ec2.Filter + for k, v := range tags { + filters = append(filters, &ec2.Filter{ + Name: aws.String("tag:" + k), + Values: []*string{aws.String(v)}, + }) + } + dii := &ec2.DescribeInstancesInput{Filters: filters} for { dio, err := instanceSet.client.DescribeInstances(dii) if err != nil { @@ -272,12 +267,7 @@ func (inst *ec2Instance) ProviderType() string { } func (inst *ec2Instance) SetTags(newTags cloud.InstanceTags) error { - ec2tags := []*ec2.Tag{ - &ec2.Tag{ - Key: aws.String(tagKeyInstanceSetID), - Value: aws.String(string(inst.provider.instanceSetID)), - }, - } + var ec2tags []*ec2.Tag for k, v := range newTags { ec2tags = append(ec2tags, &ec2.Tag{ Key: aws.String(k), diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go index 792e737a91..804de667ee 100644 --- a/lib/cloud/interfaces.go +++ b/lib/cloud/interfaces.go @@ -154,13 +154,17 @@ type InitCommand string // other mechanism. 
The tags must be visible to another instance of // the same driver running on a different host. // -// The returned InstanceSet must ignore existing resources that are -// visible but not tagged with the given id, except that it should log -// a summary of such resources -- only once -- when it starts -// up. Thus, two identically configured InstanceSets running on -// different hosts with different ids should log about the existence -// of each other's resources at startup, but will not interfere with -// each other. +// The returned InstanceSet must not modify or delete cloud resources +// unless they are tagged with the given InstanceSetID or the caller +// (dispatcher) calls Destroy() on them. It may log a summary of +// untagged resources once at startup, though. Thus, two identically +// configured InstanceSets running on different hosts with different +// ids should log about the existence of each other's resources at +// startup, but will not interfere with each other. +// +// The dispatcher always passes the InstanceSetID as a tag when +// calling Create() and Instances(), so the driver does not need to +// tag/filter VMs by InstanceSetID itself. 
// // Example: // diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go index 3bf0ee9bd5..bc699d9280 100644 --- a/lib/dispatchcloud/dispatcher.go +++ b/lib/dispatchcloud/dispatcher.go @@ -138,7 +138,7 @@ func (disp *dispatcher) initialize() { } disp.instanceSet = instanceSet disp.reg = prometheus.NewRegistry() - disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster) + disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster) disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient) if disp.Cluster.ManagementToken == "" { diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go index 3f16011880..36b8e80082 100644 --- a/lib/dispatchcloud/driver.go +++ b/lib/dispatchcloud/driver.go @@ -28,7 +28,7 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger } is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, logger) if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 { - is = &rateLimitedInstanceSet{ + is = rateLimitedInstanceSet{ InstanceSet: is, ticker: time.NewTicker(time.Second / time.Duration(maxops)), } @@ -37,6 +37,10 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger InstanceSet: is, defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags), } + is = filteringInstanceSet{ + InstanceSet: is, + logger: logger, + } return is, err } @@ -77,3 +81,34 @@ func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud. } return is.InstanceSet.Create(it, image, allTags, init, pk) } + +// Filters the instances returned by the wrapped InstanceSet's +// Instances() method (in case the wrapped InstanceSet didn't do this +// itself). 
+type filteringInstanceSet struct { + cloud.InstanceSet + logger logrus.FieldLogger +} + +func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) { + instances, err := is.InstanceSet.Instances(tags) + + skipped := 0 + var returning []cloud.Instance +nextInstance: + for _, inst := range instances { + instTags := inst.Tags() + for k, v := range tags { + if instTags[k] != v { + skipped++ + continue nextInstance + } + } + returning = append(returning, inst) + } + is.logger.WithFields(logrus.Fields{ + "returning": len(returning), + "skipped": skipped, + }).WithError(err).Debugf("filteringInstanceSet returning instances") + return returning, err +} diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 84b61fc006..8af1037125 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -25,6 +25,7 @@ const ( tagKeyInstanceType = "InstanceType" tagKeyIdleBehavior = "IdleBehavior" tagKeyInstanceSecret = "InstanceSecret" + tagKeyInstanceSetID = "InstanceSetID" ) // An InstanceView shows a worker's current state and recent activity. @@ -91,10 +92,11 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration { // // New instances are configured and set up according to the given // cluster configuration. 
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { +func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { wp := &Pool{ logger: logger, arvClient: arvClient, + instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, @@ -128,6 +130,7 @@ type Pool struct { // configuration logger logrus.FieldLogger arvClient *arvados.Client + instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor bootProbeCommand string @@ -281,9 +284,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { go func() { defer wp.notify() tags := cloud.InstanceTags{ - tagKeyInstanceType: it.Name, - tagKeyIdleBehavior: string(IdleBehaviorRun), - tagKeyInstanceSecret: secret, + tagKeyInstanceSetID: string(wp.instanceSetID), + tagKeyInstanceType: it.Name, + tagKeyIdleBehavior: string(IdleBehaviorRun), + tagKeyInstanceSecret: secret, } initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename)) inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) @@ -728,7 +732,7 @@ func (wp *Pool) getInstancesAndSync() error { } wp.logger.Debug("getting instance list") threshold := time.Now() - instances, err := wp.instanceSet.Instances(cloud.InstanceTags{}) + instances, err := wp.instanceSet.Instances(cloud.InstanceTags{tagKeyInstanceSetID: string(wp.instanceSetID)}) if err != nil { 
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify) return err diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go index 6939536689..8ab4c98754 100644 --- a/lib/dispatchcloud/worker/pool_test.go +++ b/lib/dispatchcloud/worker/pool_test.go @@ -65,7 +65,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { logger := ctxlog.TestLogger(c) driver := &test.StubDriver{} - is, err := driver.InstanceSet(nil, "", logger) + instanceSetID := cloud.InstanceSetID("test-instance-set-id") + is, err := driver.InstanceSet(nil, instanceSetID, logger) c.Assert(err, check.IsNil) newExecutor := func(cloud.Instance) Executor { @@ -91,7 +92,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { }, } - pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster) + pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster) notify := pool.Subscribe() defer pool.Unsubscribe(notify) pool.Create(type1) @@ -126,7 +127,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { c.Log("------- starting new pool, waiting to recover state") - pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster) + pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster) notify2 := pool2.Subscribe() defer pool2.Unsubscribe(notify2) waitForIdle(pool2, notify2) -- 2.30.2