"context"
"encoding/base64"
"encoding/json"
+ "errors"
"fmt"
"net/http"
"regexp"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/jmcvetta/randutil"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
var Driver = cloud.DriverFunc(newAzureInstanceSet)
type azureInstanceSetConfig struct {
- SubscriptionID string
- ClientID string
- ClientSecret string
- TenantID string
- CloudEnvironment string
- ResourceGroup string
- Location string
- Network string
- Subnet string
- StorageAccount string
- BlobContainer string
- DeleteDanglingResourcesAfter arvados.Duration
- AdminUsername string
-}
-
-const tagKeyInstanceSecret = "InstanceSecret"
+ SubscriptionID string
+ ClientID string
+ ClientSecret string
+ TenantID string
+ CloudEnvironment string
+ ResourceGroup string
+ ImageResourceGroup string
+ Location string
+ Network string
+ NetworkResourceGroup string
+ Subnet string
+ StorageAccount string
+ BlobContainer string
+ SharedImageGalleryName string
+ SharedImageGalleryImageVersion string
+ DeleteDanglingResourcesAfter arvados.Duration
+ AdminUsername string
+}
type containerWrapper interface {
GetBlobReference(name string) *storage.Blob
return r, wrapAzureError(err)
}
+type disksClientWrapper interface {
+ listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
+ delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
+}
+
+type disksClientImpl struct {
+ inner compute.DisksClient
+}
+
+func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
+ r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
+ return r, wrapAzureError(err)
+}
+
+func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
+ r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
+ return r, wrapAzureError(err)
+}
+
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
type azureRateLimitError struct {
}
type azureInstanceSet struct {
- azconfig azureInstanceSetConfig
- vmClient virtualMachinesClientWrapper
- netClient interfacesClientWrapper
- blobcont containerWrapper
- azureEnv azure.Environment
- interfaces map[string]network.Interface
- dispatcherID string
- namePrefix string
- ctx context.Context
- stopFunc context.CancelFunc
- stopWg sync.WaitGroup
- deleteNIC chan string
- deleteBlob chan storage.Blob
- logger logrus.FieldLogger
-}
-
-func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+ azconfig azureInstanceSetConfig
+ vmClient virtualMachinesClientWrapper
+ netClient interfacesClientWrapper
+ disksClient disksClientWrapper
+ imageResourceGroup string
+ blobcont containerWrapper
+ azureEnv azure.Environment
+ interfaces map[string]network.Interface
+ dispatcherID string
+ namePrefix string
+ ctx context.Context
+ stopFunc context.CancelFunc
+ stopWg sync.WaitGroup
+ deleteNIC chan string
+ deleteBlob chan storage.Blob
+ deleteDisk chan compute.Disk
+ logger logrus.FieldLogger
+}
+
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
azcfg := azureInstanceSetConfig{}
err = json.Unmarshal(config, &azcfg)
if err != nil {
az.azconfig = azcfg
vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
+ disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
vmClient.Authorizer = authorizer
netClient.Authorizer = authorizer
+ disksClient.Authorizer = authorizer
storageAcctClient.Authorizer = authorizer
az.vmClient = &virtualMachinesClientImpl{vmClient}
az.netClient = &interfacesClientImpl{netClient}
+ az.disksClient = &disksClientImpl{disksClient}
- result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
- if err != nil {
- az.logger.WithError(err).Warn("Couldn't get account keys")
- return err
+ az.imageResourceGroup = az.azconfig.ImageResourceGroup
+ if az.imageResourceGroup == "" {
+ az.imageResourceGroup = az.azconfig.ResourceGroup
}
- key1 := *(*result.Keys)[0].Value
- client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
- if err != nil {
- az.logger.WithError(err).Warn("Couldn't make client")
- return err
- }
+ var client storage.Client
+ if az.azconfig.StorageAccount != "" && az.azconfig.BlobContainer != "" {
+ result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+ if err != nil {
+ az.logger.WithError(err).Warn("Couldn't get account keys")
+ return err
+ }
- blobsvc := client.GetBlobService()
- az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+ key1 := *(*result.Keys)[0].Value
+ client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
+ if err != nil {
+ az.logger.WithError(err).Warn("Couldn't make client")
+ return err
+ }
+
+ blobsvc := client.GetBlobService()
+ az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+ } else if az.azconfig.StorageAccount != "" || az.azconfig.BlobContainer != "" {
+ az.logger.Error("Invalid configuration: StorageAccount and BlobContainer must both be empty or both be set")
+ }
az.dispatcherID = dispatcherID
az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
tk.Stop()
return
case <-tk.C:
- az.manageBlobs()
+ if az.blobcont != nil {
+ az.manageBlobs()
+ }
+ az.manageDisks()
}
}
}()
az.deleteNIC = make(chan string)
az.deleteBlob = make(chan storage.Blob)
+ az.deleteDisk = make(chan compute.Disk)
for i := 0; i < 4; i++ {
go func() {
- for {
- nicname, ok := <-az.deleteNIC
- if !ok {
- return
- }
+ for nicname := range az.deleteNIC {
_, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
if delerr != nil {
az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
}
}()
go func() {
- for {
- blob, ok := <-az.deleteBlob
- if !ok {
- return
- }
+ for blob := range az.deleteBlob {
err := blob.Delete(nil)
if err != nil {
az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
}
}
}()
+ go func() {
+ for disk := range az.deleteDisk {
+ _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
+ if err != nil {
+ az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
+ } else {
+ az.logger.Printf("Deleted disk %v", *disk.Name)
+ }
+ }
+ }()
}
return nil
}
+func (az *azureInstanceSet) cleanupNic(nic network.Interface) {
+ _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
+ if delerr != nil {
+ az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
+ }
+}
+
func (az *azureInstanceSet) Create(
instanceType arvados.InstanceType,
imageID cloud.ImageID,
name = az.namePrefix + name
- timestamp := time.Now().Format(time.RFC3339Nano)
-
- tags := make(map[string]*string)
- tags["created-at"] = ×tamp
+ tags := map[string]*string{}
for k, v := range newTags {
- newstr := v
- tags["dispatch-"+k] = &newstr
+ tags[k] = to.StringPtr(v)
+ }
+ tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
+
+ networkResourceGroup := az.azconfig.NetworkResourceGroup
+ if networkResourceGroup == "" {
+ networkResourceGroup = az.azconfig.ResourceGroup
}
nicParameters := network.Interface{
Tags: tags,
InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
IPConfigurations: &[]network.InterfaceIPConfiguration{
- network.InterfaceIPConfiguration{
+ {
Name: to.StringPtr("ip1"),
InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
Subnet: &network.Subnet{
ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
"/Microsoft.Network/virtualnetworks/%s/subnets/%s",
az.azconfig.SubscriptionID,
- az.azconfig.ResourceGroup,
+ networkResourceGroup,
az.azconfig.Network,
az.azconfig.Subnet)),
},
return nil, wrapAzureError(err)
}
- blobname := fmt.Sprintf("%s-os.vhd", name)
- instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
- az.azconfig.StorageAccount,
- az.azureEnv.StorageEndpointSuffix,
- az.azconfig.BlobContainer,
- blobname)
-
+ var blobname string
customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
+ var storageProfile *compute.StorageProfile
+
+ re := regexp.MustCompile(`^http(s?)://`)
+ if re.MatchString(string(imageID)) {
+ if az.blobcont == nil {
+ az.cleanupNic(nic)
+ return nil, wrapAzureError(errors.New("Invalid configuration: can't configure unmanaged image URL without StorageAccount and BlobContainer"))
+ }
+ blobname = fmt.Sprintf("%s-os.vhd", name)
+ instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
+ az.azconfig.StorageAccount,
+ az.azureEnv.StorageEndpointSuffix,
+ az.azconfig.BlobContainer,
+ blobname)
+ az.logger.Warn("using deprecated unmanaged image, see https://doc.arvados.org/ to migrate to managed disks")
+ storageProfile = &compute.StorageProfile{
+ OsDisk: &compute.OSDisk{
+ OsType: compute.Linux,
+ Name: to.StringPtr(name + "-os"),
+ CreateOption: compute.DiskCreateOptionTypesFromImage,
+ Image: &compute.VirtualHardDisk{
+ URI: to.StringPtr(string(imageID)),
+ },
+ Vhd: &compute.VirtualHardDisk{
+ URI: &instanceVhd,
+ },
+ },
+ }
+ } else {
+ id := to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID))
+ if az.azconfig.SharedImageGalleryName != "" && az.azconfig.SharedImageGalleryImageVersion != "" {
+ id = to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/galleries/" + az.azconfig.SharedImageGalleryName + "/images/" + string(imageID) + "/versions/" + az.azconfig.SharedImageGalleryImageVersion)
+ } else if az.azconfig.SharedImageGalleryName != "" || az.azconfig.SharedImageGalleryImageVersion != "" {
+ az.cleanupNic(nic)
+ return nil, wrapAzureError(errors.New("Invalid configuration: SharedImageGalleryName and SharedImageGalleryImageVersion must both be set or both be empty"))
+ }
+ storageProfile = &compute.StorageProfile{
+ ImageReference: &compute.ImageReference{
+ ID: id,
+ },
+ OsDisk: &compute.OSDisk{
+ OsType: compute.Linux,
+ Name: to.StringPtr(name + "-os"),
+ CreateOption: compute.DiskCreateOptionTypesFromImage,
+ },
+ }
+ }
vmParameters := compute.VirtualMachine{
Location: &az.azconfig.Location,
HardwareProfile: &compute.HardwareProfile{
VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
},
- StorageProfile: &compute.StorageProfile{
- OsDisk: &compute.OSDisk{
- OsType: compute.Linux,
- Name: to.StringPtr(name + "-os"),
- CreateOption: compute.FromImage,
- Image: &compute.VirtualHardDisk{
- URI: to.StringPtr(string(imageID)),
- },
- Vhd: &compute.VirtualHardDisk{
- URI: &instanceVhd,
- },
- },
- },
+ StorageProfile: storageProfile,
NetworkProfile: &compute.NetworkProfile{
NetworkInterfaces: &[]compute.NetworkInterfaceReference{
- compute.NetworkInterfaceReference{
+ {
ID: nic.ID,
NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
Primary: to.BoolPtr(true),
AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
LinuxConfiguration: &compute.LinuxConfiguration{
DisablePasswordAuthentication: to.BoolPtr(true),
- SSH: &compute.SSHConfiguration{
- PublicKeys: &[]compute.SSHPublicKey{
- {
- Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
- KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
- },
- },
- },
},
CustomData: &customData,
},
},
}
+ if publicKey != nil {
+ vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH = &compute.SSHConfiguration{
+ PublicKeys: &[]compute.SSHPublicKey{
+ {
+ Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
+ KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
+ },
+ },
+ }
+ }
+
+ if instanceType.Preemptible {
+ // Setting maxPrice to -1 is the equivalent of paying spot price, up to the
+ // normal price. This means the node will not be pre-empted for price
+ // reasons. It may still be pre-empted for capacity reasons though. And
+ // Azure offers *no* SLA on spot instances.
+ var maxPrice float64 = -1
+ vmParameters.VirtualMachineProperties.Priority = compute.Spot
+ vmParameters.VirtualMachineProperties.EvictionPolicy = compute.Delete
+ vmParameters.VirtualMachineProperties.BillingProfile = &compute.BillingProfile{MaxPrice: &maxPrice}
+ }
+
vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
if err != nil {
- _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
- if delerr != nil {
- az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+ // Do some cleanup. Otherwise, an unbounded number of new unused nics and
+ // blobs can pile up during times when VMs can't be created and the
+ // dispatcher keeps retrying, because the garbage collection in manageBlobs
+ // and manageNics is only triggered periodically. This is most important
+ // for nics, because those are subject to a quota.
+ az.cleanupNic(nic)
+
+ if blobname != "" {
+ _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
+ if delerr != nil {
+ az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+ }
}
- _, delerr = az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
- if delerr != nil {
- az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
- }
+ // Leave cleaning up of managed disks to the garbage collection in manageDisks()
return nil, wrapAzureError(err)
}
return nil, wrapAzureError(err)
}
- instances := make([]cloud.Instance, 0)
-
+ var instances []cloud.Instance
for ; result.NotDone(); err = result.Next() {
if err != nil {
return nil, wrapAzureError(err)
}
- if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
- instances = append(instances, &azureInstance{
- provider: az,
- vm: result.Value(),
- nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
- }
+ instances = append(instances, &azureInstance{
+ provider: az,
+ vm: result.Value(),
+ nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
+ })
}
return instances, nil
}
-// ManageNics returns a list of Azure network interface resources.
-// Also performs garbage collection of NICs which have "namePrefix", are
-// not associated with a virtual machine and have a "create-at" time
-// more than DeleteDanglingResourcesAfter (to prevent racing and
+// manageNics returns a list of Azure network interface resources.
+// Also performs garbage collection of NICs which have "namePrefix",
+// are not associated with a virtual machine and have a "created-at"
+// time more than DeleteDanglingResourcesAfter (to prevent racing and
// deleting newly created NICs) in the past are deleted.
func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
az.stopWg.Add(1)
return interfaces, nil
}
-// ManageBlobs garbage collects blobs (VM disk images) in the
+// manageBlobs garbage collects blobs (VM disk images) in the
// configured storage account container. It will delete blobs which
// have "namePrefix", are "available" (which means they are not
// leased to a VM) and haven't been modified for
}
}
+// manageDisks garbage collects managed compute disks (VM disk images) in the
+// configured resource group. It will delete disks which have "namePrefix",
+// are "unattached" (which means they are not leased to a VM) and were created
+// more than DeleteDanglingResourcesAfter seconds ago. (Azure provides no
+// modification timestamp on managed disks, there is only a creation timestamp)
+func (az *azureInstanceSet) manageDisks() {
+
+ re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
+ threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
+
+ response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
+ if err != nil {
+ az.logger.WithError(err).Warn("Error listing disks")
+ return
+ }
+
+ for ; response.NotDone(); err = response.Next() {
+ if err != nil {
+ az.logger.WithError(err).Warn("Error getting next page of disks")
+ return
+ }
+ for _, d := range response.Values() {
+ if d.DiskProperties.DiskState == compute.Unattached &&
+ d.Name != nil && re.MatchString(*d.Name) &&
+ d.DiskProperties.TimeCreated.ToTime().Before(threshold) {
+
+ az.logger.Printf("Disk %v is unlocked and was created at %+v, will delete", *d.Name, d.DiskProperties.TimeCreated.ToTime())
+ az.deleteDisk <- d
+ }
+ }
+ }
+}
+
func (az *azureInstanceSet) Stop() {
az.stopFunc()
az.stopWg.Wait()
close(az.deleteNIC)
close(az.deleteBlob)
+ close(az.deleteDisk)
}
type azureInstance struct {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
- tags := make(map[string]*string)
-
+ tags := map[string]*string{}
for k, v := range ai.vm.Tags {
- if !strings.HasPrefix(k, "dispatch-") {
- tags[k] = v
- }
+ tags[k] = v
}
for k, v := range newTags {
- newstr := v
- tags["dispatch-"+k] = &newstr
+ tags[k] = to.StringPtr(v)
}
vmParameters := compute.VirtualMachine{
}
func (ai *azureInstance) Tags() cloud.InstanceTags {
- tags := make(map[string]string)
-
+ tags := cloud.InstanceTags{}
for k, v := range ai.vm.Tags {
- if strings.HasPrefix(k, "dispatch-") {
- tags[k[9:]] = *v
- }
+ tags[k] = *v
}
-
return tags
}
}
func (ai *azureInstance) Address() string {
- if ai.nic.IPConfigurations != nil &&
- len(*ai.nic.IPConfigurations) > 0 &&
- (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat != nil &&
- (*ai.nic.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat.PrivateIPAddress != nil {
-
- return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
+ if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
+ return ""
+ } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
+ return ""
+ } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
+ return ""
+ } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
+ return ""
+ } else {
+ return *addr
}
- return ""
+}
+
+func (ai *azureInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
+ return nil
}
func (ai *azureInstance) RemoteUser() string {