1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.curoverse.com/arvados.git/lib/cloud"
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
22 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
23 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
24 "github.com/Azure/azure-sdk-for-go/storage"
25 "github.com/Azure/go-autorest/autorest"
26 "github.com/Azure/go-autorest/autorest/azure"
27 "github.com/Azure/go-autorest/autorest/azure/auth"
28 "github.com/Azure/go-autorest/autorest/to"
29 "github.com/jmcvetta/randutil"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
34 // Driver is the azure implementation of the cloud.Driver interface.
35 var Driver = cloud.DriverFunc(newAzureInstanceSet)
37 type azureInstanceSetConfig struct {
42 CloudEnvironment string
49 DeleteDanglingResourcesAfter arvados.Duration
// tagKeyInstanceSecret is the tag key "InstanceSecret".
// NOTE(review): it is not referenced by any code visible here; presumably
// it names the VM tag that carries an instance secret — confirm the user.
const (
	tagKeyInstanceSecret = "InstanceSecret"
)
55 type virtualMachinesClientWrapper interface {
56 createOrUpdate(ctx context.Context,
57 resourceGroupName string,
59 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
60 delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
61 listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
64 type virtualMachinesClientImpl struct {
65 inner compute.VirtualMachinesClient
68 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
69 resourceGroupName string,
71 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
73 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
75 return compute.VirtualMachine{}, wrapAzureError(err)
77 future.WaitForCompletionRef(ctx, cl.inner.Client)
78 r, err := future.Result(cl.inner)
79 return r, wrapAzureError(err)
82 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
83 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
85 return nil, wrapAzureError(err)
87 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
88 return future.Response(), wrapAzureError(err)
91 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
92 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
93 return r, wrapAzureError(err)
96 type interfacesClientWrapper interface {
97 createOrUpdate(ctx context.Context,
98 resourceGroupName string,
99 networkInterfaceName string,
100 parameters network.Interface) (result network.Interface, err error)
101 delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
102 listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
105 type interfacesClientImpl struct {
106 inner network.InterfacesClient
109 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
110 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
112 return nil, wrapAzureError(err)
114 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
115 return future.Response(), wrapAzureError(err)
118 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
119 resourceGroupName string,
120 networkInterfaceName string,
121 parameters network.Interface) (result network.Interface, err error) {
123 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
125 return network.Interface{}, wrapAzureError(err)
127 future.WaitForCompletionRef(ctx, cl.inner.Client)
128 r, err := future.Result(cl.inner)
129 return r, wrapAzureError(err)
132 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
133 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
134 return r, wrapAzureError(err)
// quotaRe matches, case-insensitively, any text containing "exceed",
// "quota" or "limit". wrapAzureError applies it to Azure service error
// codes and messages to decide whether an error is a quota error.
var (
	quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
)
139 type azureRateLimitError struct {
144 func (ar *azureRateLimitError) EarliestRetry() time.Time {
148 type azureQuotaError struct {
152 func (ar *azureQuotaError) IsQuotaError() bool {
156 func wrapAzureError(err error) error {
157 de, ok := err.(autorest.DetailedError)
161 rq, ok := de.Original.(*azure.RequestError)
165 if rq.Response == nil {
168 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
170 ra := rq.Response.Header["Retry-After"][0]
171 earliestRetry, parseErr := http.ParseTime(ra)
173 // Could not parse as a timestamp, must be number of seconds
174 dur, parseErr := strconv.ParseInt(ra, 10, 64)
176 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
178 // Couldn't make sense of retry-after,
179 // so set retry to 20 seconds
180 earliestRetry = time.Now().Add(20 * time.Second)
183 return &azureRateLimitError{*rq, earliestRetry}
185 if rq.ServiceError == nil {
188 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
189 return &azureQuotaError{*rq}
194 type azureInstanceSet struct {
195 azconfig azureInstanceSetConfig
196 vmClient virtualMachinesClientWrapper
197 netClient interfacesClientWrapper
198 storageAcctClient storageacct.AccountsClient
199 azureEnv azure.Environment
200 interfaces map[string]network.Interface
204 stopFunc context.CancelFunc
205 stopWg sync.WaitGroup
206 deleteNIC chan string
207 deleteBlob chan storage.Blob
208 logger logrus.FieldLogger
211 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
212 azcfg := azureInstanceSetConfig{}
213 err = json.Unmarshal(config, &azcfg)
218 ap := azureInstanceSet{logger: logger}
219 err = ap.setup(azcfg, string(dispatcherID))
226 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
228 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
229 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
230 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
232 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
237 authorizer, err := auth.ClientCredentialsConfig{
238 ClientID: az.azconfig.ClientID,
239 ClientSecret: az.azconfig.ClientSecret,
240 TenantID: az.azconfig.TenantID,
241 Resource: az.azureEnv.ResourceManagerEndpoint,
242 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
248 vmClient.Authorizer = authorizer
249 netClient.Authorizer = authorizer
250 storageAcctClient.Authorizer = authorizer
252 az.vmClient = &virtualMachinesClientImpl{vmClient}
253 az.netClient = &interfacesClientImpl{netClient}
254 az.storageAcctClient = storageAcctClient
256 az.dispatcherID = dispatcherID
257 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
259 az.ctx, az.stopFunc = context.WithCancel(context.Background())
262 defer az.stopWg.Done()
264 tk := time.NewTicker(5 * time.Minute)
267 case <-az.ctx.Done():
276 az.deleteNIC = make(chan string)
277 az.deleteBlob = make(chan storage.Blob)
279 for i := 0; i < 4; i++ {
282 nicname, ok := <-az.deleteNIC
286 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
288 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
290 az.logger.Printf("Deleted NIC %v", nicname)
296 blob, ok := <-az.deleteBlob
300 err := blob.Delete(nil)
302 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
304 az.logger.Printf("Deleted blob %v", blob.Name)
313 func (az *azureInstanceSet) Create(
314 instanceType arvados.InstanceType,
315 imageID cloud.ImageID,
316 newTags cloud.InstanceTags,
317 initCommand cloud.InitCommand,
318 publicKey ssh.PublicKey) (cloud.Instance, error) {
321 defer az.stopWg.Done()
323 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
328 name = az.namePrefix + name
330 timestamp := time.Now().Format(time.RFC3339Nano)
332 tags := make(map[string]*string)
333 tags["created-at"] = ×tamp
334 for k, v := range newTags {
336 tags["dispatch-"+k] = &newstr
339 nicParameters := network.Interface{
340 Location: &az.azconfig.Location,
342 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
343 IPConfigurations: &[]network.InterfaceIPConfiguration{
344 network.InterfaceIPConfiguration{
345 Name: to.StringPtr("ip1"),
346 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
347 Subnet: &network.Subnet{
348 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
349 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
350 az.azconfig.SubscriptionID,
351 az.azconfig.ResourceGroup,
353 az.azconfig.Subnet)),
355 PrivateIPAllocationMethod: network.Dynamic,
361 nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
363 return nil, wrapAzureError(err)
366 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
367 az.azconfig.StorageAccount,
368 az.azureEnv.StorageEndpointSuffix,
369 az.azconfig.BlobContainer,
372 customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
374 vmParameters := compute.VirtualMachine{
375 Location: &az.azconfig.Location,
377 VirtualMachineProperties: &compute.VirtualMachineProperties{
378 HardwareProfile: &compute.HardwareProfile{
379 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
381 StorageProfile: &compute.StorageProfile{
382 OsDisk: &compute.OSDisk{
383 OsType: compute.Linux,
384 Name: to.StringPtr(name + "-os"),
385 CreateOption: compute.FromImage,
386 Image: &compute.VirtualHardDisk{
387 URI: to.StringPtr(string(imageID)),
389 Vhd: &compute.VirtualHardDisk{
394 NetworkProfile: &compute.NetworkProfile{
395 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
396 compute.NetworkInterfaceReference{
398 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
399 Primary: to.BoolPtr(true),
404 OsProfile: &compute.OSProfile{
406 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
407 LinuxConfiguration: &compute.LinuxConfiguration{
408 DisablePasswordAuthentication: to.BoolPtr(true),
409 SSH: &compute.SSHConfiguration{
410 PublicKeys: &[]compute.SSHPublicKey{
412 Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
413 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
418 CustomData: &customData,
423 vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
425 return nil, wrapAzureError(err)
428 return &azureInstance{
435 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
437 defer az.stopWg.Done()
439 interfaces, err := az.manageNics()
444 result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
446 return nil, wrapAzureError(err)
449 instances := make([]cloud.Instance, 0)
451 for ; result.NotDone(); err = result.Next() {
453 return nil, wrapAzureError(err)
455 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
456 instances = append(instances, &azureInstance{
459 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
462 return instances, nil
465 // ManageNics returns a list of Azure network interface resources.
466 // Also performs garbage collection of NICs which have "namePrefix", are
467 // not associated with a virtual machine and have a "create-at" time
468 // more than DeleteDanglingResourcesAfter (to prevent racing and
469 // deleting newly created NICs) in the past are deleted.
470 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
472 defer az.stopWg.Done()
474 result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
476 return nil, wrapAzureError(err)
479 interfaces := make(map[string]network.Interface)
481 timestamp := time.Now()
482 for ; result.NotDone(); err = result.Next() {
484 az.logger.WithError(err).Warnf("Error listing nics")
485 return interfaces, nil
487 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
488 if result.Value().VirtualMachine != nil {
489 interfaces[*result.Value().ID] = result.Value()
491 if result.Value().Tags["created-at"] != nil {
492 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
494 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
495 az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
496 az.deleteNIC <- *result.Value().Name
503 return interfaces, nil
506 // ManageBlobs garbage collects blobs (VM disk images) in the
507 // configured storage account container. It will delete blobs which
508 // have "namePrefix", are "available" (which means they are not
509 // leased to a VM) and haven't been modified for
510 // DeleteDanglingResourcesAfter seconds.
511 func (az *azureInstanceSet) manageBlobs() {
512 result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
514 az.logger.WithError(err).Warn("Couldn't get account keys")
518 key1 := *(*result.Keys)[0].Value
519 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
521 az.logger.WithError(err).Warn("Couldn't make client")
525 blobsvc := client.GetBlobService()
526 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
528 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
529 timestamp := time.Now()
532 response, err := blobcont.ListBlobs(page)
534 az.logger.WithError(err).Warn("Error listing blobs")
537 for _, b := range response.Blobs {
538 age := timestamp.Sub(time.Time(b.Properties.LastModified))
539 if b.Properties.BlobType == storage.BlobTypePage &&
540 b.Properties.LeaseState == "available" &&
541 b.Properties.LeaseStatus == "unlocked" &&
542 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
544 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
548 if response.NextMarker != "" {
549 page.Marker = response.NextMarker
556 func (az *azureInstanceSet) Stop() {
563 type azureInstance struct {
564 provider *azureInstanceSet
565 nic network.Interface
566 vm compute.VirtualMachine
569 func (ai *azureInstance) ID() cloud.InstanceID {
570 return cloud.InstanceID(*ai.vm.ID)
573 func (ai *azureInstance) String() string {
577 func (ai *azureInstance) ProviderType() string {
578 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
581 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
582 ai.provider.stopWg.Add(1)
583 defer ai.provider.stopWg.Done()
585 tags := make(map[string]*string)
587 for k, v := range ai.vm.Tags {
588 if !strings.HasPrefix(k, "dispatch-") {
592 for k, v := range newTags {
594 tags["dispatch-"+k] = &newstr
597 vmParameters := compute.VirtualMachine{
598 Location: &ai.provider.azconfig.Location,
601 vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
603 return wrapAzureError(err)
610 func (ai *azureInstance) Tags() cloud.InstanceTags {
611 tags := make(map[string]string)
613 for k, v := range ai.vm.Tags {
614 if strings.HasPrefix(k, "dispatch-") {
622 func (ai *azureInstance) Destroy() error {
623 ai.provider.stopWg.Add(1)
624 defer ai.provider.stopWg.Done()
626 _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
627 return wrapAzureError(err)
630 func (ai *azureInstance) Address() string {
631 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
634 func (ai *azureInstance) RemoteUser() string {
635 return ai.provider.azconfig.AdminUsername
638 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
639 return cloud.ErrNotImplemented