16625: a-d-c: add support for Azure managed images. Update our packer
[arvados.git] / lib / cloud / azure / azure.go
index ac7ff14cc2539ff7c1305fc7df393c7e36d0a795..b448bddd701180b00b78fa308f80791d35b6c084 100644 (file)
@@ -16,9 +16,9 @@ import (
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
        "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
        storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
        "github.com/Azure/azure-sdk-for-go/storage"
@@ -41,8 +41,10 @@ type azureInstanceSetConfig struct {
        TenantID                     string
        CloudEnvironment             string
        ResourceGroup                string
+       ImageResourceGroup           string
        Location                     string
        Network                      string
+       NetworkResourceGroup         string
        Subnet                       string
        StorageAccount               string
        BlobContainer                string
@@ -50,8 +52,6 @@ type azureInstanceSetConfig struct {
        AdminUsername                string
 }
 
-const tagKeyInstanceSecret = "InstanceSecret"
-
 type containerWrapper interface {
        GetBlobReference(name string) *storage.Blob
        ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
@@ -139,6 +139,25 @@ func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupN
        return r, wrapAzureError(err)
 }
 
+// disksClientWrapper is the subset of the Azure managed disks API
+// used by this package: listing the disks in a resource group and
+// deleting a disk by name.
+type disksClientWrapper interface {
+       listByResourceGroup(ctx context.Context, resourceGroupName string) (compute.DiskListPage, error)
+       delete(ctx context.Context, resourceGroupName, diskName string) (compute.DisksDeleteFuture, error)
+}
+
+// disksClientImpl implements disksClientWrapper on top of the Azure
+// SDK's compute.DisksClient.
+type disksClientImpl struct {
+       inner compute.DisksClient
+}
+
+func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
+       r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
+       return r, wrapAzureError(err)
+}
+
+func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
+       r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
+       return r, wrapAzureError(err)
+}
+
 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
 
 type azureRateLimitError struct {
@@ -197,23 +216,26 @@ func wrapAzureError(err error) error {
 }
 
 type azureInstanceSet struct {
-       azconfig     azureInstanceSetConfig
-       vmClient     virtualMachinesClientWrapper
-       netClient    interfacesClientWrapper
-       blobcont     containerWrapper
-       azureEnv     azure.Environment
-       interfaces   map[string]network.Interface
-       dispatcherID string
-       namePrefix   string
-       ctx          context.Context
-       stopFunc     context.CancelFunc
-       stopWg       sync.WaitGroup
-       deleteNIC    chan string
-       deleteBlob   chan storage.Blob
-       logger       logrus.FieldLogger
-}
-
-func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+       azconfig           azureInstanceSetConfig
+       vmClient           virtualMachinesClientWrapper
+       netClient          interfacesClientWrapper
+       disksClient        disksClientWrapper
+       imageResourceGroup string
+       blobcont           containerWrapper
+       azureEnv           azure.Environment
+       interfaces         map[string]network.Interface
+       dispatcherID       string
+       namePrefix         string
+       ctx                context.Context
+       stopFunc           context.CancelFunc
+       stopWg             sync.WaitGroup
+       deleteNIC          chan string
+       deleteBlob         chan storage.Blob
+       deleteDisk         chan compute.Disk
+       logger             logrus.FieldLogger
+}
+
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
        azcfg := azureInstanceSetConfig{}
        err = json.Unmarshal(config, &azcfg)
        if err != nil {
@@ -234,6 +256,7 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
        az.azconfig = azcfg
        vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
        netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
+       disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
        storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
 
        az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
@@ -254,26 +277,36 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
 
        vmClient.Authorizer = authorizer
        netClient.Authorizer = authorizer
+       disksClient.Authorizer = authorizer
        storageAcctClient.Authorizer = authorizer
 
        az.vmClient = &virtualMachinesClientImpl{vmClient}
        az.netClient = &interfacesClientImpl{netClient}
+       az.disksClient = &disksClientImpl{disksClient}
 
-       result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't get account keys")
-               return err
+       az.imageResourceGroup = az.azconfig.ImageResourceGroup
+       if az.imageResourceGroup == "" {
+               az.imageResourceGroup = az.azconfig.ResourceGroup
        }
 
-       key1 := *(*result.Keys)[0].Value
-       client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't make client")
-               return err
-       }
+       var client storage.Client
+       if az.azconfig.StorageAccount != "" {
+               result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+               if err != nil {
+                       az.logger.WithError(err).Warn("Couldn't get account keys")
+                       return err
+               }
+
+               key1 := *(*result.Keys)[0].Value
+               client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
+               if err != nil {
+                       az.logger.WithError(err).Warn("Couldn't make client")
+                       return err
+               }
 
-       blobsvc := client.GetBlobService()
-       az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+               blobsvc := client.GetBlobService()
+               az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+       }
 
        az.dispatcherID = dispatcherID
        az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
