lib/cmd
lib/controller
lib/crunchstat
+lib/cloud
lib/dispatchcloud
lib/dispatchcloud/container
lib/dispatchcloud/scheduler
lib/cmd
lib/controller
lib/crunchstat
+ lib/cloud
lib/dispatchcloud
lib/dispatchcloud/container
lib/dispatchcloud/scheduler
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/jmcvetta/randutil"
+ "github.com/mitchellh/mapstructure"
"golang.org/x/crypto/ssh"
)
namePrefix string
}
-func NewAzureInstanceSet(config map[string]interface{}, dispatcherID string) (prv InstanceProvider, err error) {
+func NewAzureInstanceSet(config map[string]interface{}, dispatcherID InstanceSetID) (prv InstanceSet, err error) {
azcfg := AzureInstanceSetConfig{}
- err = mapstructure.Decode(config, &azcfg)
- if err != nil {
+ if err = mapstructure.Decode(config, &azcfg); err != nil {
return nil, err
}
ap := AzureInstanceSet{}
- err = ap.setup(azcfg, dispatcherID)
+ err = ap.setup(azcfg, string(dispatcherID))
if err != nil {
return nil, err
}
}, nil
}
-func (az *AzureInstanceSet) Instances(ctx context.Context) ([]Instance, error) {
+func (az *AzureInstanceSet) Instances(ctx context.Context, _ InstanceTags) ([]Instance, error) {
interfaces, err := az.ManageNics(ctx)
if err != nil {
return nil, err
return *ai.vm.Name
}
+func (ai *AzureInstance) ProviderType() string {
+ return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
+}
+
func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) error {
tags := make(map[string]*string)
return nil
}
-func (ai *AzureInstance) Tags(ctx context.Context) (InstanceTags, error) {
+func (ai *AzureInstance) Tags() InstanceTags {
tags := make(map[string]string)
for k, v := range ai.vm.Tags {
}
}
- return tags, nil
+ return tags
}
func (ai *AzureInstance) Destroy(ctx context.Context) error {
return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
}
-func (ai *AzureInstance) VerifyPublicKey(ctx context.Context, receivedKey ssh.PublicKey, client *ssh.Client) error {
+func (ai *AzureInstance) VerifyHostKey(ctx context.Context, receivedKey ssh.PublicKey, client *ssh.Client) error {
remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
- tags, _ := ai.Tags(ctx)
+ tags := ai.Tags()
tg := tags["ssh-pubkey-fingerprint"]
if tg != "" {
check "gopkg.in/check.v1"
)
-type AzureProviderSuite struct{}
+type AzureInstanceSetSuite struct{}
-var _ = check.Suite(&AzureProviderSuite{})
+var _ = check.Suite(&AzureInstanceSetSuite{})
type VirtualMachinesClientStub struct{}
var live = flag.String("live-azure-cfg", "", "Test with real azure API, provide config file")
-func GetProvider() (InstanceProvider, ImageID, arvados.Cluster, error) {
+func GetInstanceSet() (InstanceSet, ImageID, arvados.Cluster, error) {
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": arvados.InstanceType{
},
})}
if *live != "" {
- cfg := AzureProviderConfig{}
+ cfg := make(map[string]interface{})
err := config.LoadFile(&cfg, *live)
if err != nil {
return nil, ImageID(""), cluster, err
}
- ap, err := NewAzureProvider(cfg, "test123")
- return ap, ImageID(cfg.Image), cluster, err
+ ap, err := NewAzureInstanceSet(cfg, "test123")
+ return ap, ImageID(cfg["image"].(string)), cluster, err
} else {
- ap := AzureProvider{
- azconfig: AzureProviderConfig{
+ ap := AzureInstanceSet{
+ azconfig: AzureInstanceSetConfig{
BlobContainer: "vhds",
},
dispatcherID: "test123",
}
}
-func (*AzureProviderSuite) TestCreate(c *check.C) {
- ap, img, cluster, err := GetProvider()
+func (*AzureInstanceSetSuite) TestCreate(c *check.C) {
+ ap, img, cluster, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
c.Assert(err, check.IsNil)
- tg, _ := inst.Tags(context.Background())
+ tg := inst.Tags()
log.Printf("Result %v %v %v", inst.String(), inst.Address(), tg)
}
-func (*AzureProviderSuite) TestListInstances(c *check.C) {
- ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestListInstances(c *check.C) {
+ ap, _, _, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background())
+ l, err := ap.Instances(context.Background(), nil)
c.Assert(err, check.IsNil)
for _, i := range l {
- tg, _ := i.Tags(context.Background())
+ tg := i.Tags()
log.Printf("%v %v %v", i.String(), i.Address(), tg)
}
}
-func (*AzureProviderSuite) TestManageNics(c *check.C) {
- ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestManageNics(c *check.C) {
+ ap, _, _, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
- ap.(*AzureProvider).ManageNics(context.Background())
+ ap.(*AzureInstanceSet).ManageNics(context.Background())
}
-func (*AzureProviderSuite) TestManageBlobs(c *check.C) {
- ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestManageBlobs(c *check.C) {
+ ap, _, _, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
- ap.(*AzureProvider).ManageBlobs(context.Background())
+ ap.(*AzureInstanceSet).ManageBlobs(context.Background())
}
-func (*AzureProviderSuite) TestDestroyInstances(c *check.C) {
- ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestDestroyInstances(c *check.C) {
+ ap, _, _, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background())
+ l, err := ap.Instances(context.Background(), nil)
c.Assert(err, check.IsNil)
for _, i := range l {
}
}
-func (*AzureProviderSuite) TestDeleteFake(c *check.C) {
- ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestDeleteFake(c *check.C) {
+ ap, _, _, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
- _, err = ap.(*AzureProvider).netClient.Delete(context.Background(), "fakefakefake", "fakefakefake")
+ _, err = ap.(*AzureInstanceSet).netClient.Delete(context.Background(), "fakefakefake", "fakefakefake")
de, ok := err.(autorest.DetailedError)
if ok {
}
}
-func (*AzureProviderSuite) TestWrapError(c *check.C) {
+func (*AzureInstanceSetSuite) TestWrapError(c *check.C) {
retryError := autorest.DetailedError{
Original: &azure.RequestError{
DetailedError: autorest.DetailedError{
c.Check(ok, check.Equals, true)
}
-func (*AzureProviderSuite) TestSetTags(c *check.C) {
- ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestSetTags(c *check.C) {
+ ap, _, _, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background())
+ l, err := ap.Instances(context.Background(), nil)
c.Assert(err, check.IsNil)
if len(l) > 0 {
c.Fatal("Error setting tags", err)
}
}
- l, err = ap.Instances(context.Background())
+ l, err = ap.Instances(context.Background(), nil)
c.Assert(err, check.IsNil)
if len(l) > 0 {
- tg, _ := l[0].Tags(context.Background())
+ tg := l[0].Tags()
log.Printf("tags are %v", tg)
}
}
-func (*AzureProviderSuite) TestSSH(c *check.C) {
- ap, _, _, err := GetProvider()
+func (*AzureInstanceSetSuite) TestSSH(c *check.C) {
+ ap, _, _, err := GetInstanceSet()
if err != nil {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background())
+ l, err := ap.Instances(context.Background(), nil)
c.Assert(err, check.IsNil)
if len(l) > 0 {
return nil, errors.New("BUG: key was never provided to HostKeyCallback")
}
- err = inst.VerifyPublicKey(context.Background(), receivedKey, client)
+ err = inst.VerifyHostKey(context.Background(), receivedKey, client)
c.Assert(err, check.IsNil)
return client, nil
package cloud
import (
+ "context"
"io"
"time"
// VerifyHostKey can use it to make outgoing network
// connections from the instance -- e.g., to use the cloud's
// "this instance's metadata" API.
- VerifyHostKey(ssh.PublicKey, *ssh.Client) error
+ VerifyHostKey(context.Context, ssh.PublicKey, *ssh.Client) error
}
// Instance is implemented by the provider-specific instance types.
Tags() InstanceTags
// Replace tags with the given tags
- SetTags(InstanceTags) error
+ SetTags(context.Context, InstanceTags) error
// Shut down the node
- Destroy() error
+ Destroy(context.Context) error
}
// An InstanceSet manages a set of VM instances created by an elastic
//
// The returned error should implement RateLimitError and
// QuotaError where applicable.
- Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+ Create(context.Context, arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
// Return all instances, including ones that are booting or
// shutting down. Optionally, filter out nodes that don't have
// Instance object each time. Thus, the caller is responsible
// for de-duplicating the returned instances by comparing the
// InstanceIDs returned by the instances' ID() methods.
- Instances(InstanceTags) ([]Instance, error)
+ Instances(context.Context, InstanceTags) ([]Instance, error)
// Stop any background tasks and release other resources.
Stop()
package dispatchcloud
import (
+ "context"
"encoding/json"
"io/ioutil"
"math/rand"
deadline := time.Now().Add(time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
- insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
+ insts, err := s.stubDriver.InstanceSets()[0].Instances(context.TODO(), nil)
c.Check(err, check.IsNil)
queue.Update()
ents, _ := queue.Entries()
)
var drivers = map[string]cloud.Driver{
- "azure": "",
+ "azure": cloud.DriverFunc(cloud.NewAzureInstanceSet),
}
func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID) (cloud.InstanceSet, error) {
package dispatchcloud
import (
+ "context"
+
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"golang.org/x/crypto/ssh"
cloud.InstanceSet
}
-func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
+func (is *instanceSetProxy) Create(ctx context.Context, it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
// TODO: return if Create failed recently with a RateLimitError or QuotaError
- return is.InstanceSet.Create(it, id, tags, pk)
+ return is.InstanceSet.Create(ctx, it, id, tags, pk)
}
-func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+func (is *instanceSetProxy) Instances(ctx context.Context, tags cloud.InstanceTags) ([]cloud.Instance, error) {
// TODO: return if Instances failed recently with a RateLimitError
- return is.InstanceSet.Instances(tags)
+ return is.InstanceSet.Instances(ctx, tags)
}
import (
"bytes"
+ "context"
"errors"
"io"
"net"
}
if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
- err = target.VerifyHostKey(receivedKey, client)
+ err = target.VerifyHostKey(context.TODO(), receivedKey, client)
if err != nil {
return nil, err
}
package test
import (
+ "context"
"fmt"
"math/rand"
"sync"
}
// Create returns a new instance.
-func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+func (p *LameInstanceSet) Create(_ context.Context, instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
inst := &lameInstance{
p: p,
id: cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
providerType: instType.ProviderType,
}
- inst.SetTags(tags)
+ inst.SetTags(context.TODO(), tags)
if p.Hold != nil {
p.Hold <- true
}
}
// Instances returns the instances that haven't been destroyed.
-func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+func (p *LameInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
var instances []cloud.Instance
return "0.0.0.0:1234"
}
-func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
+func (inst *lameInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
inst.p.mtx.Lock()
defer inst.p.mtx.Unlock()
inst.tags = cloud.InstanceTags{}
return nil
}
-func (inst *lameInstance) Destroy() error {
+func (inst *lameInstance) Destroy(context.Context) error {
if inst.p.Hold != nil {
inst.p.Hold <- true
}
return inst.tags
}
-func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+func (inst *lameInstance) VerifyHostKey(context.Context, ssh.PublicKey, *ssh.Client) error {
return nil
}
package test
import (
+ "context"
"crypto/rand"
"errors"
"fmt"
stopped bool
}
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(_ context.Context, it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
sis.mtx.Lock()
defer sis.mtx.Unlock()
if sis.stopped {
return svm.Instance(), nil
}
-func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+func (sis *StubInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
sis.mtx.RLock()
defer sis.mtx.RUnlock()
var r []cloud.Instance
return si.addr
}
-func (si stubInstance) Destroy() error {
+func (si stubInstance) Destroy(_ context.Context) error {
if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
return errors.New("instance could not be destroyed")
}
return si.svm.providerType
}
-func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
+func (si stubInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
tags = copyTags(tags)
svm := si.svm
go func() {
return string(si.svm.id)
}
-func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+func (si stubInstance) VerifyHostKey(_ context.Context, key ssh.PublicKey, client *ssh.Client) error {
buf := make([]byte, 512)
_, err := io.ReadFull(rand.Reader, buf)
if err != nil {
package worker
import (
+ "context"
"io"
"sort"
"strings"
wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+ inst, err := wp.instanceSet.Create(context.TODO(), it, wp.imageID, tags, nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// Remove our timestamp marker from wp.creating
wp.setupOnce.Do(wp.setup)
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+ instances, err := wp.instanceSet.Instances(context.TODO(), cloud.InstanceTags{})
if err != nil {
return err
}
import (
"bytes"
+ "context"
"strings"
"sync"
"time"
wkr.state = StateShutdown
go wkr.wp.notify()
go func() {
- err := wkr.instance.Destroy()
+ err := wkr.instance.Destroy(context.TODO())
if err != nil {
wkr.logger.WithError(err).Warn("shutdown failed")
return