return nil, wrapAzureError(err)
}
- instances := make([]cloud.Instance, 0)
-
+ var instances []cloud.Instance
for ; result.NotDone(); err = result.Next() {
if err != nil {
return nil, wrapAzureError(err)
}
- if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
- instances = append(instances, &azureInstance{
- provider: az,
- vm: result.Value(),
- nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
- }
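+ // Return every VM visible to this InstanceSet; filtering by tag is left to the caller (e.g. the dispatcher's filteringInstanceSet wrapper).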
+ instances = append(instances, &azureInstance{
+ provider: az,
+ vm: result.Value(),
+ nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
+ })
}
return instances, nil
}
"net"
"net/http"
"os"
+ "strings"
"testing"
"time"
var _ = check.Suite(&AzureInstanceSetSuite{})
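+// testNamePrefix matches the namePrefix configured for the test InstanceSet below, so filterInstances can tell test VMs apart from unrelated VMs.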
+const testNamePrefix = "compute-test123-"
+
type VirtualMachinesClientStub struct{}
func (*VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
BlobContainer: "vhds",
},
dispatcherID: "test123",
- namePrefix: "compute-test123-",
+ namePrefix: testNamePrefix,
logger: logrus.StandardLogger(),
deleteNIC: make(chan string),
deleteBlob: make(chan storage.Blob),
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
- for _, i := range l {
+ for _, i := range filterInstances(c, l) {
c.Check(i.Destroy(), check.IsNil)
}
}
if err != nil {
c.Fatal("Error making provider", err)
}
+
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
-
+ l = filterInstances(c, l)
if len(l) > 0 {
err = l[0].SetTags(map[string]string{"foo": "bar"})
if err != nil {
c.Fatal("Error setting tags", err)
}
}
+
l, err = ap.Instances(nil)
c.Assert(err, check.IsNil)
+ l = filterInstances(c, l)
if len(l) > 0 {
tg := l[0].Tags()
}
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
+ l = filterInstances(c, l)
if len(l) > 0 {
sshclient, err := SetupSSHClient(c, l[0])
return client, nil
}
+
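+// filterInstances returns only the instances whose names start with testNamePrefix, logging and skipping anything else so the test doesn't touch unrelated VMs.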
+func filterInstances(c *check.C, instances []cloud.Instance) []cloud.Instance {
+ var r []cloud.Instance
+ for _, i := range instances {
+ if !strings.HasPrefix(i.String(), testNamePrefix) {
+ c.Logf("ignoring instance %s", i)
+ continue
+ }
+ r = append(r, i)
+ }
+ return r
+}
"golang.org/x/crypto/ssh"
)
-const tagKeyInstanceSetID = "arvados-dispatch-id"
-
// Driver is the ec2 implementation of the cloud.Driver interface.
var Driver = cloud.DriverFunc(newEC2InstanceSet)
}
instanceSet.keysMtx.Unlock()
- ec2tags := []*ec2.Tag{
- &ec2.Tag{
- Key: aws.String(tagKeyInstanceSetID),
- Value: aws.String(string(instanceSet.instanceSetID)),
- },
- }
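+ // newTags already includes the InstanceSetID tag; the dispatcher adds it when calling Create().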
+ var ec2tags []*ec2.Tag
for k, v := range newTags {
ec2tags = append(ec2tags, &ec2.Tag{
Key: aws.String(k),
}, nil
}
-func (instanceSet *ec2InstanceSet) Instances(cloud.InstanceTags) (instances []cloud.Instance, err error) {
- dii := &ec2.DescribeInstancesInput{
- Filters: []*ec2.Filter{&ec2.Filter{
- Name: aws.String("tag:" + tagKeyInstanceSetID),
- Values: []*string{aws.String(string(instanceSet.instanceSetID))},
- }}}
-
+func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances []cloud.Instance, err error) {
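+ // Build one "tag:" filter per requested tag so DescribeInstances only returns matching instances.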
+ var filters []*ec2.Filter
+ for k, v := range tags {
+ filters = append(filters, &ec2.Filter{
+ Name: aws.String("tag:" + k),
+ Values: []*string{aws.String(v)},
+ })
+ }
+ dii := &ec2.DescribeInstancesInput{Filters: filters}
for {
dio, err := instanceSet.client.DescribeInstances(dii)
if err != nil {
}
func (inst *ec2Instance) SetTags(newTags cloud.InstanceTags) error {
- ec2tags := []*ec2.Tag{
- &ec2.Tag{
- Key: aws.String(tagKeyInstanceSetID),
- Value: aws.String(string(inst.provider.instanceSetID)),
- },
- }
+ var ec2tags []*ec2.Tag
for k, v := range newTags {
ec2tags = append(ec2tags, &ec2.Tag{
Key: aws.String(k),
// other mechanism. The tags must be visible to another instance of
// the same driver running on a different host.
//
-// The returned InstanceSet must ignore existing resources that are
-// visible but not tagged with the given id, except that it should log
-// a summary of such resources -- only once -- when it starts
-// up. Thus, two identically configured InstanceSets running on
-// different hosts with different ids should log about the existence
-// of each other's resources at startup, but will not interfere with
-// each other.
+// The returned InstanceSet must not modify or delete cloud resources
+// unless they are tagged with the given InstanceSetID or the caller
+// (dispatcher) calls Destroy() on them. It may log a summary of
+// untagged resources once at startup, though. Thus, two identically
+// configured InstanceSets running on different hosts with different
+// ids should log about the existence of each other's resources at
+// startup, but will not interfere with each other.
+//
+// The dispatcher always passes the InstanceSetID as a tag when
+// calling Create() and Instances(), so the driver does not need to
+// tag/filter VMs by InstanceSetID itself.
//
// Example:
//
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
}
is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, logger)
if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
- is = &rateLimitedInstanceSet{
+ is = rateLimitedInstanceSet{
InstanceSet: is,
ticker: time.NewTicker(time.Second / time.Duration(maxops)),
}
InstanceSet: is,
defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
}
+ is = filteringInstanceSet{
+ InstanceSet: is,
+ logger: logger,
+ }
return is, err
}
}
return is.InstanceSet.Create(it, image, allTags, init, pk)
}
+
+// Filters the instances returned by the wrapped InstanceSet's
+// Instances() method (in case the wrapped InstanceSet didn't do this
+// itself).
+type filteringInstanceSet struct {
+ cloud.InstanceSet
+ logger logrus.FieldLogger
+}
+
+func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+ instances, err := is.InstanceSet.Instances(tags)
+
+ skipped := 0
+ var returning []cloud.Instance
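+ // Keep only instances that carry every requested tag; count the rest as skipped.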
+nextInstance:
+ for _, inst := range instances {
+ instTags := inst.Tags()
+ for k, v := range tags {
+ if instTags[k] != v {
+ skipped++
+ continue nextInstance
+ }
+ }
+ returning = append(returning, inst)
+ }
+ is.logger.WithFields(logrus.Fields{
+ "returning": len(returning),
+ "skipped": skipped,
+ }).WithError(err).Debugf("filteringInstanceSet returning instances")
+ return returning, err
+}
tagKeyInstanceType = "InstanceType"
tagKeyIdleBehavior = "IdleBehavior"
tagKeyInstanceSecret = "InstanceSecret"
+ tagKeyInstanceSetID = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
+ instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
go func() {
defer wp.notify()
tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- tagKeyInstanceSecret: secret,
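+ // Tag the new instance with this pool's InstanceSetID so later Instances() calls can recognize it.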
+ wp.tagPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
+ wp.tagPrefix + tagKeyInstanceType: it.Name,
+ wp.tagPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
+ wp.tagPrefix + tagKeyInstanceSecret: secret,
}
initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
}
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
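+ // Only list instances tagged with this pool's InstanceSetID; instances belonging to other dispatchers are ignored.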
+ instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
if err != nil {
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
- is, err := driver.InstanceSet(nil, "", logger)
+ instanceSetID := cloud.InstanceSetID("test-instance-set-id")
+ is, err := driver.InstanceSet(nil, instanceSetID, logger)
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
},
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
+ pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
+ pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)