1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
22 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
23 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
24 "github.com/Azure/azure-sdk-for-go/storage"
25 "github.com/Azure/go-autorest/autorest"
26 "github.com/Azure/go-autorest/autorest/azure"
27 "github.com/Azure/go-autorest/autorest/azure/auth"
28 "github.com/Azure/go-autorest/autorest/to"
29 "github.com/jmcvetta/randutil"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
34 // Driver is the azure implementation of the cloud.Driver interface.
35 var Driver = cloud.DriverFunc(newAzureInstanceSet)
37 type azureInstanceSetConfig struct {
42 CloudEnvironment string
44 ImageResourceGroup string
47 NetworkResourceGroup string
51 DeleteDanglingResourcesAfter arvados.Duration
55 type containerWrapper interface {
56 GetBlobReference(name string) *storage.Blob
57 ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
60 type virtualMachinesClientWrapper interface {
61 createOrUpdate(ctx context.Context,
62 resourceGroupName string,
64 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
65 delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
66 listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
69 type virtualMachinesClientImpl struct {
70 inner compute.VirtualMachinesClient
73 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
74 resourceGroupName string,
76 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
78 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
80 return compute.VirtualMachine{}, wrapAzureError(err)
82 future.WaitForCompletionRef(ctx, cl.inner.Client)
83 r, err := future.Result(cl.inner)
84 return r, wrapAzureError(err)
87 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
88 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
90 return nil, wrapAzureError(err)
92 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
93 return future.Response(), wrapAzureError(err)
96 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
97 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
98 return r, wrapAzureError(err)
101 type interfacesClientWrapper interface {
102 createOrUpdate(ctx context.Context,
103 resourceGroupName string,
104 networkInterfaceName string,
105 parameters network.Interface) (result network.Interface, err error)
106 delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
107 listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
110 type interfacesClientImpl struct {
111 inner network.InterfacesClient
114 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
115 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
117 return nil, wrapAzureError(err)
119 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
120 return future.Response(), wrapAzureError(err)
123 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
124 resourceGroupName string,
125 networkInterfaceName string,
126 parameters network.Interface) (result network.Interface, err error) {
128 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
130 return network.Interface{}, wrapAzureError(err)
132 future.WaitForCompletionRef(ctx, cl.inner.Client)
133 r, err := future.Result(cl.inner)
134 return r, wrapAzureError(err)
137 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
138 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
139 return r, wrapAzureError(err)
142 type disksClientWrapper interface {
143 listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
144 delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
147 type disksClientImpl struct {
148 inner compute.DisksClient
151 func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
152 r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
153 return r, wrapAzureError(err)
156 func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
157 r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
158 return r, wrapAzureError(err)
161 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
163 type azureRateLimitError struct {
168 func (ar *azureRateLimitError) EarliestRetry() time.Time {
172 type azureQuotaError struct {
176 func (ar *azureQuotaError) IsQuotaError() bool {
180 func wrapAzureError(err error) error {
181 de, ok := err.(autorest.DetailedError)
185 rq, ok := de.Original.(*azure.RequestError)
189 if rq.Response == nil {
192 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
194 ra := rq.Response.Header["Retry-After"][0]
195 earliestRetry, parseErr := http.ParseTime(ra)
197 // Could not parse as a timestamp, must be number of seconds
198 dur, parseErr := strconv.ParseInt(ra, 10, 64)
200 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
202 // Couldn't make sense of retry-after,
203 // so set retry to 20 seconds
204 earliestRetry = time.Now().Add(20 * time.Second)
207 return &azureRateLimitError{*rq, earliestRetry}
209 if rq.ServiceError == nil {
212 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
213 return &azureQuotaError{*rq}
218 type azureInstanceSet struct {
219 azconfig azureInstanceSetConfig
220 vmClient virtualMachinesClientWrapper
221 netClient interfacesClientWrapper
222 disksClient disksClientWrapper
223 imageResourceGroup string
224 blobcont containerWrapper
225 azureEnv azure.Environment
226 interfaces map[string]network.Interface
230 stopFunc context.CancelFunc
231 stopWg sync.WaitGroup
232 deleteNIC chan string
233 deleteBlob chan storage.Blob
234 deleteDisk chan compute.Disk
235 logger logrus.FieldLogger
238 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
239 azcfg := azureInstanceSetConfig{}
240 err = json.Unmarshal(config, &azcfg)
245 az := azureInstanceSet{logger: logger}
246 az.ctx, az.stopFunc = context.WithCancel(context.Background())
247 err = az.setup(azcfg, string(dispatcherID))
255 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
257 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
258 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
259 disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
260 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
262 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
267 authorizer, err := auth.ClientCredentialsConfig{
268 ClientID: az.azconfig.ClientID,
269 ClientSecret: az.azconfig.ClientSecret,
270 TenantID: az.azconfig.TenantID,
271 Resource: az.azureEnv.ResourceManagerEndpoint,
272 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
278 vmClient.Authorizer = authorizer
279 netClient.Authorizer = authorizer
280 disksClient.Authorizer = authorizer
281 storageAcctClient.Authorizer = authorizer
283 az.vmClient = &virtualMachinesClientImpl{vmClient}
284 az.netClient = &interfacesClientImpl{netClient}
285 az.disksClient = &disksClientImpl{disksClient}
287 az.imageResourceGroup = az.azconfig.ImageResourceGroup
288 if az.imageResourceGroup == "" {
289 az.imageResourceGroup = az.azconfig.ResourceGroup
292 var client storage.Client
293 if az.azconfig.StorageAccount != "" {
294 result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
296 az.logger.WithError(err).Warn("Couldn't get account keys")
300 key1 := *(*result.Keys)[0].Value
301 client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
303 az.logger.WithError(err).Warn("Couldn't make client")
307 blobsvc := client.GetBlobService()
308 az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
311 az.dispatcherID = dispatcherID
312 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
316 defer az.stopWg.Done()
318 tk := time.NewTicker(5 * time.Minute)
321 case <-az.ctx.Done():
325 if az.blobcont != nil {
333 az.deleteNIC = make(chan string)
334 az.deleteBlob = make(chan storage.Blob)
335 az.deleteDisk = make(chan compute.Disk)
337 for i := 0; i < 4; i++ {
340 nicname, ok := <-az.deleteNIC
344 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
346 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
348 az.logger.Printf("Deleted NIC %v", nicname)
354 blob, ok := <-az.deleteBlob
358 err := blob.Delete(nil)
360 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
362 az.logger.Printf("Deleted blob %v", blob.Name)
368 disk, ok := <-az.deleteDisk
372 _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
374 az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
376 az.logger.Printf("Deleted disk %v", *disk.Name)
385 func (az *azureInstanceSet) Create(
386 instanceType arvados.InstanceType,
387 imageID cloud.ImageID,
388 newTags cloud.InstanceTags,
389 initCommand cloud.InitCommand,
390 publicKey ssh.PublicKey) (cloud.Instance, error) {
393 defer az.stopWg.Done()
395 if instanceType.AddedScratch > 0 {
396 return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
399 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
404 name = az.namePrefix + name
406 tags := map[string]*string{}
407 for k, v := range newTags {
408 tags[k] = to.StringPtr(v)
410 tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
412 networkResourceGroup := az.azconfig.NetworkResourceGroup
413 if networkResourceGroup == "" {
414 networkResourceGroup = az.azconfig.ResourceGroup
417 nicParameters := network.Interface{
418 Location: &az.azconfig.Location,
420 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
421 IPConfigurations: &[]network.InterfaceIPConfiguration{
422 network.InterfaceIPConfiguration{
423 Name: to.StringPtr("ip1"),
424 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
425 Subnet: &network.Subnet{
426 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
427 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
428 az.azconfig.SubscriptionID,
429 networkResourceGroup,
431 az.azconfig.Subnet)),
433 PrivateIPAllocationMethod: network.Dynamic,
439 nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
441 return nil, wrapAzureError(err)
444 blobname := fmt.Sprintf("%s-os.vhd", name)
445 customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
446 var storageProfile *compute.StorageProfile
448 re := regexp.MustCompile(`^http(s?)://`)
449 if re.MatchString(string(imageID)) {
450 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
451 az.azconfig.StorageAccount,
452 az.azureEnv.StorageEndpointSuffix,
453 az.azconfig.BlobContainer,
455 az.logger.Info("using deprecated VHD image")
456 storageProfile = &compute.StorageProfile{
457 OsDisk: &compute.OSDisk{
458 OsType: compute.Linux,
459 Name: to.StringPtr(name + "-os"),
460 CreateOption: compute.DiskCreateOptionTypesFromImage,
461 Image: &compute.VirtualHardDisk{
462 URI: to.StringPtr(string(imageID)),
464 Vhd: &compute.VirtualHardDisk{
470 az.logger.Info("using managed image")
472 storageProfile = &compute.StorageProfile{
473 ImageReference: &compute.ImageReference{
474 ID: to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID)),
476 OsDisk: &compute.OSDisk{
477 OsType: compute.Linux,
478 Name: to.StringPtr(name + "-os"),
479 CreateOption: compute.DiskCreateOptionTypesFromImage,
484 vmParameters := compute.VirtualMachine{
485 Location: &az.azconfig.Location,
487 VirtualMachineProperties: &compute.VirtualMachineProperties{
488 HardwareProfile: &compute.HardwareProfile{
489 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
491 StorageProfile: storageProfile,
492 NetworkProfile: &compute.NetworkProfile{
493 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
494 compute.NetworkInterfaceReference{
496 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
497 Primary: to.BoolPtr(true),
502 OsProfile: &compute.OSProfile{
504 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
505 LinuxConfiguration: &compute.LinuxConfiguration{
506 DisablePasswordAuthentication: to.BoolPtr(true),
507 SSH: &compute.SSHConfiguration{
508 PublicKeys: &[]compute.SSHPublicKey{
510 Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
511 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
516 CustomData: &customData,
521 vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
523 if az.blobcont != nil {
524 _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
526 az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
530 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
532 az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
535 return nil, wrapAzureError(err)
538 return &azureInstance{
545 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
547 defer az.stopWg.Done()
549 interfaces, err := az.manageNics()
554 result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
556 return nil, wrapAzureError(err)
559 var instances []cloud.Instance
560 for ; result.NotDone(); err = result.Next() {
562 return nil, wrapAzureError(err)
564 instances = append(instances, &azureInstance{
567 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
570 return instances, nil
// manageNics returns the Azure network interfaces which have
// "namePrefix" and are associated with a virtual machine, keyed by
// resource ID. It also garbage collects NICs which have "namePrefix",
// are not associated with a virtual machine, and have a "created-at"
// time more than DeleteDanglingResourcesAfter in the past (the delay
// prevents racing with, and deleting, newly created NICs).
578 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
580 defer az.stopWg.Done()
582 result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
584 return nil, wrapAzureError(err)
587 interfaces := make(map[string]network.Interface)
589 timestamp := time.Now()
590 for ; result.NotDone(); err = result.Next() {
592 az.logger.WithError(err).Warnf("Error listing nics")
593 return interfaces, nil
595 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
596 if result.Value().VirtualMachine != nil {
597 interfaces[*result.Value().ID] = result.Value()
599 if result.Value().Tags["created-at"] != nil {
600 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
602 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
603 az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
604 az.deleteNIC <- *result.Value().Name
611 return interfaces, nil
// manageBlobs garbage collects blobs (VM disk images) in the
// configured storage account container. It deletes blobs which
// have "namePrefix", are "available" (which means they are not
// leased to a VM) and haven't been modified for longer than
// DeleteDanglingResourcesAfter.
619 func (az *azureInstanceSet) manageBlobs() {
621 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
622 timestamp := time.Now()
625 response, err := az.blobcont.ListBlobs(page)
627 az.logger.WithError(err).Warn("Error listing blobs")
630 for _, b := range response.Blobs {
631 age := timestamp.Sub(time.Time(b.Properties.LastModified))
632 if b.Properties.BlobType == storage.BlobTypePage &&
633 b.Properties.LeaseState == "available" &&
634 b.Properties.LeaseStatus == "unlocked" &&
635 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
637 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
641 if response.NextMarker != "" {
642 page.Marker = response.NextMarker
// manageDisks garbage collects managed compute disks (VM disk images) in the
// image resource group. It deletes disks whose names start with
// "namePrefix" and end with "-os", are "Unattached" (which means they are not
// attached to a VM) and were created more than DeleteDanglingResourcesAfter ago.
// (Azure provides no modification timestamp on managed disks, there is only a
// creation timestamp.)
655 func (az *azureInstanceSet) manageDisks() {
657 re := regexp.MustCompile(`^` + az.namePrefix + `.*-os$`)
658 timestamp := time.Now()
661 response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
663 az.logger.WithError(err).Warn("Error listing disks")
666 for _, d := range response.Values() {
667 age := timestamp.Sub(d.DiskProperties.TimeCreated.ToTime())
668 if d.DiskProperties.DiskState == "Unattached" &&
669 re.MatchString(*d.Name) &&
670 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
672 az.logger.Printf("Disk %v is unlocked and not modified for %v seconds, will delete", *d.Name, age.Seconds())
676 if response.Values() != nil {
684 func (az *azureInstanceSet) Stop() {
691 type azureInstance struct {
692 provider *azureInstanceSet
693 nic network.Interface
694 vm compute.VirtualMachine
697 func (ai *azureInstance) ID() cloud.InstanceID {
698 return cloud.InstanceID(*ai.vm.ID)
701 func (ai *azureInstance) String() string {
705 func (ai *azureInstance) ProviderType() string {
706 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
709 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
710 ai.provider.stopWg.Add(1)
711 defer ai.provider.stopWg.Done()
713 tags := map[string]*string{}
714 for k, v := range ai.vm.Tags {
717 for k, v := range newTags {
718 tags[k] = to.StringPtr(v)
721 vmParameters := compute.VirtualMachine{
722 Location: &ai.provider.azconfig.Location,
725 vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
727 return wrapAzureError(err)
734 func (ai *azureInstance) Tags() cloud.InstanceTags {
735 tags := cloud.InstanceTags{}
736 for k, v := range ai.vm.Tags {
742 func (ai *azureInstance) Destroy() error {
743 ai.provider.stopWg.Add(1)
744 defer ai.provider.stopWg.Done()
746 _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
747 return wrapAzureError(err)
750 func (ai *azureInstance) Address() string {
751 if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
753 } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
755 } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
757 } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
764 func (ai *azureInstance) RemoteUser() string {
765 return ai.provider.azconfig.AdminUsername
768 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
769 return cloud.ErrNotImplemented