Merge branch '21535-multi-wf-delete'
[arvados.git] / lib / cloud / azure / azure.go
index 6de367aa251c4c034b77331befde540782dc89d5..71f2a23dc963baa747f173d446135175754ffeec 100644 (file)
@@ -8,6 +8,7 @@ import (
        "context"
        "encoding/base64"
        "encoding/json"
+       "errors"
        "fmt"
        "net/http"
        "regexp"
@@ -18,7 +19,7 @@ import (
 
        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
+       "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
        "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
        storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
        "github.com/Azure/azure-sdk-for-go/storage"
@@ -27,6 +28,7 @@ import (
        "github.com/Azure/go-autorest/autorest/azure/auth"
        "github.com/Azure/go-autorest/autorest/to"
        "github.com/jmcvetta/randutil"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
 )
@@ -35,20 +37,23 @@ import (
 var Driver = cloud.DriverFunc(newAzureInstanceSet)
 
+// azureInstanceSetConfig is the driver configuration, decoded from the
+// JSON passed to newAzureInstanceSet.
 type azureInstanceSetConfig struct {
-       SubscriptionID               string
-       ClientID                     string
-       ClientSecret                 string
-       TenantID                     string
-       CloudEnvironment             string
-       ResourceGroup                string
-       Location                     string
-       Network                      string
-       NetworkResourceGroup         string
-       Subnet                       string
-       StorageAccount               string
-       BlobContainer                string
-       DeleteDanglingResourcesAfter arvados.Duration
-       AdminUsername                string
+       SubscriptionID                 string
+       ClientID                       string
+       ClientSecret                   string
+       TenantID                       string
+       CloudEnvironment               string
+       ResourceGroup                  string
+       // ImageResourceGroup holds the VM images and the managed OS disks
+       // that are listed and garbage collected by manageDisks.  Defaults
+       // to ResourceGroup when empty.
+       ImageResourceGroup             string
+       Location                       string
+       Network                        string
+       NetworkResourceGroup           string
+       Subnet                         string
+       // StorageAccount and BlobContainer must both be set (required to
+       // boot from an unmanaged VHD image URL) or both be empty.
+       StorageAccount                 string
+       BlobContainer                  string
+       // SharedImageGalleryName and SharedImageGalleryImageVersion must
+       // both be set or both be empty (Create fails otherwise).  When set,
+       // non-URL image IDs are resolved as images in this gallery at this
+       // version.
+       SharedImageGalleryName         string
+       SharedImageGalleryImageVersion string
+       // DeleteDanglingResourcesAfter is the minimum age before unused
+       // resources whose names carry this dispatcher's prefix (e.g. NICs
+       // and managed disks) are garbage collected.
+       DeleteDanglingResourcesAfter   arvados.Duration
+       // AdminUsername is the login account configured on each VM.
+       AdminUsername                  string
 }
 
 type containerWrapper interface {
@@ -138,6 +143,25 @@ func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupN
        return r, wrapAzureError(err)
 }
 
+// disksClientWrapper is the subset of the managed-disks API this driver
+// uses.  NOTE(review): presumably an interface so tests can substitute a
+// stub, like the VM and NIC client wrappers -- confirm.
+type disksClientWrapper interface {
+       listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
+       delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
+}
+
+// disksClientImpl implements disksClientWrapper on top of a real
+// compute.DisksClient, passing every error through wrapAzureError.
+type disksClientImpl struct {
+       inner compute.DisksClient
+}
+
+// listByResourceGroup returns the first page of disks in
+// resourceGroupName; callers page through the rest themselves.
+func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
+       result, err = cl.inner.ListByResourceGroup(ctx, resourceGroupName)
+       return result, wrapAzureError(err)
+}
+
+// delete requests deletion of diskName in resourceGroupName and returns
+// the resulting future without waiting on it.
+func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
+       result, err = cl.inner.Delete(ctx, resourceGroupName, diskName)
+       return result, wrapAzureError(err)
+}
+
+// quotaRe matches, case-insensitively, text containing "exceed", "quota"
+// or "limit".  NOTE(review): presumably used by wrapAzureError to
+// recognize quota errors -- confirm.
 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
 
 type azureRateLimitError struct {
@@ -196,23 +220,26 @@ func wrapAzureError(err error) error {
 }
 
+// azureInstanceSet holds the Azure API clients, configuration, and
+// background garbage-collection state for one dispatcher.
 type azureInstanceSet struct {
-       azconfig     azureInstanceSetConfig
-       vmClient     virtualMachinesClientWrapper
-       netClient    interfacesClientWrapper
-       blobcont     containerWrapper
-       azureEnv     azure.Environment
-       interfaces   map[string]network.Interface
-       dispatcherID string
-       namePrefix   string
-       ctx          context.Context
-       stopFunc     context.CancelFunc
-       stopWg       sync.WaitGroup
-       deleteNIC    chan string
-       deleteBlob   chan storage.Blob
-       logger       logrus.FieldLogger
-}
-
-func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+       azconfig           azureInstanceSetConfig
+       vmClient           virtualMachinesClientWrapper
+       netClient          interfacesClientWrapper
+       disksClient        disksClientWrapper
+       // imageResourceGroup is azconfig.ImageResourceGroup, or
+       // azconfig.ResourceGroup if that is empty.
+       imageResourceGroup string
+       // blobcont is left nil by setup unless StorageAccount and
+       // BlobContainer are both configured.
+       blobcont           containerWrapper
+       azureEnv           azure.Environment
+       interfaces         map[string]network.Interface
+       dispatcherID       string
+       namePrefix         string
+       ctx                context.Context
+       stopFunc           context.CancelFunc
+       stopWg             sync.WaitGroup
+       // Deletion queues, each drained by background worker goroutines
+       // until the channel is closed.
+       deleteNIC          chan string
+       deleteBlob         chan storage.Blob
+       deleteDisk         chan compute.Disk
+       logger             logrus.FieldLogger
+}
+
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
        azcfg := azureInstanceSetConfig{}
        err = json.Unmarshal(config, &azcfg)
        if err != nil {
@@ -233,6 +260,7 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
        az.azconfig = azcfg
        vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
        netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
+       disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
        storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
 
        az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
@@ -253,26 +281,38 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
 
        vmClient.Authorizer = authorizer
        netClient.Authorizer = authorizer
+       disksClient.Authorizer = authorizer
        storageAcctClient.Authorizer = authorizer
 
        az.vmClient = &virtualMachinesClientImpl{vmClient}
        az.netClient = &interfacesClientImpl{netClient}
+       az.disksClient = &disksClientImpl{disksClient}
 
-       result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't get account keys")
-               return err
+       az.imageResourceGroup = az.azconfig.ImageResourceGroup
+       if az.imageResourceGroup == "" {
+               az.imageResourceGroup = az.azconfig.ResourceGroup
        }
 
-       key1 := *(*result.Keys)[0].Value
-       client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
-       if err != nil {
-               az.logger.WithError(err).Warn("Couldn't make client")
-               return err
-       }
+       var client storage.Client
+       if az.azconfig.StorageAccount != "" && az.azconfig.BlobContainer != "" {
+               result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+               if err != nil {
+                       az.logger.WithError(err).Warn("Couldn't get account keys")
+                       return err
+               }
 
-       blobsvc := client.GetBlobService()
-       az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+               key1 := *(*result.Keys)[0].Value
+               client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
+               if err != nil {
+                       az.logger.WithError(err).Warn("Couldn't make client")
+                       return err
+               }
+
+               blobsvc := client.GetBlobService()
+               az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+       } else if az.azconfig.StorageAccount != "" || az.azconfig.BlobContainer != "" {
+               az.logger.Error("Invalid configuration: StorageAccount and BlobContainer must both be empty or both be set")
+       }
 
        az.dispatcherID = dispatcherID
        az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
@@ -288,21 +328,21 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
                                tk.Stop()
                                return
                        case <-tk.C:
-                               az.manageBlobs()
+                               if az.blobcont != nil {
+                                       az.manageBlobs()
+                               }
+                               az.manageDisks()
                        }
                }
        }()
 
        az.deleteNIC = make(chan string)
        az.deleteBlob = make(chan storage.Blob)
