Merge branch 'master' into 13937-keepstore-prometheus
[arvados.git] / lib / cloud / azure / azure.go
index 2a63d28f6575145508c5bba21392be4b852346d5..d745e7e54d27473147e3f214a213a4514a596131 100644 (file)
@@ -7,6 +7,7 @@ package azure
 import (
        "context"
        "encoding/base64"
+       "encoding/json"
        "fmt"
        "net/http"
        "regexp"
@@ -26,128 +27,130 @@ import (
        "github.com/Azure/go-autorest/autorest/azure/auth"
        "github.com/Azure/go-autorest/autorest/to"
        "github.com/jmcvetta/randutil"
-       "github.com/mitchellh/mapstructure"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
 )
 
-type AzureInstanceSetConfig struct {
-       SubscriptionID               string  "SubscriptionID"
-       ClientID                     string  "ClientID"
-       ClientSecret                 string  "ClientSecret"
-       TenantID                     string  "TenantID"
-       CloudEnvironment             string  "CloudEnvironment"
-       ResourceGroup                string  "ResourceGroup"
-       Location                     string  "Location"
-       Network                      string  "Network"
-       Subnet                       string  "Subnet"
-       StorageAccount               string  "StorageAccount"
-       BlobContainer                string  "BlobContainer"
-       DeleteDanglingResourcesAfter float64 "DeleteDanglingResourcesAfter"
-}
-
-type VirtualMachinesClientWrapper interface {
-       CreateOrUpdate(ctx context.Context,
+// Driver is the azure implementation of the cloud.Driver interface.
+var Driver = cloud.DriverFunc(newAzureInstanceSet)
+
+type azureInstanceSetConfig struct {
+       SubscriptionID               string
+       ClientID                     string
+       ClientSecret                 string
+       TenantID                     string
+       CloudEnvironment             string
+       ResourceGroup                string
+       Location                     string
+       Network                      string
+       Subnet                       string
+       StorageAccount               string
+       BlobContainer                string
+       DeleteDanglingResourcesAfter arvados.Duration
+}
+
+type virtualMachinesClientWrapper interface {
+       createOrUpdate(ctx context.Context,
                resourceGroupName string,
                VMName string,
                parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
-       Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
-       ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
+       delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
+       listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
 }
 
-type VirtualMachinesClientImpl struct {
+type virtualMachinesClientImpl struct {
        inner compute.VirtualMachinesClient
 }
 
-func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
+func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
        resourceGroupName string,
        VMName string,
        parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
 
        future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
        if err != nil {
-               return compute.VirtualMachine{}, WrapAzureError(err)
+               return compute.VirtualMachine{}, wrapAzureError(err)
        }
        future.WaitForCompletionRef(ctx, cl.inner.Client)
        r, err := future.Result(cl.inner)
-       return r, WrapAzureError(err)
+       return r, wrapAzureError(err)
 }
 
-func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
        future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
        if err != nil {
-               return nil, WrapAzureError(err)
+               return nil, wrapAzureError(err)
        }
        err = future.WaitForCompletionRef(ctx, cl.inner.Client)
-       return future.Response(), WrapAzureError(err)
+       return future.Response(), wrapAzureError(err)
 }
 
-func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
+func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
        r, err := cl.inner.ListComplete(ctx, resourceGroupName)
-       return r, WrapAzureError(err)
+       return r, wrapAzureError(err)
 }
 
-type InterfacesClientWrapper interface {
-       CreateOrUpdate(ctx context.Context,
+type interfacesClientWrapper interface {
+       createOrUpdate(ctx context.Context,
                resourceGroupName string,
                networkInterfaceName string,
                parameters network.Interface) (result network.Interface, err error)
-       Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
-       ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
+       delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
+       listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
 }
 
-type InterfacesClientImpl struct {
+type interfacesClientImpl struct {
        inner network.InterfacesClient
 }
 
-func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
        future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
        if err != nil {
-               return nil, WrapAzureError(err)
+               return nil, wrapAzureError(err)
        }
        err = future.WaitForCompletionRef(ctx, cl.inner.Client)
-       return future.Response(), WrapAzureError(err)
+       return future.Response(), wrapAzureError(err)
 }
 
-func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
+func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
        resourceGroupName string,
        networkInterfaceName string,
        parameters network.Interface) (result network.Interface, err error) {
 
        future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
        if err != nil {
-               return network.Interface{}, WrapAzureError(err)
+               return network.Interface{}, wrapAzureError(err)
        }
        future.WaitForCompletionRef(ctx, cl.inner.Client)
        r, err := future.Result(cl.inner)
-       return r, WrapAzureError(err)
+       return r, wrapAzureError(err)
 }
 
-func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
+func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
        r, err := cl.inner.ListComplete(ctx, resourceGroupName)
-       return r, WrapAzureError(err)
+       return r, wrapAzureError(err)
 }
 
 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
 
-type AzureRateLimitError struct {
+type azureRateLimitError struct {
        azure.RequestError
-       earliestRetry time.Time
+       firstRetry time.Time
 }
 
-func (ar *AzureRateLimitError) EarliestRetry() time.Time {
-       return ar.earliestRetry
+func (ar *azureRateLimitError) EarliestRetry() time.Time {
+       return ar.firstRetry
 }
 
-type AzureQuotaError struct {
+type azureQuotaError struct {
        azure.RequestError
 }
 
-func (ar *AzureQuotaError) IsQuotaError() bool {
+func (ar *azureQuotaError) IsQuotaError() bool {
        return true
 }
 
-func WrapAzureError(err error) error {
+func wrapAzureError(err error) error {
        de, ok := err.(autorest.DetailedError)
        if !ok {
                return err
@@ -174,21 +177,21 @@ func WrapAzureError(err error) error {
                                earliestRetry = time.Now().Add(20 * time.Second)
                        }
                }
-               return &AzureRateLimitError{*rq, earliestRetry}
+               return &azureRateLimitError{*rq, earliestRetry}
        }
        if rq.ServiceError == nil {
                return err
        }
        if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
-               return &AzureQuotaError{*rq}
+               return &azureQuotaError{*rq}
        }
        return err
 }
 
-type AzureInstanceSet struct {
-       azconfig          AzureInstanceSetConfig
-       vmClient          VirtualMachinesClientWrapper
-       netClient         InterfacesClientWrapper
+type azureInstanceSet struct {
+       azconfig          azureInstanceSetConfig
+       vmClient          virtualMachinesClientWrapper
+       netClient         interfacesClientWrapper
        storageAcctClient storageacct.AccountsClient
        azureEnv          azure.Environment
        interfaces        map[string]network.Interface
@@ -202,12 +205,14 @@ type AzureInstanceSet struct {
        logger            logrus.FieldLogger
 }
 
-func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
-       azcfg := AzureInstanceSetConfig{}
-       if err = mapstructure.Decode(config, &azcfg); err != nil {
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+       azcfg := azureInstanceSetConfig{}
+       err = json.Unmarshal(config, &azcfg)
+       if err != nil {
                return nil, err
        }
-       ap := AzureInstanceSet{logger: logger}
+
+       ap := azureInstanceSet{logger: logger}
        err = ap.setup(azcfg, string(dispatcherID))
        if err != nil {
                return nil, err
@@ -215,7 +220,7 @@ func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.Insta
        return &ap, nil
 }
 
-func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
+func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
        az.azconfig = azcfg
        vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
        netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
@@ -241,8 +246,8 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
        netClient.Authorizer = authorizer
        storageAcctClient.Authorizer = authorizer
 
-       az.vmClient = &VirtualMachinesClientImpl{vmClient}
-       az.netClient = &InterfacesClientImpl{netClient}
+       az.vmClient = &virtualMachinesClientImpl{vmClient}
+       az.netClient = &interfacesClientImpl{netClient}
        az.storageAcctClient = storageAcctClient
 
        az.dispatcherID = dispatcherID
@@ -260,7 +265,7 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
                                tk.Stop()
                                return
                        case <-tk.C:
-                               az.ManageBlobs()
+                               az.manageBlobs()
                        }
                }
        }()
@@ -275,7 +280,7 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
                                if !ok {
                                        return
                                }
-                               _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
+                               _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
                                if delerr != nil {
                                        az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
                                } else {
@@ -302,7 +307,7 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
        return nil
 }
 
-func (az *AzureInstanceSet) Create(
+func (az *azureInstanceSet) Create(
        instanceType arvados.InstanceType,
        imageID cloud.ImageID,
        newTags cloud.InstanceTags,
@@ -355,9 +360,9 @@ func (az *AzureInstanceSet) Create(
                        },
                },
        }
-       nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
+       nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
        if err != nil {
-               return nil, WrapAzureError(err)
+               return nil, wrapAzureError(err)
        }
 
        instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
@@ -418,40 +423,40 @@ echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
                },
        }
 
-       vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
+       vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
        if err != nil {
-               return nil, WrapAzureError(err)
+               return nil, wrapAzureError(err)
        }
 
-       return &AzureInstance{
+       return &azureInstance{
                provider: az,
                nic:      nic,
                vm:       vm,
        }, nil
 }
 
-func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
        az.stopWg.Add(1)
        defer az.stopWg.Done()
 
-       interfaces, err := az.ManageNics()
+       interfaces, err := az.manageNics()
        if err != nil {
                return nil, err
        }
 
-       result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+       result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
        if err != nil {
-               return nil, WrapAzureError(err)
+               return nil, wrapAzureError(err)
        }
 
        instances := make([]cloud.Instance, 0)
 
        for ; result.NotDone(); err = result.Next() {
                if err != nil {
-                       return nil, WrapAzureError(err)
+                       return nil, wrapAzureError(err)
                }
                if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
-                       instances = append(instances, &AzureInstance{
+                       instances = append(instances, &azureInstance{
                                provider: az,
                                vm:       result.Value(),
                                nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
@@ -465,13 +470,13 @@ func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err
 // not associated with a virtual machine and have a "create-at" time
 // more than DeleteDanglingResourcesAfter (to prevent racing and
 // deleting newly created NICs) in the past are deleted.
-func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
+func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
        az.stopWg.Add(1)
        defer az.stopWg.Done()
 
-       result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+       result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
        if err != nil {
-               return nil, WrapAzureError(err)
+               return nil, wrapAzureError(err)
        }
 
        interfaces := make(map[string]network.Interface)
@@ -489,7 +494,7 @@ func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
                                if result.Value().Tags["created-at"] != nil {
                                        createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
                                        if err == nil {
-                                               if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
+                                               if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
                                                        az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
                                                        az.deleteNIC <- *result.Value().Name
                                                }
@@ -506,7 +511,7 @@ func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
 // have "namePrefix", are "available" (which means they are not
 // leased to a VM) and haven't been modified for
 // DeleteDanglingResourcesAfter seconds.
-func (az *AzureInstanceSet) ManageBlobs() {
+func (az *azureInstanceSet) manageBlobs() {
        result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
        if err != nil {
                az.logger.WithError(err).Warn("Couldn't get account keys")
@@ -537,7 +542,7 @@ func (az *AzureInstanceSet) ManageBlobs() {
                        if b.Properties.BlobType == storage.BlobTypePage &&
                                b.Properties.LeaseState == "available" &&
                                b.Properties.LeaseStatus == "unlocked" &&
-                               age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
+                               age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
 
                                az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
                                az.deleteBlob <- b
@@ -551,32 +556,32 @@ func (az *AzureInstanceSet) ManageBlobs() {
        }
 }
 
-func (az *AzureInstanceSet) Stop() {
+func (az *azureInstanceSet) Stop() {
        az.stopFunc()
        az.stopWg.Wait()
        close(az.deleteNIC)
        close(az.deleteBlob)
 }
 
-type AzureInstance struct {
-       provider *AzureInstanceSet
+type azureInstance struct {
+       provider *azureInstanceSet
        nic      network.Interface
        vm       compute.VirtualMachine
 }
 
-func (ai *AzureInstance) ID() cloud.InstanceID {
+func (ai *azureInstance) ID() cloud.InstanceID {
        return cloud.InstanceID(*ai.vm.ID)
 }
 
-func (ai *AzureInstance) String() string {
+func (ai *azureInstance) String() string {
        return *ai.vm.Name
 }
 
-func (ai *AzureInstance) ProviderType() string {
+func (ai *azureInstance) ProviderType() string {
        return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
 }
 
-func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
+func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
        ai.provider.stopWg.Add(1)
        defer ai.provider.stopWg.Done()
 
@@ -596,16 +601,16 @@ func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
                Location: &ai.provider.azconfig.Location,
                Tags:     tags,
        }
-       vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
+       vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
        if err != nil {
-               return WrapAzureError(err)
+               return wrapAzureError(err)
        }
        ai.vm = vm
 
        return nil
 }
 
-func (ai *AzureInstance) Tags() cloud.InstanceTags {
+func (ai *azureInstance) Tags() cloud.InstanceTags {
        tags := make(map[string]string)
 
        for k, v := range ai.vm.Tags {
@@ -617,19 +622,19 @@ func (ai *AzureInstance) Tags() cloud.InstanceTags {
        return tags
 }
 
-func (ai *AzureInstance) Destroy() error {
+func (ai *azureInstance) Destroy() error {
        ai.provider.stopWg.Add(1)
        defer ai.provider.stopWg.Done()
 
-       _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
-       return WrapAzureError(err)
+       _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
+       return wrapAzureError(err)
 }
 
-func (ai *AzureInstance) Address() string {
+func (ai *azureInstance) Address() string {
        return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
 }
 
-func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
+func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
        ai.provider.stopWg.Add(1)
        defer ai.provider.stopWg.Done()
 
@@ -641,9 +646,8 @@ func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Cl
        if tg != "" {
                if remoteFingerprint == tg {
                        return nil
-               } else {
-                       return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
                }
+               return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
        }
 
        nodetokenTag := tags["node-token"]