@@ -289,13 +322,17 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
                                tk.Stop()
                                return
                        case <-tk.C:
-                               az.manageBlobs()
+                               if az.blobcont != nil {
+                                       az.manageBlobs()
+                               }
+                               az.manageDisks()
                        }
                }
        }()
 
        az.deleteNIC = make(chan string)
        az.deleteBlob = make(chan storage.Blob)
+       az.deleteDisk = make(chan compute.Disk)
 
        for i := 0; i < 4; i++ {
                go func() {
@@ -326,6 +363,20 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
                                }
                        }
                }()
+               go func() {
+                       for {
+                               disk, ok := <-az.deleteDisk
+                               if !ok {
+                                       return
+                               }
+                               _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
+                               if err != nil {
+                                       az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
+                               } else {
+                                       az.logger.Printf("Deleted disk %v", *disk.Name)
+                               }
+                       }
+               }()
        }
 
        return nil
@@ -352,13 +403,15 @@ func (az *azureInstanceSet) Create(
 
        name = az.namePrefix + name
 
-       timestamp := time.Now().Format(time.RFC3339Nano)
-
-       tags := make(map[string]*string)
-       tags["created-at"] = &timestamp
+       tags := map[string]*string{}
        for k, v := range newTags {
-               newstr := v
-               tags["dispatch-"+k] = &newstr
+               tags[k] = to.StringPtr(v)
+       }
+       tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
+
+       networkResourceGroup := az.azconfig.NetworkResourceGroup
+       if networkResourceGroup == "" {
+               networkResourceGroup = az.azconfig.ResourceGroup
        }
 
        nicParameters := network.Interface{
@@ -373,7 +426,7 @@ func (az *azureInstanceSet) Create(
                                                        ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
                                                                "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
                                                                az.azconfig.SubscriptionID,
-                                                               az.azconfig.ResourceGroup,
+                                                               networkResourceGroup,
                                                                az.azconfig.Network,
                                                                az.azconfig.Subnet)),
                                                },
@@ -389,13 +442,44 @@ func (az *azureInstanceSet) Create(
        }
 
        blobname := fmt.Sprintf("%s-os.vhd", name)
-       instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
-               az.azconfig.StorageAccount,
-               az.azureEnv.StorageEndpointSuffix,
-               az.azconfig.BlobContainer,
-               blobname)
-
        customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
