1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
22 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
23 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
24 "github.com/Azure/azure-sdk-for-go/storage"
25 "github.com/Azure/go-autorest/autorest"
26 "github.com/Azure/go-autorest/autorest/azure"
27 "github.com/Azure/go-autorest/autorest/azure/auth"
28 "github.com/Azure/go-autorest/autorest/to"
29 "github.com/jmcvetta/randutil"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
34 // Driver is the azure implementation of the cloud.Driver interface.
// It is obtained by passing newAzureInstanceSet to cloud.DriverFunc.
35 var Driver = cloud.DriverFunc(newAzureInstanceSet)
// azureInstanceSetConfig holds the driver configuration that
// newAzureInstanceSet decodes from its JSON config argument.
37 type azureInstanceSetConfig struct {
// CloudEnvironment is the name given to azure.EnvironmentFromName in
// setup.
42 CloudEnvironment string
// NetworkResourceGroup is the resource group used in the subnet ID
// built by Create; when empty, Create uses ResourceGroup instead.
46 NetworkResourceGroup string
// DeleteDanglingResourcesAfter is the age threshold that manageNics
// and manageBlobs compare against before deleting a resource.
50 DeleteDanglingResourcesAfter arvados.Duration
// containerWrapper is the set of blob container operations used by
// this driver: getting a reference to a named blob and listing
// blobs.
54 type containerWrapper interface {
55 GetBlobReference(name string) *storage.Blob
56 ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
// virtualMachinesClientWrapper is the set of virtual machine
// operations used by this driver (create/update, delete, list).
// virtualMachinesClientImpl is the implementation installed by setup.
59 type virtualMachinesClientWrapper interface {
60 createOrUpdate(ctx context.Context,
61 resourceGroupName string,
63 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
64 delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
65 listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
// virtualMachinesClientImpl implements virtualMachinesClientWrapper
// by delegating to a compute.VirtualMachinesClient.
68 type virtualMachinesClientImpl struct {
69 inner compute.VirtualMachinesClient
// createOrUpdate starts a CreateOrUpdate operation on the wrapped
// client, waits for it, and returns the resulting virtual machine.
// Errors are passed through wrapAzureError.
72 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
73 resourceGroupName string,
75 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
77 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
79 return compute.VirtualMachine{}, wrapAzureError(err)
// NOTE(review): the error returned by WaitForCompletionRef is
// discarded here, whereas delete keeps it. Presumably a failed or
// cancelled wait then surfaces through future.Result -- confirm.
81 future.WaitForCompletionRef(ctx, cl.inner.Client)
82 r, err := future.Result(cl.inner)
83 return r, wrapAzureError(err)
// delete starts deletion of the named virtual machine, waits for the
// operation to complete, and returns the operation's HTTP response.
// Errors are passed through wrapAzureError.
86 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
87 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
89 return nil, wrapAzureError(err)
91 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
92 return future.Response(), wrapAzureError(err)
// listComplete returns an iterator over the virtual machines in
// resourceGroupName, as produced by the wrapped client's ListComplete.
// Errors are passed through wrapAzureError.
95 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
96 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
97 return r, wrapAzureError(err)
// interfacesClientWrapper is the set of network interface operations
// used by this driver (create/update, delete, list).
// interfacesClientImpl is the implementation installed by setup.
100 type interfacesClientWrapper interface {
101 createOrUpdate(ctx context.Context,
102 resourceGroupName string,
103 networkInterfaceName string,
104 parameters network.Interface) (result network.Interface, err error)
105 delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
106 listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
// interfacesClientImpl implements interfacesClientWrapper by
// delegating to a network.InterfacesClient.
109 type interfacesClientImpl struct {
110 inner network.InterfacesClient
// delete starts deletion of a network interface, waits for the
// operation to complete, and returns the operation's HTTP response.
// Errors are passed through wrapAzureError.
//
// Note: the third parameter is named VMName here, but it carries the
// network interface name (interfacesClientWrapper declares it as
// networkInterfaceName, and callers pass a NIC name).
113 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
114 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
116 return nil, wrapAzureError(err)
118 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
119 return future.Response(), wrapAzureError(err)
// createOrUpdate starts a CreateOrUpdate operation for the named
// network interface, waits for it, and returns the resulting
// interface. Errors are passed through wrapAzureError.
122 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
123 resourceGroupName string,
124 networkInterfaceName string,
125 parameters network.Interface) (result network.Interface, err error) {
127 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
129 return network.Interface{}, wrapAzureError(err)
// NOTE(review): the error returned by WaitForCompletionRef is
// discarded here, whereas delete keeps it. Presumably a failed or
// cancelled wait then surfaces through future.Result -- confirm.
131 future.WaitForCompletionRef(ctx, cl.inner.Client)
132 r, err := future.Result(cl.inner)
133 return r, wrapAzureError(err)
// listComplete returns an iterator over the network interfaces in
// resourceGroupName, as produced by the wrapped client's ListComplete.
// Errors are passed through wrapAzureError.
136 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
137 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
138 return r, wrapAzureError(err)
// quotaRe matches, case-insensitively, any of the words "exceed",
// "quota" or "limit". wrapAzureError applies it to a service error's
// Code and Message to recognize quota errors.
141 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
// azureRateLimitError is the error type wrapAzureError returns for
// throttled requests. It is constructed there from the
// azure.RequestError and the computed earliest retry time.
143 type azureRateLimitError struct {
// EarliestRetry reports a time associated with the rate limit error.
// NOTE(review): presumably this is the earliest retry time computed in
// wrapAzureError from the Retry-After header -- confirm.
148 func (ar *azureRateLimitError) EarliestRetry() time.Time {
// azureQuotaError is the error type wrapAzureError returns when the
// service error's code or message matches quotaRe. It is constructed
// there from the azure.RequestError.
152 type azureQuotaError struct {
// IsQuotaError reports whether the error is a quota error.
// NOTE(review): presumably always true for this type -- confirm.
156 func (ar *azureQuotaError) IsQuotaError() bool {
// wrapAzureError inspects err and, when it is an
// autorest.DetailedError whose Original is an *azure.RequestError,
// converts it into an *azureRateLimitError (throttling) or an
// *azureQuotaError (service error code/message matches quotaRe).
// NOTE(review): presumably any other error is returned unchanged --
// confirm.
160 func wrapAzureError(err error) error {
161 de, ok := err.(autorest.DetailedError)
165 rq, ok := de.Original.(*azure.RequestError)
169 if rq.Response == nil {
// A 429 status, or the presence of a Retry-After header, is treated
// as throttling.
// NOTE(review): if the status is 429 but there is no Retry-After
// header, the [0] index below would panic unless it is guarded --
// confirm.
172 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
174 ra := rq.Response.Header["Retry-After"][0]
// First try the header value as an HTTP timestamp.
175 earliestRetry, parseErr := http.ParseTime(ra)
177 // Could not parse as a timestamp, must be number of seconds
178 dur, parseErr := strconv.ParseInt(ra, 10, 64)
180 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
182 // Couldn't make sense of retry-after,
183 // so set retry to 20 seconds
184 earliestRetry = time.Now().Add(20 * time.Second)
187 return &azureRateLimitError{*rq, earliestRetry}
189 if rq.ServiceError == nil {
// Quota detection is a keyword match on the service error's code and
// message.
192 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
193 return &azureQuotaError{*rq}
// azureInstanceSet is the value newAzureInstanceSet returns as a
// cloud.InstanceSet. It holds the configuration, the Azure client
// wrappers, and the state used for background resource cleanup.
198 type azureInstanceSet struct {
199 azconfig azureInstanceSetConfig
// vmClient and netClient are set in setup to SDK-backed
// implementations of the wrapper interfaces.
200 vmClient virtualMachinesClientWrapper
201 netClient interfacesClientWrapper
// blobcont is the blob container named by azconfig.BlobContainer.
202 blobcont containerWrapper
203 azureEnv azure.Environment
204 interfaces map[string]network.Interface
// stopFunc cancels the context created in newAzureInstanceSet.
208 stopFunc context.CancelFunc
// stopWg is incremented/decremented around operations (see the
// deferred stopWg.Done calls throughout this file).
209 stopWg sync.WaitGroup
// deleteNIC carries names of NICs to delete and deleteBlob carries
// blobs to delete; both are created and received from in setup.
210 deleteNIC chan string
211 deleteBlob chan storage.Blob
212 logger logrus.FieldLogger
// newAzureInstanceSet decodes config (JSON) into an
// azureInstanceSetConfig, creates an azureInstanceSet with the given
// logger and a cancellable context derived from context.Background,
// and initializes it with setup using dispatcherID. The
// cloud.SharedResourceTags argument is ignored.
215 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
216 azcfg := azureInstanceSetConfig{}
217 err = json.Unmarshal(config, &azcfg)
222 az := azureInstanceSet{logger: logger}
223 az.ctx, az.stopFunc = context.WithCancel(context.Background())
224 err = az.setup(azcfg, string(dispatcherID))
// setup initializes az: it builds the Azure VM, network interface and
// storage account clients, authenticates them with client
// credentials, opens the configured blob container, records the
// dispatcher ID and derived name prefix, and sets up the periodic and
// deletion machinery used for resource cleanup.
232 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
// All three SDK clients are created for the configured subscription.
234 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
235 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
236 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
// Resolve the named cloud environment; its endpoints are used for
// authentication and for storage URLs.
238 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
243 authorizer, err := auth.ClientCredentialsConfig{
244 ClientID: az.azconfig.ClientID,
245 ClientSecret: az.azconfig.ClientSecret,
246 TenantID: az.azconfig.TenantID,
247 Resource: az.azureEnv.ResourceManagerEndpoint,
248 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
// The same authorizer is shared by all three clients.
254 vmClient.Authorizer = authorizer
255 netClient.Authorizer = authorizer
256 storageAcctClient.Authorizer = authorizer
258 az.vmClient = &virtualMachinesClientImpl{vmClient}
259 az.netClient = &interfacesClientImpl{netClient}
// Fetch the storage account keys; the first key is used to build the
// blob storage client.
261 result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
263 az.logger.WithError(err).Warn("Couldn't get account keys")
267 key1 := *(*result.Keys)[0].Value
268 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
270 az.logger.WithError(err).Warn("Couldn't make client")
274 blobsvc := client.GetBlobService()
275 az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
// Resources created by this instance set are named
// "compute-<dispatcherID>-...".
277 az.dispatcherID = dispatcherID
278 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
282 defer az.stopWg.Done()
// A 5 minute ticker, together with az.ctx cancellation, drives the
// periodic work.
// NOTE(review): presumably each tick runs manageBlobs -- confirm.
284 tk := time.NewTicker(5 * time.Minute)
287 case <-az.ctx.Done():
// Unbuffered channels feeding the deletion code below.
296 az.deleteNIC = make(chan string)
297 az.deleteBlob = make(chan storage.Blob)
// NOTE(review): presumably this starts 4 pairs of worker goroutines
// (one NIC deleter and one blob deleter per iteration) -- confirm.
299 for i := 0; i < 4; i++ {
302 nicname, ok := <-az.deleteNIC
// NIC deletion uses context.Background() rather than az.ctx.
306 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
308 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
310 az.logger.Printf("Deleted NIC %v", nicname)
316 blob, ok := <-az.deleteBlob
320 err := blob.Delete(nil)
322 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
324 az.logger.Printf("Deleted blob %v", blob.Name)
// Create provisions a new virtual machine: it creates a network
// interface, then a Linux VM booted from imageID with an OS disk VHD
// in the configured blob container. newTags (plus a "created-at"
// timestamp) are collected as resource tags, initCommand is supplied
// as a shell script in the VM's custom data, and publicKey is
// installed as an authorized key for the admin user.
//
// Instance types with AddedScratch > 0 are rejected with an error.
333 func (az *azureInstanceSet) Create(
334 instanceType arvados.InstanceType,
335 imageID cloud.ImageID,
336 newTags cloud.InstanceTags,
337 initCommand cloud.InitCommand,
338 publicKey ssh.PublicKey) (cloud.Instance, error) {
341 defer az.stopWg.Done()
343 if instanceType.AddedScratch > 0 {
344 return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
// The resource name is namePrefix followed by 15 random characters
// drawn from lowercase letters and digits.
347 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
352 name = az.namePrefix + name
354 tags := map[string]*string{}
355 for k, v := range newTags {
356 tags[k] = to.StringPtr(v)
// "created-at" is what manageNics later parses (RFC3339Nano) to decide
// whether an unattached NIC is old enough to delete.
358 tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
// The subnet may live in a separate resource group; default to the
// main resource group when none is configured.
360 networkResourceGroup := az.azconfig.NetworkResourceGroup
361 if networkResourceGroup == "" {
362 networkResourceGroup = az.azconfig.ResourceGroup
365 nicParameters := network.Interface{
366 Location: &az.azconfig.Location,
368 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
369 IPConfigurations: &[]network.InterfaceIPConfiguration{
370 network.InterfaceIPConfiguration{
371 Name: to.StringPtr("ip1"),
372 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
373 Subnet: &network.Subnet{
374 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
375 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
376 az.azconfig.SubscriptionID,
377 networkResourceGroup,
379 az.azconfig.Subnet)),
381 PrivateIPAllocationMethod: network.Dynamic,
// The NIC itself is created in azconfig.ResourceGroup and named
// "<name>-nic".
387 nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
389 return nil, wrapAzureError(err)
// URL of the OS disk blob:
// https://<account>.blob.<suffix>/<container>/<blobname>.
392 blobname := fmt.Sprintf("%s-os.vhd", name)
393 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
394 az.azconfig.StorageAccount,
395 az.azureEnv.StorageEndpointSuffix,
396 az.azconfig.BlobContainer,
// Custom data is the base64 encoding of a /bin/sh script consisting of
// initCommand.
399 customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
401 vmParameters := compute.VirtualMachine{
402 Location: &az.azconfig.Location,
404 VirtualMachineProperties: &compute.VirtualMachineProperties{
405 HardwareProfile: &compute.HardwareProfile{
406 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
408 StorageProfile: &compute.StorageProfile{
409 OsDisk: &compute.OSDisk{
410 OsType: compute.Linux,
411 Name: to.StringPtr(name + "-os"),
412 CreateOption: compute.FromImage,
413 Image: &compute.VirtualHardDisk{
414 URI: to.StringPtr(string(imageID)),
416 Vhd: &compute.VirtualHardDisk{
421 NetworkProfile: &compute.NetworkProfile{
422 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
423 compute.NetworkInterfaceReference{
425 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
426 Primary: to.BoolPtr(true),
431 OsProfile: &compute.OSProfile{
433 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
434 LinuxConfiguration: &compute.LinuxConfiguration{
// Password login is disabled; access is by the supplied SSH key only.
435 DisablePasswordAuthentication: to.BoolPtr(true),
436 SSH: &compute.SSHConfiguration{
437 PublicKeys: &[]compute.SSHPublicKey{
439 Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
440 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
445 CustomData: &customData,
450 vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
// After a failed VM create, clean up the VHD blob and the NIC created
// above. Cleanup failures are only logged; the create error is what
// gets returned. The NIC delete uses context.Background() rather than
// az.ctx.
452 _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
454 az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
457 _, delerr = az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
459 az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
462 return nil, wrapAzureError(err)
465 return &azureInstance{
// Instances lists the virtual machines in the configured resource
// group and returns each as an azureInstance, paired with its network
// interface as reported by manageNics. The cloud.InstanceTags
// argument is ignored.
472 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
474 defer az.stopWg.Done()
// manageNics also triggers garbage collection of unattached NICs.
476 interfaces, err := az.manageNics()
481 result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
483 return nil, wrapAzureError(err)
486 var instances []cloud.Instance
487 for ; result.NotDone(); err = result.Next() {
489 return nil, wrapAzureError(err)
491 instances = append(instances, &azureInstance{
// The NIC is looked up by the ID of the VM's first network interface.
// NOTE(review): NetworkProfile, NetworkInterfaces, its first element
// and that element's ID are dereferenced without nil/length checks --
// confirm every listed VM is guaranteed to have them.
494 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
497 return instances, nil
500 // manageNics returns a map, keyed by resource ID, of the Azure
501 // network interfaces which have "namePrefix" and are attached to a
502 // virtual machine. It also garbage collects NICs which have
503 // "namePrefix", are not associated with a virtual machine, and have a
504 // "created-at" time more than DeleteDanglingResourcesAfter in the past
// (the delay prevents racing with, and deleting, newly created NICs).
// Names of NICs to delete are sent on az.deleteNIC.
505 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
507 defer az.stopWg.Done()
509 result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
511 return nil, wrapAzureError(err)
514 interfaces := make(map[string]network.Interface)
// A single reference time is used for all age comparisons in this
// pass.
516 timestamp := time.Now()
517 for ; result.NotDone(); err = result.Next() {
// An error while advancing the iterator is logged, and the interfaces
// collected so far are returned with a nil error.
519 az.logger.WithError(err).Warnf("Error listing nics")
520 return interfaces, nil
522 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
523 if result.Value().VirtualMachine != nil {
524 interfaces[*result.Value().ID] = result.Value()
// NICs without a "created-at" tag are not considered for deletion.
526 if result.Value().Tags["created-at"] != nil {
527 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
529 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
530 az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
// deleteNIC is created unbuffered in setup, so this send blocks until
// a receiver takes the name.
531 az.deleteNIC <- *result.Value().Name
538 return interfaces, nil
541 // manageBlobs garbage collects blobs (VM disk images) in the
542 // configured storage account container. It selects page blobs which
543 // have "namePrefix", are "available" and "unlocked" (which means they
544 // are not leased to a VM) and haven't been modified for longer than
545 // DeleteDanglingResourcesAfter.
// NOTE(review): presumably each selected blob is sent on az.deleteBlob
// for deletion -- confirm.
546 func (az *azureInstanceSet) manageBlobs() {
// Only blobs whose names start with namePrefix are listed.
548 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
549 timestamp := time.Now()
552 response, err := az.blobcont.ListBlobs(page)
554 az.logger.WithError(err).Warn("Error listing blobs")
557 for _, b := range response.Blobs {
558 age := timestamp.Sub(time.Time(b.Properties.LastModified))
559 if b.Properties.BlobType == storage.BlobTypePage &&
560 b.Properties.LeaseState == "available" &&
561 b.Properties.LeaseStatus == "unlocked" &&
562 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
564 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
// Continue with the next page of results when the listing is
// truncated.
568 if response.NextMarker != "" {
569 page.Marker = response.NextMarker
// Stop shuts down the instance set.
// NOTE(review): presumably it cancels az.ctx via stopFunc, waits on
// stopWg, and closes the deletion channels -- confirm.
576 func (az *azureInstanceSet) Stop() {
// azureInstance is the cloud.Instance value returned by Create and
// Instances. It pairs an Azure virtual machine with its network
// interface and the azureInstanceSet that owns it.
583 type azureInstance struct {
584 provider *azureInstanceSet
585 nic network.Interface
586 vm compute.VirtualMachine
// ID returns the Azure resource ID of the virtual machine as a
// cloud.InstanceID. It dereferences ai.vm.ID, which must be non-nil.
589 func (ai *azureInstance) ID() cloud.InstanceID {
590 return cloud.InstanceID(*ai.vm.ID)
// String returns a string representation of the instance.
// NOTE(review): presumably the VM name -- confirm.
593 func (ai *azureInstance) String() string {
// ProviderType returns the VM size from the virtual machine's
// hardware profile. VirtualMachineProperties and HardwareProfile are
// dereferenced without nil checks.
597 func (ai *azureInstance) ProviderType() string {
598 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
// SetTags updates the virtual machine through the provider's
// vmClient.createOrUpdate, using a tag set built from the VM's current
// tags and newTags. Errors from the update are passed through
// wrapAzureError.
601 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
// Register with the provider's WaitGroup for the duration of the call.
602 ai.provider.stopWg.Add(1)
603 defer ai.provider.stopWg.Done()
605 tags := map[string]*string{}
// Existing tags are visited first and newTags second.
// NOTE(review): presumably newTags therefore win on key collisions --
// confirm.
606 for k, v := range ai.vm.Tags {
609 for k, v := range newTags {
610 tags[k] = to.StringPtr(v)
613 vmParameters := compute.VirtualMachine{
614 Location: &ai.provider.azconfig.Location,
617 vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
619 return wrapAzureError(err)
// Tags returns the instance's tags as a cloud.InstanceTags built by
// iterating over the virtual machine's Tags map.
626 func (ai *azureInstance) Tags() cloud.InstanceTags {
627 tags := cloud.InstanceTags{}
628 for k, v := range ai.vm.Tags {
// Destroy deletes the virtual machine via the provider's vmClient and
// returns any error passed through wrapAzureError. Only the VM delete
// is issued here.
634 func (ai *azureInstance) Destroy() error {
// Register with the provider's WaitGroup for the duration of the call.
635 ai.provider.stopWg.Add(1)
636 defer ai.provider.stopWg.Done()
638 _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
639 return wrapAzureError(err)
// Address looks up the private IP address of the NIC's first IP
// configuration, checking each level (interface properties, IP
// configurations, configuration properties, address) for nil or empty
// before descending.
// NOTE(review): presumably returns "" when any level is missing and
// the private IP address otherwise -- confirm.
642 func (ai *azureInstance) Address() string {
643 if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
645 } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
647 } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
649 } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
// RemoteUser returns the configured AdminUsername, which is the
// account Create sets up on the VM with the SSH public key.
656 func (ai *azureInstance) RemoteUser() string {
657 return ai.provider.azconfig.AdminUsername
// VerifyHostKey is not supported by this driver: it ignores its
// arguments and always returns cloud.ErrNotImplemented.
660 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
661 return cloud.ErrNotImplemented