14324: Azure driver for crunch-dispatch-cloud
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Wed, 9 Jan 2019 21:13:56 +0000 (16:13 -0500)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Wed, 9 Jan 2019 21:28:16 +0000 (16:28 -0500)
* Adds context parameter to methods of InstanceSet interface that may block
* Fix tests

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

12 files changed:
build/run-tests.sh
lib/cloud/azure.go
lib/cloud/azure_test.go
lib/cloud/interfaces.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/driver.go
lib/dispatchcloud/instance_set_proxy.go
lib/dispatchcloud/ssh_executor/executor.go
lib/dispatchcloud/test/lame_instance_set.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/worker.go

index cb44372566f8eb36ec6163f9108c97e464d6fca8..0a26e851365237234262bb44c549edf5116735c7 100755 (executable)
@@ -76,6 +76,7 @@ lib/cli
 lib/cmd
 lib/controller
 lib/crunchstat
+lib/cloud
 lib/dispatchcloud
 lib/dispatchcloud/container
 lib/dispatchcloud/scheduler
@@ -929,6 +930,7 @@ gostuff=(
     lib/cmd
     lib/controller
     lib/crunchstat
+    lib/cloud
     lib/dispatchcloud
     lib/dispatchcloud/container
     lib/dispatchcloud/scheduler
index 734be7bef20225c28d6fa684c253f4ac76a6a787..2ee66837218ed9e40017b0ccb8d00b96a6c042de 100644 (file)
@@ -26,6 +26,7 @@ import (
        "github.com/Azure/go-autorest/autorest/azure/auth"
        "github.com/Azure/go-autorest/autorest/to"
        "github.com/jmcvetta/randutil"
+       "github.com/mitchellh/mapstructure"
        "golang.org/x/crypto/ssh"
 )
 
@@ -196,14 +197,13 @@ type AzureInstanceSet struct {
        namePrefix        string
 }
 
-func NewAzureInstanceSet(config map[string]interface{}, dispatcherID string) (prv InstanceProvider, err error) {
+func NewAzureInstanceSet(config map[string]interface{}, dispatcherID InstanceSetID) (prv InstanceSet, err error) {
        azcfg := AzureInstanceSetConfig{}
-       err = mapstructure.Decode(config, &azcfg)
-       if err != nil {
+       if err = mapstructure.Decode(config, &azcfg); err != nil {
                return nil, err
        }
        ap := AzureInstanceSet{}
-       err = ap.setup(azcfg, dispatcherID)
+       err = ap.setup(azcfg, string(dispatcherID))
        if err != nil {
                return nil, err
        }
@@ -376,7 +376,7 @@ echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
        }, nil
 }
 
-func (az *AzureInstanceSet) Instances(ctx context.Context) ([]Instance, error) {
+func (az *AzureInstanceSet) Instances(ctx context.Context, _ InstanceTags) ([]Instance, error) {
        interfaces, err := az.ManageNics(ctx)
        if err != nil {
                return nil, err
@@ -549,6 +549,10 @@ func (ai *AzureInstance) String() string {
        return *ai.vm.Name
 }
 
+func (ai *AzureInstance) ProviderType() string {
+       return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
+}
+
 func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) error {
        tags := make(map[string]*string)
 
@@ -575,7 +579,7 @@ func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) erro
        return nil
 }
 
-func (ai *AzureInstance) Tags(ctx context.Context) (InstanceTags, error) {
+func (ai *AzureInstance) Tags() InstanceTags {
        tags := make(map[string]string)
 
        for k, v := range ai.vm.Tags {
@@ -584,7 +588,7 @@ func (ai *AzureInstance) Tags(ctx context.Context) (InstanceTags, error) {
                }
        }
 
-       return tags, nil
+       return tags
 }
 
 func (ai *AzureInstance) Destroy(ctx context.Context) error {
@@ -596,10 +600,10 @@ func (ai *AzureInstance) Address() string {
        return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
 }
 
-func (ai *AzureInstance) VerifyPublicKey(ctx context.Context, receivedKey ssh.PublicKey, client *ssh.Client) error {
+func (ai *AzureInstance) VerifyHostKey(ctx context.Context, receivedKey ssh.PublicKey, client *ssh.Client) error {
        remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
 
-       tags, _ := ai.Tags(ctx)
+       tags := ai.Tags()
 
        tg := tags["ssh-pubkey-fingerprint"]
        if tg != "" {
index aaa3d2846a28be0ab7fbfe55be5c619a298ba86c..a123bc4bb565ceefc5d1dc7ea71f82ce869f8353 100644 (file)
@@ -27,9 +27,9 @@ import (
        check "gopkg.in/check.v1"
 )
 
-type AzureProviderSuite struct{}
+type AzureInstanceSetSuite struct{}
 
-var _ = check.Suite(&AzureProviderSuite{})
+var _ = check.Suite(&AzureInstanceSetSuite{})
 
 type VirtualMachinesClientStub struct{}
 
@@ -71,7 +71,7 @@ func (*InterfacesClientStub) ListComplete(ctx context.Context, resourceGroupName
 
 var live = flag.String("live-azure-cfg", "", "Test with real azure API, provide config file")
 
-func GetProvider() (InstanceProvider, ImageID, arvados.Cluster, error) {
+func GetInstanceSet() (InstanceSet, ImageID, arvados.Cluster, error) {
        cluster := arvados.Cluster{
                InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
                        "tiny": arvados.InstanceType{
@@ -85,16 +85,16 @@ func GetProvider() (InstanceProvider, ImageID, arvados.Cluster, error) {
                        },
                })}
        if *live != "" {
-               cfg := AzureProviderConfig{}
+               cfg := make(map[string]interface{})
                err := config.LoadFile(&cfg, *live)
                if err != nil {
                        return nil, ImageID(""), cluster, err
                }
-               ap, err := NewAzureProvider(cfg, "test123")
-               return ap, ImageID(cfg.Image), cluster, err
+               ap, err := NewAzureInstanceSet(cfg, "test123")
+               return ap, ImageID(cfg["image"].(string)), cluster, err
        } else {
-               ap := AzureProvider{
-                       azconfig: AzureProviderConfig{
+               ap := AzureInstanceSet{
+                       azconfig: AzureInstanceSetConfig{
                                BlobContainer: "vhds",
                        },
                        dispatcherID: "test123",
@@ -106,8 +106,8 @@ func GetProvider() (InstanceProvider, ImageID, arvados.Cluster, error) {
        }
 }
 
-func (*AzureProviderSuite) TestCreate(c *check.C) {
-       ap, img, cluster, err := GetProvider()
+func (*AzureInstanceSetSuite) TestCreate(c *check.C) {
+       ap, img, cluster, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
@@ -132,52 +132,52 @@ func (*AzureProviderSuite) TestCreate(c *check.C) {
 
        c.Assert(err, check.IsNil)
 
-       tg, _ := inst.Tags(context.Background())
+       tg := inst.Tags()
        log.Printf("Result %v %v %v", inst.String(), inst.Address(), tg)
 
 }
 
-func (*AzureProviderSuite) TestListInstances(c *check.C) {
-       ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestListInstances(c *check.C) {
+       ap, _, _, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
 
-       l, err := ap.Instances(context.Background())
+       l, err := ap.Instances(context.Background(), nil)
 
        c.Assert(err, check.IsNil)
 
        for _, i := range l {
-               tg, _ := i.Tags(context.Background())
+               tg := i.Tags()
                log.Printf("%v %v %v", i.String(), i.Address(), tg)
        }
 }
 
-func (*AzureProviderSuite) TestManageNics(c *check.C) {
-       ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestManageNics(c *check.C) {
+       ap, _, _, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
 
-       ap.(*AzureProvider).ManageNics(context.Background())
+       ap.(*AzureInstanceSet).ManageNics(context.Background())
 }
 
-func (*AzureProviderSuite) TestManageBlobs(c *check.C) {
-       ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestManageBlobs(c *check.C) {
+       ap, _, _, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
 
-       ap.(*AzureProvider).ManageBlobs(context.Background())
+       ap.(*AzureInstanceSet).ManageBlobs(context.Background())
 }
 
-func (*AzureProviderSuite) TestDestroyInstances(c *check.C) {
-       ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestDestroyInstances(c *check.C) {
+       ap, _, _, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
 
-       l, err := ap.Instances(context.Background())
+       l, err := ap.Instances(context.Background(), nil)
        c.Assert(err, check.IsNil)
 
        for _, i := range l {
@@ -185,13 +185,13 @@ func (*AzureProviderSuite) TestDestroyInstances(c *check.C) {
        }
 }
 
-func (*AzureProviderSuite) TestDeleteFake(c *check.C) {
-       ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestDeleteFake(c *check.C) {
+       ap, _, _, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
 
-       _, err = ap.(*AzureProvider).netClient.Delete(context.Background(), "fakefakefake", "fakefakefake")
+       _, err = ap.(*AzureInstanceSet).netClient.Delete(context.Background(), "fakefakefake", "fakefakefake")
 
        de, ok := err.(autorest.DetailedError)
        if ok {
@@ -201,7 +201,7 @@ func (*AzureProviderSuite) TestDeleteFake(c *check.C) {
        }
 }
 
-func (*AzureProviderSuite) TestWrapError(c *check.C) {
+func (*AzureInstanceSetSuite) TestWrapError(c *check.C) {
        retryError := autorest.DetailedError{
                Original: &azure.RequestError{
                        DetailedError: autorest.DetailedError{
@@ -234,12 +234,12 @@ func (*AzureProviderSuite) TestWrapError(c *check.C) {
        c.Check(ok, check.Equals, true)
 }
 
-func (*AzureProviderSuite) TestSetTags(c *check.C) {
-       ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestSetTags(c *check.C) {
+       ap, _, _, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
-       l, err := ap.Instances(context.Background())
+       l, err := ap.Instances(context.Background(), nil)
        c.Assert(err, check.IsNil)
 
        if len(l) > 0 {
@@ -248,21 +248,21 @@ func (*AzureProviderSuite) TestSetTags(c *check.C) {
                        c.Fatal("Error setting tags", err)
                }
        }
-       l, err = ap.Instances(context.Background())
+       l, err = ap.Instances(context.Background(), nil)
        c.Assert(err, check.IsNil)
 
        if len(l) > 0 {
-               tg, _ := l[0].Tags(context.Background())
+               tg := l[0].Tags()
                log.Printf("tags are %v", tg)
        }
 }
 
-func (*AzureProviderSuite) TestSSH(c *check.C) {
-       ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestSSH(c *check.C) {
+       ap, _, _, err := GetInstanceSet()
        if err != nil {
                c.Fatal("Error making provider", err)
        }
-       l, err := ap.Instances(context.Background())
+       l, err := ap.Instances(context.Background(), nil)
        c.Assert(err, check.IsNil)
 
        if len(l) > 0 {
@@ -316,7 +316,7 @@ func SetupSSHClient(c *check.C, inst Instance) (*ssh.Client, error) {
                return nil, errors.New("BUG: key was never provided to HostKeyCallback")
        }
 
-       err = inst.VerifyPublicKey(context.Background(), receivedKey, client)
+       err = inst.VerifyHostKey(context.Background(), receivedKey, client)
        c.Assert(err, check.IsNil)
 
        return client, nil
index e3a072582beb32fe59e4f40335186fd0706c45a9..c2b1acbabacb727d25be48b8a2b0c7b9d01916e5 100644 (file)
@@ -5,6 +5,7 @@
 package cloud
 
 import (
+       "context"
        "io"
        "time"
 
@@ -66,7 +67,7 @@ type ExecutorTarget interface {
        // VerifyHostKey can use it to make outgoing network
        // connections from the instance -- e.g., to use the cloud's
        // "this instance's metadata" API.
-       VerifyHostKey(ssh.PublicKey, *ssh.Client) error
+       VerifyHostKey(context.Context, ssh.PublicKey, *ssh.Client) error
 }
 
 // Instance is implemented by the provider-specific instance types.
@@ -88,10 +89,10 @@ type Instance interface {
        Tags() InstanceTags
 
        // Replace tags with the given tags
-       SetTags(InstanceTags) error
+       SetTags(context.Context, InstanceTags) error
 
        // Shut down the node
-       Destroy() error
+       Destroy(context.Context) error
 }
 
 // An InstanceSet manages a set of VM instances created by an elastic
@@ -105,7 +106,7 @@ type InstanceSet interface {
        //
        // The returned error should implement RateLimitError and
        // QuotaError where applicable.
-       Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+       Create(context.Context, arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
 
        // Return all instances, including ones that are booting or
        // shutting down. Optionally, filter out nodes that don't have
@@ -117,7 +118,7 @@ type InstanceSet interface {
        // Instance object each time. Thus, the caller is responsible
        // for de-duplicating the returned instances by comparing the
        // InstanceIDs returned by the instances' ID() methods.
-       Instances(InstanceTags) ([]Instance, error)
+       Instances(context.Context, InstanceTags) ([]Instance, error)
 
        // Stop any background tasks and release other resources.
        Stop()
index 33823a828d30c610f749388f42e45c0bb692a1c2..87122a85c8bd6bc985f19ecab516ec9a82d4eb36 100644 (file)
@@ -5,6 +5,7 @@
 package dispatchcloud
 
 import (
+       "context"
        "encoding/json"
        "io/ioutil"
        "math/rand"
@@ -165,7 +166,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 
        deadline := time.Now().Add(time.Second)
        for range time.NewTicker(10 * time.Millisecond).C {
-               insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
+               insts, err := s.stubDriver.InstanceSets()[0].Instances(context.TODO(), nil)
                c.Check(err, check.IsNil)
                queue.Update()
                ents, _ := queue.Entries()
index fe5362437299bb00ac04dac2b444cb0551afe202..97c1bc773a8e420b8e6abd88c07e63bf20a4ad01 100644 (file)
@@ -12,7 +12,7 @@ import (
 )
 
 var drivers = map[string]cloud.Driver{
-       "azure": "",
+       "azure": cloud.DriverFunc(cloud.NewAzureInstanceSet),
 }
 
 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID) (cloud.InstanceSet, error) {
index e728b67cd21b013bb4b75649bc8505f21be6312a..80c5104580d9f44de9f392d197839c4b9c06a21c 100644 (file)
@@ -5,6 +5,8 @@
 package dispatchcloud
 
 import (
+       "context"
+
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "golang.org/x/crypto/ssh"
@@ -14,12 +16,12 @@ type instanceSetProxy struct {
        cloud.InstanceSet
 }
 
-func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
+func (is *instanceSetProxy) Create(ctx context.Context, it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
        // TODO: return if Create failed recently with a RateLimitError or QuotaError
-       return is.InstanceSet.Create(it, id, tags, pk)
+       return is.InstanceSet.Create(ctx, it, id, tags, pk)
 }
 
-func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+func (is *instanceSetProxy) Instances(ctx context.Context, tags cloud.InstanceTags) ([]cloud.Instance, error) {
        // TODO: return if Instances failed recently with a RateLimitError
-       return is.InstanceSet.Instances(tags)
+       return is.InstanceSet.Instances(ctx, tags)
 }
index b5dba9870dc041bf730e71e2a3f0dd2bca1ab7fc..c9b0101a7775d98e6f184232d807143c66fd6bb3 100644 (file)
@@ -8,6 +8,7 @@ package ssh_executor
 
 import (
        "bytes"
+       "context"
        "errors"
        "io"
        "net"
@@ -180,7 +181,7 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
        }
 
        if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
-               err = target.VerifyHostKey(receivedKey, client)
+               err = target.VerifyHostKey(context.TODO(), receivedKey, client)
                if err != nil {
                        return nil, err
                }
index baab407a7d0e89c2eee7bbfecae71d4e58cbdb70..c52f7c645cd672bb1ecf6f212aab49d1bf58dfd0 100644 (file)
@@ -5,6 +5,7 @@
 package test
 
 import (
+       "context"
        "fmt"
        "math/rand"
        "sync"
@@ -24,13 +25,13 @@ type LameInstanceSet struct {
 }
 
 // Create returns a new instance.
-func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+func (p *LameInstanceSet) Create(_ context.Context, instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
        inst := &lameInstance{
                p:            p,
                id:           cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
                providerType: instType.ProviderType,
        }
-       inst.SetTags(tags)
+       inst.SetTags(context.TODO(), tags)
        if p.Hold != nil {
                p.Hold <- true
        }
@@ -44,7 +45,7 @@ func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.Im
 }
 
 // Instances returns the instances that haven't been destroyed.
-func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+func (p *LameInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        var instances []cloud.Instance
@@ -89,7 +90,7 @@ func (inst *lameInstance) Address() string {
        return "0.0.0.0:1234"
 }
 
-func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
+func (inst *lameInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
        inst.p.mtx.Lock()
        defer inst.p.mtx.Unlock()
        inst.tags = cloud.InstanceTags{}
@@ -99,7 +100,7 @@ func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
        return nil
 }
 
-func (inst *lameInstance) Destroy() error {
+func (inst *lameInstance) Destroy(context.Context) error {
        if inst.p.Hold != nil {
                inst.p.Hold <- true
        }
@@ -113,6 +114,6 @@ func (inst *lameInstance) Tags() cloud.InstanceTags {
        return inst.tags
 }
 
-func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+func (inst *lameInstance) VerifyHostKey(context.Context, ssh.PublicKey, *ssh.Client) error {
        return nil
 }
index 8bdfaa947dfb2b9dcb0185dbe8eb2ab19a601bf3..8f40b85eb9a2ea9d63621c64446d6aff736c9dff 100644 (file)
@@ -5,6 +5,7 @@
 package test
 
 import (
+       "context"
        "crypto/rand"
        "errors"
        "fmt"
@@ -68,7 +69,7 @@ type StubInstanceSet struct {
        stopped bool
 }
 
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(_ context.Context, it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        if sis.stopped {
@@ -96,7 +97,7 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
        return svm.Instance(), nil
 }
 
-func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+func (sis *StubInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
        sis.mtx.RLock()
        defer sis.mtx.RUnlock()
        var r []cloud.Instance
@@ -261,7 +262,7 @@ func (si stubInstance) Address() string {
        return si.addr
 }
 
-func (si stubInstance) Destroy() error {
+func (si stubInstance) Destroy(_ context.Context) error {
        if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
                return errors.New("instance could not be destroyed")
        }
@@ -277,7 +278,7 @@ func (si stubInstance) ProviderType() string {
        return si.svm.providerType
 }
 
-func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
+func (si stubInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
        tags = copyTags(tags)
        svm := si.svm
        go func() {
@@ -296,7 +297,7 @@ func (si stubInstance) String() string {
        return string(si.svm.id)
 }
 
-func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+func (si stubInstance) VerifyHostKey(_ context.Context, key ssh.PublicKey, client *ssh.Client) error {
        buf := make([]byte, 512)
        _, err := io.ReadFull(rand.Reader, buf)
        if err != nil {
index ff5f762c1d225575f0ad4eeb1b69bb4de463d281..30a1312d3ecfa81d974eb59fa5b2f8281041eab5 100644 (file)
@@ -5,6 +5,7 @@
 package worker
 
 import (
+       "context"
        "io"
        "sort"
        "strings"
@@ -225,7 +226,7 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
                defer wp.notify()
-               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               inst, err := wp.instanceSet.Create(context.TODO(), it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // Remove our timestamp marker from wp.creating
@@ -626,7 +627,7 @@ func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(context.TODO(), cloud.InstanceTags{})
        if err != nil {
                return err
        }
index c26186309ad013f2926ba351eee4e2326595920e..a4578b26da50dab9257df1fc4977c862848076fc 100644 (file)
@@ -6,6 +6,7 @@ package worker
 
 import (
        "bytes"
+       "context"
        "strings"
        "sync"
        "time"
@@ -311,7 +312,7 @@ func (wkr *worker) shutdown() {
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
-               err := wkr.instance.Destroy()
+               err := wkr.instance.Destroy(context.TODO())
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                        return