+       var storageProfile *compute.StorageProfile
+
+       re := regexp.MustCompile(`^http(s?)://`)
+       if re.MatchString(string(imageID)) {
+               instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
+                       az.azconfig.StorageAccount,
+                       az.azureEnv.StorageEndpointSuffix,
+                       az.azconfig.BlobContainer,
+                       blobname)
+               az.logger.Info("using deprecated VHD image")
+               storageProfile = &compute.StorageProfile{
+                       OsDisk: &compute.OSDisk{
+                               OsType:       compute.Linux,
+                               Name:         to.StringPtr(name + "-os"),
+                               CreateOption: compute.DiskCreateOptionTypesFromImage,
+                               Image: &compute.VirtualHardDisk{
+                                       URI: to.StringPtr(string(imageID)),
+                               },
+                               Vhd: &compute.VirtualHardDisk{
+                                       URI: &instanceVhd,
+                               },
+                       },
+               }
+       } else {
+               az.logger.Info("using managed image")
+
+               storageProfile = &compute.StorageProfile{
+                       ImageReference: &compute.ImageReference{
+                               ID: to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID)),
+                       },
+                       OsDisk: &compute.OSDisk{
+                               OsType:       compute.Linux,
+                               Name:         to.StringPtr(name + "-os"),
+                               CreateOption: compute.DiskCreateOptionTypesFromImage,
+                       },
+               }
+       }
 
        vmParameters := compute.VirtualMachine{
                Location: &az.azconfig.Location,
@@ -404,19 +488,7 @@ func (az *azureInstanceSet) Create(
                        HardwareProfile: &compute.HardwareProfile{
                                VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
                        },
-                       StorageProfile: &compute.StorageProfile{
-                               OsDisk: &compute.OSDisk{
-                                       OsType:       compute.Linux,
-                                       Name:         to.StringPtr(name + "-os"),
-                                       CreateOption: compute.FromImage,
-                                       Image: &compute.VirtualHardDisk{
-                                               URI: to.StringPtr(string(imageID)),
-                                       },
-                                       Vhd: &compute.VirtualHardDisk{
-                                               URI: &instanceVhd,
-                                       },
-                               },
-                       },
+                       StorageProfile: storageProfile,
                        NetworkProfile: &compute.NetworkProfile{
                                NetworkInterfaces: &[]compute.NetworkInterfaceReference{
                                        compute.NetworkInterfaceReference{
@@ -448,12 +520,14 @@ func (az *azureInstanceSet) Create(
 
        vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
        if err != nil {
-               _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
-               if delerr != nil {
-                       az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+               if az.blobcont != nil {
+                       _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
+                       if delerr != nil {
+                               az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+                       }
                }
 
-               _, delerr = az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
+               _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
                if delerr != nil {
                        az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
                }
@@ -482,26 +556,24 @@ func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err
                return nil, wrapAzureError(err)
        }
 
-       instances := make([]cloud.Instance, 0)
-
+       var instances []cloud.Instance
        for ; result.NotDone(); err = result.Next() {
                if err != nil {
                        return nil, wrapAzureError(err)
                }
-               if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
-                       instances = append(instances, &azureInstance{
-                               provider: az,
-                               vm:       result.Value(),
-                               nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
-               }
+               instances = append(instances, &azureInstance{
+                       provider: az,
+                       vm:       result.Value(),
+                       nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
+               })
        }
        return instances, nil
 }
 
 // ManageNics returns a list of Azure network interface resources.
-// Also performs garbage collection of NICs which have "namePrefix", are
-// not associated with a virtual machine and have a "create-at" time
-// more than DeleteDanglingResourcesAfter (to prevent racing and
+// Also performs garbage collection of NICs which have "namePrefix",
+// are not associated with a virtual machine and have a "created-at"
+// time more than DeleteDanglingResourcesAfter (to prevent racing and
 // deleting newly created NICs) in the past.
 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
        az.stopWg.Add(1)
@@ -574,6 +646,41 @@ func (az *azureInstanceSet) manageBlobs() {
        }
 }
 
+// ManageDisks garbage collects managed compute disks (VM disk images) in the
+// configured resource group.  It will delete disks which
+// have "namePrefix", are "available" (which means they are not
+// leased to a VM) and were created more than DeleteDanglingResourcesAfter seconds ago.
+// (Azure provides no modification timestamp on managed disks, there is only a
+// creation timestamp)
+func (az *azureInstanceSet) manageDisks() {
+
+       re := regexp.MustCompile(`^` + az.namePrefix + `.*-os$`)
+       timestamp := time.Now()
+
+       for {
+               response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
+               if err != nil {
+                       az.logger.WithError(err).Warn("Error listing disks")
+                       return
+               }
+               for _, d := range response.Values() {
+                       age := timestamp.Sub(d.DiskProperties.TimeCreated.ToTime())
+                       if d.DiskProperties.DiskState == "Unattached" &&
+                               re.MatchString(*d.Name) &&
+                               age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
+
+                               az.logger.Printf("Disk %v is unlocked and not modified for %v seconds, will delete", *d.Name, age.Seconds())
+                               az.deleteDisk <- d
+                       }
+               }
+               if response.Values() != nil {
+                       response.Next()
+               } else {
+                       break
+               }
+       }
+}
+
 func (az *azureInstanceSet) Stop() {
        az.stopFunc()
        az.stopWg.Wait()
@@ -603,16 +710,12 @@ func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
        ai.provider.stopWg.Add(1)
        defer ai.provider.stopWg.Done()
 
-       tags := make(map[string]*string)
-
+       tags := map[string]*string{}
        for k, v := range ai.vm.Tags {
-               if !strings.HasPrefix(k, "dispatch-") {
-                       tags[k] = v
-               }
+               tags[k] = v
        }
        for k, v := range newTags {
-               newstr := v
-               tags["dispatch-"+k] = &newstr
+               tags[k] = to.StringPtr(v)
        }
 
        vmParameters := compute.VirtualMachine{
@@ -629,14 +732,10 @@ func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
 }
 
 func (ai *azureInstance) Tags() cloud.InstanceTags {
-       tags := make(map[string]string)
-
+       tags := cloud.InstanceTags{}
        for k, v := range ai.vm.Tags {
-               if strings.HasPrefix(k, "dispatch-") {
-                       tags[k[9:]] = *v
-               }
+               tags[k] = *v
        }
-
        return tags
 }