16625: a-d-c: add support for Azure managed images. Update our packer
[arvados.git] / lib / cloud / azure / azure.go
index 8ae8a44811529f598cb8ecee0044919b76bf467e..b448bddd701180b00b78fa308f80791d35b6c084 100644 (file)
@@ -16,9 +16,9 @@ import (
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
        "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
        storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
        "github.com/Azure/azure-sdk-for-go/storage"
@@ -41,8 +41,10 @@ type azureInstanceSetConfig struct {
        TenantID                     string
        CloudEnvironment             string
        ResourceGroup                string
+       ImageResourceGroup           string
        Location                     string
        Network                      string
+       NetworkResourceGroup         string
        Subnet                       string
        StorageAccount               string
        BlobContainer                string
@@ -50,7 +52,10 @@ type azureInstanceSetConfig struct {
        AdminUsername                string
 }
 
-const tagKeyInstanceSecret = "InstanceSecret"
+type containerWrapper interface {
+       GetBlobReference(name string) *storage.Blob
+       ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
+}
 
 type virtualMachinesClientWrapper interface {
        createOrUpdate(ctx context.Context,
@@ -134,6 +139,25 @@ func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupN
        return r, wrapAzureError(err)
 }
 
+type disksClientWrapper interface {
+       listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
+       delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
+}
+
+type disksClientImpl struct {
+       inner compute.DisksClient
+}
+
+func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
+       result, err = cl.inner.ListByResourceGroup(ctx, resourceGroupName)
+       return result, wrapAzureError(err)
+}
+
+func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
+       result, err = cl.inner.Delete(ctx, resourceGroupName, diskName)
+       return result, wrapAzureError(err)
+}
+
 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
 
 type azureRateLimitError struct {
@@ -192,41 +216,47 @@ func wrapAzureError(err error) error {
 }
 
 type azureInstanceSet struct {
-       azconfig          azureInstanceSetConfig
-       vmClient          virtualMachinesClientWrapper
-       netClient         interfacesClientWrapper
-       storageAcctClient storageacct.AccountsClient
-       azureEnv          azure.Environment
-       interfaces        map[string]network.Interface
-       dispatcherID      string
-       namePrefix        string
-       ctx               context.Context
-       stopFunc          context.CancelFunc
-       stopWg            sync.WaitGroup
-       deleteNIC         chan string
-       deleteBlob        chan storage.Blob
-       logger            logrus.FieldLogger
-}
-
-func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+       azconfig           azureInstanceSetConfig
+       vmClient           virtualMachinesClientWrapper
+       netClient          interfacesClientWrapper
+       disksClient        disksClientWrapper
+       imageResourceGroup string
+       blobcont           containerWrapper
+       azureEnv           azure.Environment
+       interfaces         map[string]network.Interface
+       dispatcherID       string
+       namePrefix         string
+       ctx                context.Context
+       stopFunc           context.CancelFunc
+       stopWg             sync.WaitGroup
+       deleteNIC          chan string
+       deleteBlob         chan storage.Blob
+       deleteDisk         chan compute.Disk
+       logger             logrus.FieldLogger
+}
+
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
        azcfg := azureInstanceSetConfig{}
        err = json.Unmarshal(config, &azcfg)
        if err != nil {
                return nil, err
        }
 
-       ap := azureInstanceSet{logger: logger}
-       err = ap.setup(azcfg, string(dispatcherID))
+       az := azureInstanceSet{logger: logger}
+       az.ctx, az.stopFunc = context.WithCancel(context.Background())
+       err = az.setup(azcfg, string(dispatcherID))
        if err != nil {
+               az.stopFunc()
                return nil, err
        }
-       return &ap, nil
+       return &az, nil
 }
 
 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
        az.azconfig = azcfg
        vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
        netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
+       disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
        storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
 
        az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
@@ -247,16 +277,40 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
 
        vmClient.Authorizer = authorizer
        netClient.Authorizer = authorizer
