1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
23 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
24 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
25 "github.com/Azure/azure-sdk-for-go/storage"
26 "github.com/Azure/go-autorest/autorest"
27 "github.com/Azure/go-autorest/autorest/azure"
28 "github.com/Azure/go-autorest/autorest/azure/auth"
29 "github.com/Azure/go-autorest/autorest/to"
30 "github.com/jmcvetta/randutil"
31 "github.com/sirupsen/logrus"
32 "golang.org/x/crypto/ssh"
35 // Driver is the azure implementation of the cloud.Driver interface.
// It is newAzureInstanceSet adapted through cloud.DriverFunc.
36 var Driver = cloud.DriverFunc(newAzureInstanceSet)
// azureInstanceSetConfig is the driver configuration that
// newAzureInstanceSet decodes from JSON. Only some of its fields are
// visible in this view.
38 type azureInstanceSetConfig struct {
// CloudEnvironment is the environment name handed to
// azure.EnvironmentFromName in setup.
43 CloudEnvironment string
// ImageResourceGroup is the resource group used for images and managed
// disks; setup falls back to ResourceGroup when it is empty.
45 ImageResourceGroup string
// NetworkResourceGroup is the resource group used when building the subnet
// ID in Create, which falls back to ResourceGroup when it is empty.
48 NetworkResourceGroup string
// DeleteDanglingResourcesAfter is the age threshold the manageNics,
// manageBlobs and manageDisks garbage collectors compare against.
52 DeleteDanglingResourcesAfter arvados.Duration
// containerWrapper is the subset of blob container operations this driver
// calls; azureInstanceSet.blobcont holds a value of this type.
56 type containerWrapper interface {
57 GetBlobReference(name string) *storage.Blob
58 ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
// virtualMachinesClientWrapper is the set of virtual machine operations the
// driver calls through azureInstanceSet.vmClient. virtualMachinesClientImpl
// is the implementation in this file.
61 type virtualMachinesClientWrapper interface {
62 createOrUpdate(ctx context.Context,
63 resourceGroupName string,
65 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
66 delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
67 listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
// virtualMachinesClientImpl implements virtualMachinesClientWrapper on top
// of the Azure SDK's compute.VirtualMachinesClient.
70 type virtualMachinesClientImpl struct {
71 inner compute.VirtualMachinesClient
74 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
75 resourceGroupName string,
77 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
79 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
81 return compute.VirtualMachine{}, wrapAzureError(err)
83 future.WaitForCompletionRef(ctx, cl.inner.Client)
84 r, err := future.Result(cl.inner)
85 return r, wrapAzureError(err)
// delete starts deletion of the named VM, waits for the operation to
// complete, and returns the future's HTTP response. Errors are passed
// through wrapAzureError.
88 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
89 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
91 return nil, wrapAzureError(err)
// Wait for the asynchronous delete; its error is returned with the response.
93 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
94 return future.Response(), wrapAzureError(err)
// listComplete returns the SDK's iterator over the VMs in the given resource
// group, with any error passed through wrapAzureError.
97 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
98 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
99 return r, wrapAzureError(err)
// interfacesClientWrapper is the set of network interface operations the
// driver calls through azureInstanceSet.netClient. interfacesClientImpl is
// the implementation in this file.
102 type interfacesClientWrapper interface {
103 createOrUpdate(ctx context.Context,
104 resourceGroupName string,
105 networkInterfaceName string,
106 parameters network.Interface) (result network.Interface, err error)
107 delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
108 listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
// interfacesClientImpl implements interfacesClientWrapper on top of the
// Azure SDK's network.InterfacesClient.
111 type interfacesClientImpl struct {
112 inner network.InterfacesClient
// delete starts deletion of a network interface, waits for the operation to
// complete, and returns the future's HTTP response. Errors are passed
// through wrapAzureError.
// NOTE(review): the third parameter is named VMName here but the interface
// declares it as networkInterfaceName; it is the NIC name.
115 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
116 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
118 return nil, wrapAzureError(err)
// Wait for the asynchronous delete; its error is returned with the response.
120 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
121 return future.Response(), wrapAzureError(err)
124 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
125 resourceGroupName string,
126 networkInterfaceName string,
127 parameters network.Interface) (result network.Interface, err error) {
129 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
131 return network.Interface{}, wrapAzureError(err)
133 future.WaitForCompletionRef(ctx, cl.inner.Client)
134 r, err := future.Result(cl.inner)
135 return r, wrapAzureError(err)
// listComplete returns the SDK's iterator over the network interfaces in the
// given resource group, with any error passed through wrapAzureError.
138 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
139 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
140 return r, wrapAzureError(err)
// disksClientWrapper is the set of managed disk operations the driver calls
// through azureInstanceSet.disksClient. disksClientImpl is the
// implementation in this file.
143 type disksClientWrapper interface {
144 listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
145 delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
// disksClientImpl implements disksClientWrapper on top of the Azure SDK's
// compute.DisksClient.
148 type disksClientImpl struct {
149 inner compute.DisksClient
// listByResourceGroup returns the first page of managed disks in the given
// resource group, with any error passed through wrapAzureError.
152 func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
153 r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
154 return r, wrapAzureError(err)
// delete starts deletion of the named disk and returns the SDK future
// without waiting on it (unlike the VM and NIC delete wrappers). Any error
// is passed through wrapAzureError.
157 func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
158 r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
159 return r, wrapAzureError(err)
// quotaRe matches, case-insensitively, any of "exceed", "quota" or "limit".
// wrapAzureError applies it to a service error's code and message to decide
// whether the error is a quota error.
162 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
// azureRateLimitError is the error wrapAzureError returns for throttled
// requests; it is constructed from the azure.RequestError and the earliest
// retry time. (The field declarations are not visible in this view.)
164 type azureRateLimitError struct {
// EarliestRetry reports a time.Time for this rate-limit error.
// NOTE(review): the body is not visible here; presumably it returns the
// retry time computed in wrapAzureError — confirm.
169 func (ar *azureRateLimitError) EarliestRetry() time.Time {
// azureQuotaError is the error wrapAzureError returns when a service error's
// code or message matches quotaRe; it is constructed from the
// azure.RequestError. (The field declarations are not visible in this view.)
173 type azureQuotaError struct {
// IsQuotaError reports whether this error is a quota error.
// NOTE(review): the body is not visible here; presumably it always returns
// true — confirm.
177 func (ar *azureQuotaError) IsQuotaError() bool {
181 func wrapAzureError(err error) error {
182 de, ok := err.(autorest.DetailedError)
186 rq, ok := de.Original.(*azure.RequestError)
190 if rq.Response == nil {
193 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
195 ra := rq.Response.Header["Retry-After"][0]
196 earliestRetry, parseErr := http.ParseTime(ra)
198 // Could not parse as a timestamp, must be number of seconds
199 dur, parseErr := strconv.ParseInt(ra, 10, 64)
201 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
203 // Couldn't make sense of retry-after,
204 // so set retry to 20 seconds
205 earliestRetry = time.Now().Add(20 * time.Second)
208 return &azureRateLimitError{*rq, earliestRetry}
210 if rq.ServiceError == nil {
213 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
214 return &azureQuotaError{*rq}
// azureInstanceSet is the Azure instance set: it holds the configuration,
// the Azure client wrappers, and the channels and bookkeeping used by the
// background cleanup workers started in setup. Some fields are not visible
// in this view.
219 type azureInstanceSet struct {
220 azconfig azureInstanceSetConfig
221 vmClient virtualMachinesClientWrapper
222 netClient interfacesClientWrapper
223 disksClient disksClientWrapper
// imageResourceGroup is azconfig.ImageResourceGroup, or ResourceGroup when
// that is empty (see setup).
224 imageResourceGroup string
// blobcont is nil unless both StorageAccount and BlobContainer are
// configured (see setup).
225 blobcont containerWrapper
226 azureEnv azure.Environment
227 interfaces map[string]network.Interface
// stopFunc cancels the context created in newAzureInstanceSet.
231 stopFunc context.CancelFunc
// stopWg is marked Done by each operation when it finishes.
232 stopWg sync.WaitGroup
// deleteNIC, deleteBlob and deleteDisk carry resources queued for deletion
// to the worker loops in setup.
233 deleteNIC chan string
234 deleteBlob chan storage.Blob
235 deleteDisk chan compute.Disk
236 logger logrus.FieldLogger
// newAzureInstanceSet decodes config into an azureInstanceSetConfig, creates
// an azureInstanceSet with the given logger and a cancellable background
// context, and initializes it with setup. The cloud.SharedResourceTags
// argument is ignored.
239 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
240 azcfg := azureInstanceSetConfig{}
241 err = json.Unmarshal(config, &azcfg)
246 az := azureInstanceSet{logger: logger}
247 az.ctx, az.stopFunc = context.WithCancel(context.Background())
248 err = az.setup(azcfg, string(dispatcherID))
// setup initializes az: it creates the Azure SDK clients and gives them a
// shared client-credentials authorizer, resolves the cloud environment,
// optionally opens the blob container used for unmanaged disks, derives the
// instance name prefix from dispatcherID, and sets up the periodic cleanup
// and the deletion worker loops. Several lines of this function are not
// visible in this view.
256 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
// NOTE(review): these read az.azconfig, so az.azconfig is presumably
// assigned from azcfg on a line not shown here — confirm.
258 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
259 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
260 disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
261 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
// Look up the Azure environment by its configured name.
263 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
// Build one authorizer from the client credentials, scoped to the
// environment's resource manager endpoint.
268 authorizer, err := auth.ClientCredentialsConfig{
269 ClientID: az.azconfig.ClientID,
270 ClientSecret: az.azconfig.ClientSecret,
271 TenantID: az.azconfig.TenantID,
272 Resource: az.azureEnv.ResourceManagerEndpoint,
273 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
279 vmClient.Authorizer = authorizer
280 netClient.Authorizer = authorizer
281 disksClient.Authorizer = authorizer
282 storageAcctClient.Authorizer = authorizer
// Store the SDK clients behind the wrapper interfaces.
284 az.vmClient = &virtualMachinesClientImpl{vmClient}
285 az.netClient = &interfacesClientImpl{netClient}
286 az.disksClient = &disksClientImpl{disksClient}
// The image resource group defaults to the main resource group.
288 az.imageResourceGroup = az.azconfig.ImageResourceGroup
289 if az.imageResourceGroup == "" {
290 az.imageResourceGroup = az.azconfig.ResourceGroup
293 var client storage.Client
// Blob storage is only set up when both StorageAccount and BlobContainer
// are configured; having exactly one of them set is reported as an error.
294 if az.azconfig.StorageAccount != "" && az.azconfig.BlobContainer != "" {
295 result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
297 az.logger.WithError(err).Warn("Couldn't get account keys")
// NOTE(review): dereferences the first key with no visible nil/length
// check — confirm ListKeys always yields at least one key with a value.
301 key1 := *(*result.Keys)[0].Value
302 client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
304 az.logger.WithError(err).Warn("Couldn't make client")
308 blobsvc := client.GetBlobService()
309 az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
310 } else if az.azconfig.StorageAccount != "" || az.azconfig.BlobContainer != "" {
311 az.logger.Error("Invalid configuration: StorageAccount and BlobContainer must both be empty or both be set")
// Instances created by this dispatcher are named "compute-<dispatcherID>-...".
314 az.dispatcherID = dispatcherID
315 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
// Periodic cleanup: a 5-minute ticker that stops when az.ctx is done.
// NOTE(review): the enclosing go statement and the calls made on each tick
// are not visible here — confirm.
319 defer az.stopWg.Done()
321 tk := time.NewTicker(5 * time.Minute)
324 case <-az.ctx.Done():
328 if az.blobcont != nil {
// Unbuffered queues feeding the deletion loops below.
336 az.deleteNIC = make(chan string)
337 az.deleteBlob = make(chan storage.Blob)
338 az.deleteDisk = make(chan compute.Disk)
// NOTE(review): presumably starts four goroutines per queue; the go
// statements are not visible here — confirm.
340 for i := 0; i < 4; i++ {
342 for nicname := range az.deleteNIC {
// NOTE(review): NIC deletion uses context.Background() while disk deletion
// below uses az.ctx — confirm the difference is intentional.
343 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
345 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
347 az.logger.Printf("Deleted NIC %v", nicname)
352 for blob := range az.deleteBlob {
353 err := blob.Delete(nil)
355 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
357 az.logger.Printf("Deleted blob %v", blob.Name)
362 for disk := range az.deleteDisk {
363 _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
365 az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
367 az.logger.Printf("Deleted disk %v", *disk.Name)
// Create provisions a new VM: it generates a random name under namePrefix,
// creates a NIC on the configured subnet, picks a storage profile for the
// image (unmanaged VHD blob or managed image), and creates the VM with the
// given instance type, tags, init command and SSH public key. If VM creation
// fails it removes the VHD blob (when blob storage is configured) and the
// NIC, then returns the error. Several lines of this function are not
// visible in this view.
376 func (az *azureInstanceSet) Create(
377 instanceType arvados.InstanceType,
378 imageID cloud.ImageID,
379 newTags cloud.InstanceTags,
380 initCommand cloud.InitCommand,
381 publicKey ssh.PublicKey) (cloud.Instance, error) {
384 defer az.stopWg.Done()
// Extra scratch space is not supported by this driver.
386 if instanceType.AddedScratch > 0 {
387 return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
// 15 random lowercase alphanumeric characters, prefixed with namePrefix.
390 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
395 name = az.namePrefix + name
// Copy the caller's tags and add "created-at", which manageNics reads when
// deciding whether an unattached NIC is old enough to delete.
397 tags := map[string]*string{}
398 for k, v := range newTags {
399 tags[k] = to.StringPtr(v)
401 tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
// The network resource group defaults to the main resource group.
403 networkResourceGroup := az.azconfig.NetworkResourceGroup
404 if networkResourceGroup == "" {
405 networkResourceGroup = az.azconfig.ResourceGroup
// NIC with one dynamically allocated private IP on the configured subnet.
408 nicParameters := network.Interface{
409 Location: &az.azconfig.Location,
411 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
412 IPConfigurations: &[]network.InterfaceIPConfiguration{
413 network.InterfaceIPConfiguration{
414 Name: to.StringPtr("ip1"),
415 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
416 Subnet: &network.Subnet{
417 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
418 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
419 az.azconfig.SubscriptionID,
420 networkResourceGroup,
422 az.azconfig.Subnet)),
424 PrivateIPAllocationMethod: network.Dynamic,
430 nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
432 return nil, wrapAzureError(err)
// The init command is wrapped in a /bin/sh script and base64-encoded for
// use as the VM's custom data.
436 customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
437 var storageProfile *compute.StorageProfile
// An http(s) image ID together with configured blob storage selects the
// deprecated unmanaged-disk path.
// NOTE(review): this regexp is compiled on every call; it could be a
// package-level variable.
439 re := regexp.MustCompile(`^http(s?)://`)
440 if re.MatchString(string(imageID)) && az.blobcont != nil {
441 blobname = fmt.Sprintf("%s-os.vhd", name)
442 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
443 az.azconfig.StorageAccount,
444 az.azureEnv.StorageEndpointSuffix,
445 az.azconfig.BlobContainer,
447 az.logger.Warn("using deprecated unmanaged image, see https://doc.arvados.org/ to migrate to managed disks")
448 storageProfile = &compute.StorageProfile{
449 OsDisk: &compute.OSDisk{
450 OsType: compute.Linux,
451 Name: to.StringPtr(name + "-os"),
452 CreateOption: compute.DiskCreateOptionTypesFromImage,
453 Image: &compute.VirtualHardDisk{
454 URI: to.StringPtr(string(imageID)),
456 Vhd: &compute.VirtualHardDisk{
// Without blob storage, imageID is treated as the name of a managed image
// in imageResourceGroup.
461 } else if az.blobcont == nil {
462 storageProfile = &compute.StorageProfile{
463 ImageReference: &compute.ImageReference{
464 ID: to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID)),
466 OsDisk: &compute.OSDisk{
467 OsType: compute.Linux,
468 Name: to.StringPtr(name + "-os"),
469 CreateOption: compute.DiskCreateOptionTypesFromImage,
473 return nil, wrapAzureError(errors.New("Invalid configuration: can't configure unmanaged image URL without StorageAccount and BlobContainer"))
476 vmParameters := compute.VirtualMachine{
477 Location: &az.azconfig.Location,
479 VirtualMachineProperties: &compute.VirtualMachineProperties{
480 HardwareProfile: &compute.HardwareProfile{
481 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
483 StorageProfile: storageProfile,
484 NetworkProfile: &compute.NetworkProfile{
485 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
486 compute.NetworkInterfaceReference{
488 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
489 Primary: to.BoolPtr(true),
// Password login is disabled; the supplied public key is installed in the
// admin user's authorized_keys.
494 OsProfile: &compute.OSProfile{
496 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
497 LinuxConfiguration: &compute.LinuxConfiguration{
498 DisablePasswordAuthentication: to.BoolPtr(true),
499 SSH: &compute.SSHConfiguration{
500 PublicKeys: &[]compute.SSHPublicKey{
502 Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
503 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
508 CustomData: &customData,
513 vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
// Failure cleanup: remove the VHD blob and the NIC created above. Cleanup
// failures are only logged.
515 if az.blobcont != nil {
516 _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
518 az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
521 // Leave cleaning up of managed disks to the garbage collection in manageDisks()
523 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
525 az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
528 return nil, wrapAzureError(err)
531 return &azureInstance{
// Instances lists the VMs in the configured resource group and returns one
// azureInstance per VM, each paired with its network interface from
// manageNics. The cloud.InstanceTags argument is unnamed and unused.
538 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
540 defer az.stopWg.Done()
// manageNics also queues stale unattached NICs for deletion as a side effect.
542 interfaces, err := az.manageNics()
547 result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
549 return nil, wrapAzureError(err)
552 var instances []cloud.Instance
553 for ; result.NotDone(); err = result.Next() {
555 return nil, wrapAzureError(err)
557 instances = append(instances, &azureInstance{
// The NIC is looked up by the ID of the VM's first network interface.
// NOTE(review): NetworkProfile, NetworkInterfaces and the first element's
// ID are dereferenced with no visible nil/length check — confirm every
// listed VM has them.
560 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
563 return instances, nil
566 // manageNics returns a map, keyed by resource ID, of the Azure network
567 // interfaces that have "namePrefix" and are attached to a virtual machine.
568 // It also garbage collects NICs: those which have "namePrefix", are not
569 // associated with a virtual machine, and have a "created-at" time more than
570 // DeleteDanglingResourcesAfter in the past (to avoid racing with, and deleting, newly created NICs) are queued for deletion.
571 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
573 defer az.stopWg.Done()
575 result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
577 return nil, wrapAzureError(err)
580 interfaces := make(map[string]network.Interface)
582 timestamp := time.Now()
583 for ; result.NotDone(); err = result.Next() {
// A paging error is logged and the interfaces collected so far are returned
// with a nil error.
585 az.logger.WithError(err).Warnf("Error listing nics")
586 return interfaces, nil
588 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
589 if result.Value().VirtualMachine != nil {
590 interfaces[*result.Value().ID] = result.Value()
// Only NICs carrying a "created-at" tag are considered for deletion.
592 if result.Value().Tags["created-at"] != nil {
593 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
595 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
596 az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
// Hand the NIC name to the deletion loop reading az.deleteNIC.
597 az.deleteNIC <- *result.Value().Name
604 return interfaces, nil
607 // manageBlobs garbage collects blobs (VM disk images) in the
608 // configured storage account container. It selects page blobs which
609 // have "namePrefix", are "available" and "unlocked" (which means they
610 // are not leased to a VM) and haven't been modified for longer than
611 // DeleteDanglingResourcesAfter.
612 func (az *azureInstanceSet) manageBlobs() {
// List only blobs whose names start with namePrefix.
614 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
615 timestamp := time.Now()
618 response, err := az.blobcont.ListBlobs(page)
620 az.logger.WithError(err).Warn("Error listing blobs")
623 for _, b := range response.Blobs {
// Age is measured from the blob's last-modified time.
624 age := timestamp.Sub(time.Time(b.Properties.LastModified))
625 if b.Properties.BlobType == storage.BlobTypePage &&
626 b.Properties.LeaseState == "available" &&
627 b.Properties.LeaseStatus == "unlocked" &&
628 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
630 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
// A non-empty NextMarker means there are more results; continue from it.
634 if response.NextMarker != "" {
635 page.Marker = response.NextMarker
642 // manageDisks garbage collects managed compute disks (VM disk images) in the
643 // configured resource group. It will delete disks which have "namePrefix",
644 // are "unattached" (which means they are not leased to a VM) and were created
645 // more than DeleteDanglingResourcesAfter seconds ago. (Azure provides no
646 // modification timestamp on managed disks, there is only a creation timestamp)
647 func (az *azureInstanceSet) manageDisks() {
649 re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
650 threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
652 response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
654 az.logger.WithError(err).Warn("Error listing disks")
658 for ; response.NotDone(); err = response.Next() {
659 for _, d := range response.Values() {
660 if d.DiskProperties.DiskState == compute.Unattached &&
661 d.Name != nil && re.MatchString(*d.Name) &&
662 d.DiskProperties.TimeCreated.ToTime().Before(threshold) {
664 az.logger.Printf("Disk %v is unlocked and was created at %+v, will delete", *d.Name, d.DiskProperties.TimeCreated.ToTime())
// Stop shuts down the instance set.
// NOTE(review): the body is not visible here; presumably it calls
// az.stopFunc, waits on az.stopWg and closes the delete channels — confirm.
671 func (az *azureInstanceSet) Stop() {
// azureInstance is a single Azure VM together with its network interface
// and the instance set that owns it.
679 type azureInstance struct {
680 provider *azureInstanceSet
681 nic network.Interface
682 vm compute.VirtualMachine
// ID returns the VM's Azure resource ID as a cloud.InstanceID.
685 func (ai *azureInstance) ID() cloud.InstanceID {
686 return cloud.InstanceID(*ai.vm.ID)
// String returns a string describing the instance.
// NOTE(review): the body is not visible here; presumably it returns the VM
// name — confirm.
689 func (ai *azureInstance) String() string {
// ProviderType returns the VM size from the VM's hardware profile.
693 func (ai *azureInstance) ProviderType() string {
694 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
// SetTags builds a tag map from the VM's current tags and newTags, then
// sends it to Azure with a createOrUpdate call on the VM. Some lines of this
// function are not visible in this view.
697 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
698 ai.provider.stopWg.Add(1)
699 defer ai.provider.stopWg.Done()
701 tags := map[string]*string{}
// NOTE(review): the body of this loop is not visible; presumably it carries
// over some or all of the existing tags — confirm.
702 for k, v := range ai.vm.Tags {
// New tags are written last, so they win over existing values.
705 for k, v := range newTags {
706 tags[k] = to.StringPtr(v)
709 vmParameters := compute.VirtualMachine{
710 Location: &ai.provider.azconfig.Location,
713 vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
715 return wrapAzureError(err)
// Tags builds a cloud.InstanceTags value from the VM's tags. (The loop body
// and return statement are not visible in this view.)
722 func (ai *azureInstance) Tags() cloud.InstanceTags {
723 tags := cloud.InstanceTags{}
724 for k, v := range ai.vm.Tags {
// Destroy deletes the VM through the provider's VM client and returns any
// error passed through wrapAzureError. Only the VM is deleted here; the NIC
// and disks are left to the provider's garbage collection.
730 func (ai *azureInstance) Destroy() error {
731 ai.provider.stopWg.Add(1)
732 defer ai.provider.stopWg.Done()
734 _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
735 return wrapAzureError(err)
// Address walks from the NIC's properties to its first IP configuration and
// that configuration's private IP address, checking for nil or empty at each
// level. (The value returned from each branch is not visible in this view.)
738 func (ai *azureInstance) Address() string {
739 if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
741 } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
743 } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
745 } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
// RemoteUser returns the configured admin username, the same one Create
// sets as the VM's AdminUsername.
752 func (ai *azureInstance) RemoteUser() string {
753 return ai.provider.azconfig.AdminUsername
// VerifyHostKey is not implemented by this driver; it ignores its arguments
// and always returns cloud.ErrNotImplemented.
756 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
757 return cloud.ErrNotImplemented