+       az.deleteDisk = make(chan compute.Disk)
 
        for i := 0; i < 4; i++ {
                go func() {
-                       for {
-                               nicname, ok := <-az.deleteNIC
-                               if !ok {
-                                       return
-                               }
+                       for nicname := range az.deleteNIC {
                                _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
                                if delerr != nil {
                                        az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
@@ -312,11 +352,7 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
                        }
                }()
                go func() {
-                       for {
-                               blob, ok := <-az.deleteBlob
-                               if !ok {
-                                       return
-                               }
+                       for blob := range az.deleteBlob {
                                err := blob.Delete(nil)
                                if err != nil {
                                        az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
@@ -325,11 +361,28 @@ func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID str
                                }
                        }
                }()
+               go func() {
+                       for disk := range az.deleteDisk {
+                               _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
+                               if err != nil {
+                                       az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
+                               } else {
+                                       az.logger.Printf("Deleted disk %v", *disk.Name)
+                               }
+                       }
+               }()
        }
 
        return nil
 }
 
+// cleanupNic deletes nic on a best-effort basis: a failure is logged, not
+// returned.  Create calls it to discard the NIC it made when a later step
+// fails, so unused NICs (which count against a quota) don't pile up while
+// VM creation keeps failing.
+func (az *azureInstanceSet) cleanupNic(nic network.Interface) {
+       ctx := context.Background()
+       if _, err := az.netClient.delete(ctx, az.azconfig.ResourceGroup, *nic.Name); err != nil {
+               az.logger.WithError(err).Warnf("Error cleaning up NIC after failed create")
+       }
+}
+
 func (az *azureInstanceSet) Create(
        instanceType arvados.InstanceType,
        imageID cloud.ImageID,
@@ -367,7 +420,7 @@ func (az *azureInstanceSet) Create(
                Tags:     tags,
                InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
                        IPConfigurations: &[]network.InterfaceIPConfiguration{
-                               network.InterfaceIPConfiguration{
+                               {
                                        Name: to.StringPtr("ip1"),
                                        InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
                                                Subnet: &network.Subnet{
@@ -389,14 +442,55 @@ func (az *azureInstanceSet) Create(
                return nil, wrapAzureError(err)
        }
 
-       blobname := fmt.Sprintf("%s-os.vhd", name)
-       instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
-               az.azconfig.StorageAccount,
-               az.azureEnv.StorageEndpointSuffix,
-               az.azconfig.BlobContainer,
-               blobname)
-
+       var blobname string
        customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
+       var storageProfile *compute.StorageProfile
+
+       re := regexp.MustCompile(`^http(s?)://`)
+       if re.MatchString(string(imageID)) {
+               if az.blobcont == nil {
+                       az.cleanupNic(nic)
+                       return nil, wrapAzureError(errors.New("Invalid configuration: can't configure unmanaged image URL without StorageAccount and BlobContainer"))
+               }
+               blobname = fmt.Sprintf("%s-os.vhd", name)
+               instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
+                       az.azconfig.StorageAccount,
+                       az.azureEnv.StorageEndpointSuffix,
+                       az.azconfig.BlobContainer,
+                       blobname)
+               az.logger.Warn("using deprecated unmanaged image, see https://doc.arvados.org/ to migrate to managed disks")
+               storageProfile = &compute.StorageProfile{
+                       OsDisk: &compute.OSDisk{
+                               OsType:       compute.Linux,
+                               Name:         to.StringPtr(name + "-os"),
+                               CreateOption: compute.DiskCreateOptionTypesFromImage,
+                               Image: &compute.VirtualHardDisk{
+                                       URI: to.StringPtr(string(imageID)),
+                               },
+                               Vhd: &compute.VirtualHardDisk{
+                                       URI: &instanceVhd,
+                               },
+                       },
+               }
+       } else {
+               id := to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID))
+               if az.azconfig.SharedImageGalleryName != "" && az.azconfig.SharedImageGalleryImageVersion != "" {
+                       id = to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/galleries/" + az.azconfig.SharedImageGalleryName + "/images/" + string(imageID) + "/versions/" + az.azconfig.SharedImageGalleryImageVersion)
+               } else if az.azconfig.SharedImageGalleryName != "" || az.azconfig.SharedImageGalleryImageVersion != "" {
+                       az.cleanupNic(nic)
+                       return nil, wrapAzureError(errors.New("Invalid configuration: SharedImageGalleryName and SharedImageGalleryImageVersion must both be set or both be empty"))
+               }
+               storageProfile = &compute.StorageProfile{
+                       ImageReference: &compute.ImageReference{
+                               ID: id,
+                       },
+                       OsDisk: &compute.OSDisk{
+                               OsType:       compute.Linux,
+                               Name:         to.StringPtr(name + "-os"),
+                               CreateOption: compute.DiskCreateOptionTypesFromImage,
+                       },
+               }
+       }
 
        vmParameters := compute.VirtualMachine{
                Location: &az.azconfig.Location,
@@ -405,22 +499,10 @@ func (az *azureInstanceSet) Create(
                        HardwareProfile: &compute.HardwareProfile{
                                VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
                        },
-                       StorageProfile: &compute.StorageProfile{
-                               OsDisk: &compute.OSDisk{
-                                       OsType:       compute.Linux,
-                                       Name:         to.StringPtr(name + "-os"),
-                                       CreateOption: compute.FromImage,
-                                       Image: &compute.VirtualHardDisk{
-                                               URI: to.StringPtr(string(imageID)),
-                                       },
-                                       Vhd: &compute.VirtualHardDisk{
-                                               URI: &instanceVhd,
-                                       },
-                               },
-                       },
+                       StorageProfile: storageProfile,
                        NetworkProfile: &compute.NetworkProfile{
                                NetworkInterfaces: &[]compute.NetworkInterfaceReference{
-                                       compute.NetworkInterfaceReference{
+                                       {
                                                ID: nic.ID,
                                                NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
                                                        Primary: to.BoolPtr(true),
@@ -433,31 +515,51 @@ func (az *azureInstanceSet) Create(
                                AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
                                LinuxConfiguration: &compute.LinuxConfiguration{
                                        DisablePasswordAuthentication: to.BoolPtr(true),
-                                       SSH: &compute.SSHConfiguration{
-                                               PublicKeys: &[]compute.SSHPublicKey{
-                                                       {
-                                                               Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
-                                                               KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
-                                                       },
-                                               },
-                                       },
                                },
                                CustomData: &customData,
                        },
                },
        }
 
+       if publicKey != nil {
+               vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH = &compute.SSHConfiguration{
+                       PublicKeys: &[]compute.SSHPublicKey{
+                               {
+                                       Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
+                                       KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
+                               },
+                       },
+               }
+       }
+
+       if instanceType.Preemptible {
+               // Setting maxPrice to -1 is the equivalent of paying spot price, up to the
+               // normal price. This means the node will not be pre-empted for price
+               // reasons. It may still be pre-empted for capacity reasons though. And
+               // Azure offers *no* SLA on spot instances.
+               var maxPrice float64 = -1
+               vmParameters.VirtualMachineProperties.Priority = compute.Spot
+               vmParameters.VirtualMachineProperties.EvictionPolicy = compute.Delete
+               vmParameters.VirtualMachineProperties.BillingProfile = &compute.BillingProfile{MaxPrice: &maxPrice}
+       }
+
        vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
        if err != nil {
-               _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
-               if delerr != nil {
-                       az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+               // Do some cleanup. Otherwise, an unbounded number of new unused nics and
+               // blobs can pile up during times when VMs can't be created and the
+               // dispatcher keeps retrying, because the garbage collection in manageBlobs
+               // and manageNics is only triggered periodically. This is most important
+               // for nics, because those are subject to a quota.
+               az.cleanupNic(nic)
+
+               if blobname != "" {
+                       _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
+                       if delerr != nil {
+                               az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+                       }
                }
 
-               _, delerr = az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
-               if delerr != nil {
-                       az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
-               }
+               // Leave cleaning up of managed disks to the garbage collection in manageDisks()
 
                return nil, wrapAzureError(err)
        }
@@ -497,7 +599,7 @@ func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, err
        return instances, nil
 }
 
-// ManageNics returns a list of Azure network interface resources.
+// manageNics returns a list of Azure network interface resources.
 // Also performs garbage collection of NICs which have "namePrefix",
 // are not associated with a virtual machine and have a "created-at"
 // time more than DeleteDanglingResourcesAfter (to prevent racing and
@@ -538,7 +640,7 @@ func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
        return interfaces, nil
 }
 
-// ManageBlobs garbage collects blobs (VM disk images) in the
+// manageBlobs garbage collects blobs (VM disk images) in the
 // configured storage account container.  It will delete blobs which
 // have "namePrefix", are "available" (which means they are not
 // leased to a VM) and haven't been modified for
@@ -573,11 +675,45 @@ func (az *azureInstanceSet) manageBlobs() {
        }
 }
 
+// manageDisks garbage collects managed compute disks (VM OS disks) in
+// imageResourceGroup.  It queues for deletion every disk whose name has
+// "namePrefix" and ends in "-os", whose state is "unattached" (which means
+// it is not in use by a VM), and which was created more than
+// DeleteDanglingResourcesAfter ago.  (Azure provides no modification
+// timestamp on managed disks, there is only a creation timestamp.)
+//
+// Disks reported without properties or without a creation time are
+// skipped instead of dereferenced: an unrecovered nil-pointer panic here
+// would crash the process.  Paging and queueing stop early once az.ctx is
+// cancelled.
+func (az *azureInstanceSet) manageDisks() {
+       re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
+       threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
+
+       response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
+       if err != nil {
+               az.logger.WithError(err).Warn("Error listing disks")
+               return
+       }
+
+       // NextWithContext (rather than the deprecated Next, which uses
+       // context.Background) lets a cancelled az.ctx abort paging.
+       for ; response.NotDone(); err = response.NextWithContext(az.ctx) {
+               if err != nil {
+                       az.logger.WithError(err).Warn("Error getting next page of disks")
+                       return
+               }
+               for _, d := range response.Values() {
+                       if d.Name == nil || !re.MatchString(*d.Name) ||
+                               d.DiskProperties == nil || d.DiskProperties.TimeCreated == nil ||
+                               d.DiskProperties.DiskState != compute.Unattached {
+                               continue
+                       }
+                       created := d.DiskProperties.TimeCreated.ToTime()
+                       if !created.Before(threshold) {
+                               continue
+                       }
+                       az.logger.Printf("Disk %v is unattached and was created at %+v, will delete", *d.Name, created)
+                       // Stop handing work to the deletion workers once az.ctx
+                       // is cancelled instead of blocking on the send.
+                       select {
+                       case az.deleteDisk <- d:
+                       case <-az.ctx.Done():
+                               return
+                       }
+               }
+       }
+}
+
+// Stop cancels the instance set's context and waits for the goroutines
+// tracked by stopWg.  It then closes the NIC, blob and disk deletion
+// queues, which ends the deletion worker goroutines that range over them.
 func (az *azureInstanceSet) Stop() {
        az.stopFunc()
        az.stopWg.Wait()
        close(az.deleteNIC)
        close(az.deleteBlob)
+       close(az.deleteDisk)
 }
 
 type azureInstance struct {
@@ -653,6 +789,10 @@ func (ai *azureInstance) Address() string {
        }
 }
 
+// PriceHistory always returns nil: this driver reports no spot price
+// history for Azure instances.
+func (ai *azureInstance) PriceHistory(_ arvados.InstanceType) []cloud.InstancePrice {
+       var none []cloud.InstancePrice
+       return none
+}
+
+// RemoteUser returns the configured AdminUsername.  This is the account
+// set up on each VM, and Create writes the SSH public key to its
+// authorized_keys file.
 func (ai *azureInstance) RemoteUser() string {
        return ai.provider.azconfig.AdminUsername
 }