+       disksClient.Authorizer = authorizer
        storageAcctClient.Authorizer = authorizer
 
        az.vmClient = &virtualMachinesClientImpl{vmClient}
        az.netClient = &interfacesClientImpl{netClient}
-       az.storageAcctClient = storageAcctClient
+       az.disksClient = &disksClientImpl{disksClient}
+
+       az.imageResourceGroup = az.azconfig.ImageResourceGroup
+       if az.imageResourceGroup == "" {
+               az.imageResourceGroup = az.azconfig.ResourceGroup
+       }
+
+       var client storage.Client
+       if az.azconfig.StorageAccount != "" {
+               result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+               if err != nil {
+                       az.logger.WithError(err).Warn("Couldn't get account keys")
+                       return err
+               }
+
+               key1 := *(*result.Keys)[0].Value
+               client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
+               if err != nil {
+                       az.logger.WithError(err).Warn("Couldn't make client")
+                       return err
+               }
+
+               blobsvc := client.GetBlobService()
+               az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+       }
 
        az.dispatcherID = dispatcherID
        az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
 
-       az.ctx, az.stopFunc = context.WithCancel(context.Background())
        go func() {
                az.stopWg.Add(1)
                defer az.stopWg.Done()
@@ -268,13 +322,17 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
                                tk.Stop()
                                return
                        case <-tk.C:
-                               az.manageBlobs()
+                               if az.blobcont != nil {
+                                       az.manageBlobs()
+                               }
+                               az.manageDisks()
                        }
                }
        }()
 
        az.deleteNIC = make(chan string)
        az.deleteBlob = make(chan storage.Blob)
+       az.deleteDisk = make(chan compute.Disk)
 
        for i := 0; i < 4; i++ {
                go func() {
@@ -305,6 +363,20 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
                                }
                        }
                }()
+               go func() {
+                       for {
+                               disk, ok := <-az.deleteDisk
+                               if !ok {
+                                       return
+                               }
+                               _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
+                               if err != nil {
+                                       az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
+                               } else {
+                                       az.logger.Printf("Deleted disk %v", *disk.Name)
+                               }
+                       }
+               }()
        }
 
        return nil
@@ -320,6 +392,10 @@ func (az *azureInstanceSet) Create(
        az.stopWg.Add(1)
        defer az.stopWg.Done()
 
+       if instanceType.AddedScratch > 0 {
+               return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
+       }
+
        name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
        if err != nil {
                return nil, err
@@ -327,13 +403,15 @@ func (az *azureInstanceSet) Create(
 
        name = az.namePrefix + name
 
-       timestamp := time.Now().Format(time.RFC3339Nano)
-
-       tags := make(map[string]*string)
-       tags["created-at"] = &timestamp
+       tags := map[string]*string{}
        for k, v := range newTags {
-               newstr := v
-               tags["dispatch-"+k] = &newstr
+               tags[k] = to.StringPtr(v)
+       }
+       tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
+
+       networkResourceGroup := az.azconfig.NetworkResourceGroup
+       if networkResourceGroup == "" {
+               networkResourceGroup = az.azconfig.ResourceGroup
        }
 
        nicParameters := network.Interface{
@@ -348,7 +426,7 @@ func (az *azureInstanceSet) Create(
                                                        ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
                                                                "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
                                                                az.azconfig.SubscriptionID,
-                                                               az.azconfig.ResourceGroup,
+                                                               networkResourceGroup,
                                                                az.azconfig.Network,
                                                                az.azconfig.Subnet)),
                                                },
@@ -363,13 +441,45 @@ func (az *azureInstanceSet) Create(
                return nil, wrapAzureError(err)
        }
 
-       instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
-               az.azconfig.StorageAccount,
-               az.azureEnv.StorageEndpointSuffix,
-               az.azconfig.BlobContainer,
-               name)
-
+       blobname := fmt.Sprintf("%s-os.vhd", name)
        customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
+       var storageProfile *compute.StorageProfile
+
+       re := regexp.MustCompile(`^http(s?)://`)
+       if re.MatchString(string(imageID)) {
+               instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
+                       az.azconfig.StorageAccount,
+                       az.azureEnv.StorageEndpointSuffix,
+                       az.azconfig.BlobContainer,
+                       blobname)
+               az.logger.Info("using deprecated VHD image")
+               storageProfile = &compute.StorageProfile{
+                       OsDisk: &compute.OSDisk{
+                               OsType:       compute.Linux,
+                               Name:         to.StringPtr(name + "-os"),
+                               CreateOption: compute.DiskCreateOptionTypesFromImage,
+                               Image: &compute.VirtualHardDisk{
+                                       URI: to.StringPtr(string(imageID)),
+                               },
+                               Vhd: &compute.VirtualHardDisk{
+                                       URI: &instanceVhd,
+                               },
+                       },
+               }
+       } else {
+               az.logger.Info("using managed image")
+
+               storageProfile = &compute.StorageProfile{
+                       ImageReference: &compute.ImageReference{
+                               ID: to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID)),
+                       },
+                       OsDisk: &compute.OSDisk{
+                               OsType:       compute.Linux,
+                               Name:         to.StringPtr(name + "-os"),
+                               CreateOption: compute.DiskCreateOptionTypesFromImage,
+                       },
+               }
+       }
 
        vmParameters := compute.VirtualMachine{
                Location: &az.azconfig.Location,
@@ -378,19 +488,7 @@ func (az *azureInstanceSet) Create(
                        HardwareProfile: &compute.HardwareProfile{
                                VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
                        },
-                       StorageProfile: &compute.StorageProfile{
-                               OsDisk: &compute.OSDisk{
-                                       OsType:       compute.Linux,
-                                       Name:         to.StringPtr(name + "-os"),
-                                       CreateOption: compute.FromImage,
-                                       Image: &compute.VirtualHardDisk{
-                                               URI: to.StringPtr(string(imageID)),
-                                       },
-                                       Vhd: &compute.VirtualHardDisk{
-                                               URI: &instanceVhd,
-                                       },
-                               },
-                       },
+                       StorageProfile: storageProfile,
                        NetworkProfile: &compute.NetworkProfile{
                                NetworkInterfaces: &[]compute.NetworkInterfaceReference{
                                        compute.NetworkInterfaceReference{
@@ -422,6 +520,18 @@ func (az *azureInstanceSet) Create(
 
        vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
        if err != nil {
+               if az.blobcont != nil {
+                       _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
+                       if delerr != nil {
+                               az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+                       }
+               }
+
+               _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
+               if delerr != nil {
+                       az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
+               }
+
                return nil, wrapAzureError(err)
        }
 
@@ -446,26 +556,24 @@ func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err
                return nil, wrapAzureError(err)
        }
 
-       instances := make([]cloud.Instance, 0)
-
+       var instances []cloud.Instance
        for ; result.NotDone(); err = result.Next() {
                if err != nil {
                        return nil, wrapAzureError(err)
                }
-               if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
-                       instances = append(instances, &azureInstance{
-                               provider: az,
-                               vm:       result.Value(),
-                               nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
-               }
+               instances = append(instances, &azureInstance{
+                       provider: az,
+                       vm:       result.Value(),
+                       nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
+               })
        }
        return instances, nil
 }
 
 // ManageNics returns a list of Azure network interface resources.
-// Also performs garbage collection of NICs which have "namePrefix", are
-// not associated with a virtual machine and have a "create-at" time
-// more than DeleteDanglingResourcesAfter (to prevent racing and
+// It also garbage collects NICs: those which have "namePrefix",
+// are not associated with a virtual machine and have a "created-at"
+// time more than DeleteDanglingResourcesAfter (to prevent racing and
 // deleting newly created NICs) in the past are deleted.
 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
        az.stopWg.Add(1)
@@ -509,27 +617,12 @@ func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
 // leased to a VM) and haven't been modified for
 // DeleteDanglingResourcesAfter seconds.
 func (az *azureInstanceSet) manageBlobs() {
-       result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't get account keys")
-               return
-       }
-
-       key1 := *(*result.Keys)[0].Value
-       client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't make client")
-               return
-       }
-
-       blobsvc := client.GetBlobService()
-       blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
 
        page := storage.ListBlobsParameters{Prefix: az.namePrefix}
        timestamp := time.Now()
 
        for {
-               response, err := blobcont.ListBlobs(page)
+               response, err := az.blobcont.ListBlobs(page)
                if err != nil {
                        az.logger.WithError(err).Warn("Error listing blobs")
                        return
@@ -553,6 +646,41 @@ func (az *azureInstanceSet) manageBlobs() {
        }
 }
 
+// manageDisks garbage collects managed compute disks (VM disk images)
+// in az.imageResourceGroup.  A disk is queued for deletion when its
+// name has namePrefix and ends in "-os", its state is "Unattached",
+// and it was created more than DeleteDanglingResourcesAfter ago.
+// (Azure provides only a creation timestamp for managed disks.)
+func (az *azureInstanceSet) manageDisks() {
+       re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
+       threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
+
+       // List once and advance that same page object; listing again on
+       // each pass would restart at the first page and never terminate.
+       response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
+       if err != nil {
+               az.logger.WithError(err).Warn("Error listing disks")
+               return
+       }
+       for ; response.NotDone(); err = response.Next() {
+               if err != nil {
+                       az.logger.WithError(err).Warn("Error getting next page of disks")
+                       return
+               }
+               for _, d := range response.Values() {
+                       if d.Name == nil || d.DiskProperties == nil || d.DiskProperties.TimeCreated == nil {
+                               continue
+                       }
+                       if d.DiskProperties.DiskState == "Unattached" &&
+                               re.MatchString(*d.Name) &&
+                               d.DiskProperties.TimeCreated.ToTime().Before(threshold) {
+                               az.logger.Printf("Disk %v is unattached and was created at %v, will delete", *d.Name, d.DiskProperties.TimeCreated.ToTime())
+                               az.deleteDisk <- d
+                       }
+               }
+       }
+}
+
 func (az *azureInstanceSet) Stop() {
        az.stopFunc()
        az.stopWg.Wait()
@@ -582,16 +710,12 @@ func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
        ai.provider.stopWg.Add(1)
        defer ai.provider.stopWg.Done()
 
-       tags := make(map[string]*string)
-
+       tags := map[string]*string{}
        for k, v := range ai.vm.Tags {
-               if !strings.HasPrefix(k, "dispatch-") {
-                       tags[k] = v
-               }
+               tags[k] = v
        }
        for k, v := range newTags {
-               newstr := v
-               tags["dispatch-"+k] = &newstr
+               tags[k] = to.StringPtr(v)
        }
 
        vmParameters := compute.VirtualMachine{
@@ -608,14 +732,10 @@ func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
 }
 
 func (ai *azureInstance) Tags() cloud.InstanceTags {
-       tags := make(map[string]string)
-
+       tags := cloud.InstanceTags{}
        for k, v := range ai.vm.Tags {
-               if strings.HasPrefix(k, "dispatch-") {
-                       tags[k[9:]] = *v
-               }
+               tags[k] = *v
        }
-
        return tags
 }
 
@@ -628,7 +748,17 @@ func (ai *azureInstance) Destroy() error {
 }
 
 func (ai *azureInstance) Address() string {
-       return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
+       if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
+               return ""
+       } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
+               return ""
+       } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
+               return ""
+       } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
+               return ""
+       } else {
+               return *addr
+       }
 }
 
 func (ai *azureInstance) RemoteUser() string {