1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.curoverse.com/arvados.git/lib/cloud"
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
22 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
23 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
24 "github.com/Azure/azure-sdk-for-go/storage"
25 "github.com/Azure/go-autorest/autorest"
26 "github.com/Azure/go-autorest/autorest/azure"
27 "github.com/Azure/go-autorest/autorest/azure/auth"
28 "github.com/Azure/go-autorest/autorest/to"
29 "github.com/jmcvetta/randutil"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
34 // Driver is the azure implementation of the cloud.Driver interface.
// It builds an azureInstanceSet from the driver's JSON configuration via
// newAzureInstanceSet.
35 var Driver = cloud.DriverFunc(newAzureInstanceSet)
// azureInstanceSetConfig is the driver configuration, decoded from JSON by
// newAzureInstanceSet. Besides the fields below, the driver reads
// SubscriptionID, ClientID, ClientSecret, TenantID, ResourceGroup, Location,
// Subnet, StorageAccount, BlobContainer and AdminUsername from it.
37 type azureInstanceSetConfig struct {
// CloudEnvironment names the Azure cloud; setup resolves it with
// azure.EnvironmentFromName.
42 CloudEnvironment string
// DeleteDanglingResourcesAfter is the minimum age before manageNics and
// manageBlobs garbage-collect dangling NICs and VM disk blobs.
49 DeleteDanglingResourcesAfter arvados.Duration
// virtualMachinesClientWrapper is the subset of Azure VM operations the
// driver needs. virtualMachinesClientImpl implements it on top of the Azure
// SDK. NOTE(review): the indirection presumably lets tests substitute a
// stub — confirm.
53 type virtualMachinesClientWrapper interface {
// createOrUpdate creates or updates the named VM in the resource group.
54 createOrUpdate(ctx context.Context,
55 resourceGroupName string,
57 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
// delete deletes the named VM from the resource group.
58 delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
// listComplete returns an iterator over all VMs in the resource group.
59 listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
// virtualMachinesClientImpl implements virtualMachinesClientWrapper by
// delegating to an Azure SDK compute.VirtualMachinesClient and passing the
// errors it returns through wrapAzureError.
62 type virtualMachinesClientImpl struct {
63 inner compute.VirtualMachinesClient
// createOrUpdate starts an asynchronous CreateOrUpdate of the VM, waits for
// it to complete, and returns the resulting VM. Errors from starting the
// operation, waiting for it, or fetching its result are all passed through
// wrapAzureError, so callers can recognize throttling and quota errors.
// The wait error is checked (as delete does) instead of being discarded, so
// a failed or cancelled wait is reported directly rather than surfacing as
// a less specific error from future.Result.
66 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
67 resourceGroupName string,
69 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
71 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
73 return compute.VirtualMachine{}, wrapAzureError(err)
75 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
if err != nil {
	return compute.VirtualMachine{}, wrapAzureError(err)
}
76 r, err := future.Result(cl.inner)
77 return r, wrapAzureError(err)
// delete starts deleting the named VM, waits for the operation to complete,
// and returns the operation's HTTP response. Errors from starting or
// waiting are passed through wrapAzureError.
80 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
81 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
83 return nil, wrapAzureError(err)
85 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
86 return future.Response(), wrapAzureError(err)
// listComplete returns an iterator over every VM in the resource group,
// with any error passed through wrapAzureError.
89 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
90 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
91 return r, wrapAzureError(err)
// interfacesClientWrapper is the subset of Azure network-interface (NIC)
// operations the driver needs. interfacesClientImpl implements it on top of
// the Azure SDK. NOTE(review): presumably also a test seam — confirm.
94 type interfacesClientWrapper interface {
// createOrUpdate creates or updates the named NIC in the resource group.
95 createOrUpdate(ctx context.Context,
96 resourceGroupName string,
97 networkInterfaceName string,
98 parameters network.Interface) (result network.Interface, err error)
// delete deletes the named NIC from the resource group.
99 delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
// listComplete returns an iterator over all NICs in the resource group.
100 listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
// interfacesClientImpl implements interfacesClientWrapper by delegating to
// an Azure SDK network.InterfacesClient and passing the errors it returns
// through wrapAzureError.
103 type interfacesClientImpl struct {
104 inner network.InterfacesClient
// delete starts deleting the named network interface, waits for the
// operation to complete, and returns its HTTP response. Errors from starting
// or waiting are passed through wrapAzureError.
// NOTE(review): the parameter is named VMName but holds a NIC name (the
// interface calls it networkInterfaceName, and setup's deletion workers pass
// a NIC name). Consider renaming it.
107 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
108 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
110 return nil, wrapAzureError(err)
112 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
113 return future.Response(), wrapAzureError(err)
// createOrUpdate starts an asynchronous CreateOrUpdate of the network
// interface, waits for it to complete, and returns the resulting NIC.
// Errors from starting the operation, waiting for it, or fetching its
// result are all passed through wrapAzureError. The wait error is checked
// (as delete does) instead of being discarded, so a failed or cancelled
// wait is reported directly rather than surfacing as a less specific error
// from future.Result.
116 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
117 resourceGroupName string,
118 networkInterfaceName string,
119 parameters network.Interface) (result network.Interface, err error) {
121 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
123 return network.Interface{}, wrapAzureError(err)
125 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
if err != nil {
	return network.Interface{}, wrapAzureError(err)
}
126 r, err := future.Result(cl.inner)
127 return r, wrapAzureError(err)
// listComplete returns an iterator over every NIC in the resource group,
// with any error passed through wrapAzureError.
130 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
131 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
132 return r, wrapAzureError(err)
// quotaRe matches "exceed", "quota" or "limit", case-insensitively.
// wrapAzureError applies it to a service error's code and message to
// decide whether the error should be reported as a quota error.
135 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
// azureRateLimitError is returned by wrapAzureError for throttled requests.
// It is built from the original azure.RequestError plus the earliest time
// at which a retry should be attempted, derived from the Retry-After header.
137 type azureRateLimitError struct {
// EarliestRetry reports when the request may be retried.
// NOTE(review): presumably returns the time recorded by wrapAzureError —
// confirm.
142 func (ar *azureRateLimitError) EarliestRetry() time.Time {
// azureQuotaError is returned by wrapAzureError when the service error's
// code or message matches quotaRe. It wraps the original azure.RequestError.
146 type azureQuotaError struct {
// IsQuotaError marks this error as a quota error.
// NOTE(review): presumably always returns true — confirm.
150 func (ar *azureQuotaError) IsQuotaError() bool {
// wrapAzureError converts an error from the Azure SDK into a typed error
// that callers can act on:
//
//   - *azureRateLimitError when the response status is 429 or the response
//     carries a Retry-After header. The earliest retry time comes from
//     Retry-After, parsed first as an HTTP date and then as a number of
//     seconds; if neither parse succeeds it is 20 seconds from now.
//   - *azureQuotaError when the service error's code or message matches
//     quotaRe.
//
// Retry-After is read with Header.Get, which yields "" for a 429 response
// without that header (so the 20-second default applies). Indexing the
// header slice directly would panic in that case.
//
// NOTE(review): errors that are not an autorest.DetailedError wrapping an
// *azure.RequestError, or that lack a Response or ServiceError, are
// presumably returned unchanged — confirm.
154 func wrapAzureError(err error) error {
155 de, ok := err.(autorest.DetailedError)
159 rq, ok := de.Original.(*azure.RequestError)
163 if rq.Response == nil {
166 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
168 ra := rq.Response.Header.Get("Retry-After") // "" when absent, never panics
169 earliestRetry, parseErr := http.ParseTime(ra)
171 // Could not parse as a timestamp, must be number of seconds
172 dur, parseErr := strconv.ParseInt(ra, 10, 64)
174 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
176 // Couldn't make sense of retry-after,
177 // so set retry to 20 seconds
178 earliestRetry = time.Now().Add(20 * time.Second)
181 return &azureRateLimitError{*rq, earliestRetry}
183 if rq.ServiceError == nil {
186 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
187 return &azureQuotaError{*rq}
// azureInstanceSet is the Azure implementation of cloud.InstanceSet built
// by newAzureInstanceSet. Its fields are:
//   - azconfig: the decoded driver configuration.
//   - vmClient, netClient: wrappers over the SDK clients, set in setup.
//   - storageAcctClient: used by manageBlobs to fetch account keys.
//   - azureEnv: the resolved cloud environment (endpoints, storage suffix).
//   - stopFunc: cancels the instance set's context.
//   - stopWg: counts in-flight calls (SetTags, Destroy and others call
//     Add/Done) and setup's background loop.
//   - deleteNIC, deleteBlob: queues of NIC names and blobs for the deletion
//     workers started in setup.
// NOTE(review): the interfaces field is not referenced in the visible code
// (manageNics builds its own map). Confirm whether it is still needed.
192 type azureInstanceSet struct {
193 azconfig azureInstanceSetConfig
194 vmClient virtualMachinesClientWrapper
195 netClient interfacesClientWrapper
196 storageAcctClient storageacct.AccountsClient
197 azureEnv azure.Environment
198 interfaces map[string]network.Interface
202 stopFunc context.CancelFunc
203 stopWg sync.WaitGroup
204 deleteNIC chan string
205 deleteBlob chan storage.Blob
206 logger logrus.FieldLogger
// newAzureInstanceSet decodes the JSON driver config into an
// azureInstanceSetConfig and initializes an azureInstanceSet with it via
// setup. dispatcherID namespaces the resources the set manages: setup
// derives the "compute-<dispatcherID>-" name prefix from it.
209 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
210 azcfg := azureInstanceSetConfig{}
211 err = json.Unmarshal(config, &azcfg)
216 ap := azureInstanceSet{logger: logger}
217 err = ap.setup(azcfg, string(dispatcherID))
// setup initializes the instance set:
//   - builds VM, NIC and storage-account SDK clients for the subscription.
//     It authorizes them with client credentials (ClientID, ClientSecret,
//     TenantID) against the endpoints of the configured cloud environment.
//   - sets namePrefix to "compute-<dispatcherID>-". Create uses it to name
//     new resources; Instances, manageNics and manageBlobs use it to
//     recognize them.
//   - creates az.ctx, which stopFunc cancels.
//   - runs a loop driven by a 5-minute ticker until az.ctx is done.
//     NOTE(review): presumably it calls manageNics/manageBlobs — confirm.
//   - creates the unbuffered deleteNIC/deleteBlob channels. A 4-iteration
//     loop presumably starts the workers that receive from them and log
//     each deletion's outcome.
//
// NOTE(review): the SDK clients are built from az.azconfig, not from the
// azcfg parameter. This relies on az.azconfig having been set from azcfg
// first — confirm.
// NOTE(review): NIC deletion uses context.Background(), so stopFunc does
// not cancel in-progress NIC deletions. Confirm this is intentional.
224 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
226 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
227 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
228 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
230 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
235 authorizer, err := auth.ClientCredentialsConfig{
236 ClientID: az.azconfig.ClientID,
237 ClientSecret: az.azconfig.ClientSecret,
238 TenantID: az.azconfig.TenantID,
239 Resource: az.azureEnv.ResourceManagerEndpoint,
240 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
246 vmClient.Authorizer = authorizer
247 netClient.Authorizer = authorizer
248 storageAcctClient.Authorizer = authorizer
250 az.vmClient = &virtualMachinesClientImpl{vmClient}
251 az.netClient = &interfacesClientImpl{netClient}
252 az.storageAcctClient = storageAcctClient
254 az.dispatcherID = dispatcherID
255 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
257 az.ctx, az.stopFunc = context.WithCancel(context.Background())
260 defer az.stopWg.Done()
262 tk := time.NewTicker(5 * time.Minute)
265 case <-az.ctx.Done():
274 az.deleteNIC = make(chan string)
275 az.deleteBlob = make(chan storage.Blob)
277 for i := 0; i < 4; i++ {
280 nicname, ok := <-az.deleteNIC
284 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
286 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
288 az.logger.Printf("Deleted NIC %v", nicname)
294 blob, ok := <-az.deleteBlob
298 err := blob.Delete(nil)
300 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
302 az.logger.Printf("Deleted blob %v", blob.Name)
311 func (az *azureInstanceSet) Create(
312 instanceType arvados.InstanceType,
313 imageID cloud.ImageID,
314 newTags cloud.InstanceTags,
315 publicKey ssh.PublicKey) (cloud.Instance, error) {
318 defer az.stopWg.Done()
320 if len(newTags["node-token"]) == 0 {
321 return nil, fmt.Errorf("Must provide tag 'node-token'")
324 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
329 name = az.namePrefix + name
331 timestamp := time.Now().Format(time.RFC3339Nano)
333 tags := make(map[string]*string)
334 tags["created-at"] = ×tamp
335 for k, v := range newTags {
337 tags["dispatch-"+k] = &newstr
340 tags["dispatch-instance-type"] = &instanceType.Name
342 nicParameters := network.Interface{
343 Location: &az.azconfig.Location,
345 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
346 IPConfigurations: &[]network.InterfaceIPConfiguration{
347 network.InterfaceIPConfiguration{
348 Name: to.StringPtr("ip1"),
349 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
350 Subnet: &network.Subnet{
351 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
352 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
353 az.azconfig.SubscriptionID,
354 az.azconfig.ResourceGroup,
356 az.azconfig.Subnet)),
358 PrivateIPAllocationMethod: network.Dynamic,
364 nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
366 return nil, wrapAzureError(err)
369 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
370 az.azconfig.StorageAccount,
371 az.azureEnv.StorageEndpointSuffix,
372 az.azconfig.BlobContainer,
375 customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
376 echo '%s-%s' > '/home/%s/node-token'`, name, newTags["node-token"], az.azconfig.AdminUsername)))
378 vmParameters := compute.VirtualMachine{
379 Location: &az.azconfig.Location,
381 VirtualMachineProperties: &compute.VirtualMachineProperties{
382 HardwareProfile: &compute.HardwareProfile{
383 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
385 StorageProfile: &compute.StorageProfile{
386 OsDisk: &compute.OSDisk{
387 OsType: compute.Linux,
388 Name: to.StringPtr(name + "-os"),
389 CreateOption: compute.FromImage,
390 Image: &compute.VirtualHardDisk{
391 URI: to.StringPtr(string(imageID)),
393 Vhd: &compute.VirtualHardDisk{
398 NetworkProfile: &compute.NetworkProfile{
399 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
400 compute.NetworkInterfaceReference{
402 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
403 Primary: to.BoolPtr(true),
408 OsProfile: &compute.OSProfile{
410 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
411 LinuxConfiguration: &compute.LinuxConfiguration{
412 DisablePasswordAuthentication: to.BoolPtr(true),
413 SSH: &compute.SSHConfiguration{
414 PublicKeys: &[]compute.SSHPublicKey{
416 Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
417 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
422 CustomData: &customData,
427 vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
429 return nil, wrapAzureError(err)
432 return &azureInstance{
// Instances returns every VM in the resource group whose name starts with
// namePrefix, each paired with the NIC (from manageNics) matching its first
// network interface ID. The tags argument is unnamed and therefore ignored.
// Calling manageNics here also garbage-collects dangling NICs.
// NOTE(review): a VM whose NIC is missing from the map gets a zero-value
// nic. A VM with a nil NetworkProfile or an empty interface list would
// panic on the lookup below. Confirm Azure always populates these.
439 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
441 defer az.stopWg.Done()
443 interfaces, err := az.manageNics()
448 result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
450 return nil, wrapAzureError(err)
453 instances := make([]cloud.Instance, 0)
455 for ; result.NotDone(); err = result.Next() {
457 return nil, wrapAzureError(err)
459 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
460 instances = append(instances, &azureInstance{
463 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
466 return instances, nil
469 // manageNics returns the network interfaces whose names start with
470 // namePrefix and that are attached to a virtual machine, keyed by resource
471 // ID. It also garbage-collects dangling NICs: a prefixed NIC with no VM
472 // whose "created-at" tag is more than DeleteDanglingResourcesAfter in the
473 // past is queued for deletion. The age limit avoids racing with Create,
// which creates a NIC before the VM that attaches to it. If paging through
// the list fails part-way, the error is logged and the NICs collected so
// far are returned with a nil error.
474 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
476 defer az.stopWg.Done()
478 result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
480 return nil, wrapAzureError(err)
483 interfaces := make(map[string]network.Interface)
485 timestamp := time.Now()
486 for ; result.NotDone(); err = result.Next() {
488 az.logger.WithError(err).Warnf("Error listing nics")
489 return interfaces, nil
491 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
492 if result.Value().VirtualMachine != nil {
493 interfaces[*result.Value().ID] = result.Value()
495 if result.Value().Tags["created-at"] != nil {
496 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
498 if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
499 az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
// deleteNIC is unbuffered (made with make(chan string) in setup), so this
// send blocks until a deletion worker is ready to receive.
500 az.deleteNIC <- *result.Value().Name
507 return interfaces, nil
510 // manageBlobs garbage-collects blobs (VM disk images) in the configured
511 // storage account container. A blob is a deletion candidate when its name
512 // starts with namePrefix, it is a page blob, it is not leased to a VM
513 // (lease state "available", lease status "unlocked"), and it has not been
514 // modified for more than DeleteDanglingResourcesAfter.
// Failures to fetch the account keys, build the storage client or list
// blobs are logged as warnings. Listing follows NextMarker across pages.
// NOTE(review): the first account key is dereferenced without checking
// that ListKeys returned any keys. A nil or empty key list would panic.
515 func (az *azureInstanceSet) manageBlobs() {
516 result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
518 az.logger.WithError(err).Warn("Couldn't get account keys")
522 key1 := *(*result.Keys)[0].Value
523 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
525 az.logger.WithError(err).Warn("Couldn't make client")
529 blobsvc := client.GetBlobService()
530 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
532 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
533 timestamp := time.Now()
536 response, err := blobcont.ListBlobs(page)
538 az.logger.WithError(err).Warn("Error listing blobs")
541 for _, b := range response.Blobs {
542 age := timestamp.Sub(time.Time(b.Properties.LastModified))
543 if b.Properties.BlobType == storage.BlobTypePage &&
544 b.Properties.LeaseState == "available" &&
545 b.Properties.LeaseStatus == "unlocked" &&
546 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
548 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
552 if response.NextMarker != "" {
553 page.Marker = response.NextMarker
// Stop shuts down the instance set.
// NOTE(review): presumably cancels az.ctx via stopFunc and waits on stopWg
// so in-flight calls and the background loop finish — confirm.
560 func (az *azureInstanceSet) Stop() {
// azureInstance is the cloud.Instance returned by Create and Instances. It
// pairs an Azure VM with the network interface it was created or listed
// with, plus a back-pointer to the owning instance set.
567 type azureInstance struct {
568 provider *azureInstanceSet
569 nic network.Interface
570 vm compute.VirtualMachine
// ID returns the VM's Azure resource ID.
573 func (ai *azureInstance) ID() cloud.InstanceID {
574 return cloud.InstanceID(*ai.vm.ID)
// String returns a human-readable identifier for the instance.
// NOTE(review): presumably the VM name — confirm.
577 func (ai *azureInstance) String() string {
// ProviderType returns the VM size. Create sets it from the instance
// type's ProviderType.
581 func (ai *azureInstance) ProviderType() string {
582 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
// SetTags replaces the instance's "dispatch-" tags with newTags, prefixing
// each key with "dispatch-". Existing tags without that prefix are
// presumably carried over unchanged. The result is applied with a
// createOrUpdate of the VM in its configured location, with stopWg held
// for the duration of the call. Errors are passed through wrapAzureError.
// NOTE(review): presumably ai.vm is refreshed from the returned VM on
// success — confirm.
585 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
586 ai.provider.stopWg.Add(1)
587 defer ai.provider.stopWg.Done()
589 tags := make(map[string]*string)
591 for k, v := range ai.vm.Tags {
592 if !strings.HasPrefix(k, "dispatch-") {
596 for k, v := range newTags {
598 tags["dispatch-"+k] = &newstr
601 vmParameters := compute.VirtualMachine{
602 Location: &ai.provider.azconfig.Location,
605 vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
607 return wrapAzureError(err)
// Tags returns the instance's "dispatch-" tags.
// NOTE(review): presumably with the prefix stripped, mirroring SetTags —
// confirm.
614 func (ai *azureInstance) Tags() cloud.InstanceTags {
615 tags := make(map[string]string)
617 for k, v := range ai.vm.Tags {
618 if strings.HasPrefix(k, "dispatch-") {
// Destroy deletes the VM and waits for the deletion to complete via
// vmClient.delete. Errors are passed through wrapAzureError. Only the VM is
// deleted here; its NIC and disk blob are presumably reclaimed later by
// manageNics/manageBlobs.
626 func (ai *azureInstance) Destroy() error {
627 ai.provider.stopWg.Add(1)
628 defer ai.provider.stopWg.Done()
630 _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
631 return wrapAzureError(err)
// Address returns the private IP address of the instance's first NIC IP
// configuration, or "" if it is not known.
//
// ai.nic can be the zero value: Instances fills it with a map lookup that
// yields an empty network.Interface when no matching NIC was listed. The
// SDK also models every step of the path as an optional pointer. So each
// one is checked rather than dereferenced blindly, which would panic.
634 func (ai *azureInstance) Address() string {
props := ai.nic.InterfacePropertiesFormat
if props == nil || props.IPConfigurations == nil || len(*props.IPConfigurations) == 0 {
	return ""
}
ipconf := (*props.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat
if ipconf == nil || ipconf.PrivateIPAddress == nil {
	return ""
}
return *ipconf.PrivateIPAddress
// RemoteUser returns the configured admin user name. Create installs the
// SSH public key and the node-token file for this account.
638 func (ai *azureInstance) RemoteUser() string {
639 return ai.provider.azconfig.AdminUsername
// VerifyHostKey checks that receivedKey is the SSH host key of this VM.
//
// If a recorded "ssh-pubkey-fingerprint" tag is present (presumably checked
// on a line not shown), receivedKey's SHA256 fingerprint must equal it.
// Otherwise a "node-token" tag is required. The method then:
//   - reads the node-token file that Create's custom data wrote to the
//     admin user's home directory, and checks it equals
//     "<vm name>-<node-token>";
//   - compares receivedKey's fingerprint with the one ssh-keygen reports
//     for /etc/ssh/ssh_host_rsa_key.pub.
// After both checks it records that fingerprint in the tag map and drops
// "node-token" (presumably persisted via SetTags). stopWg is held for the
// duration of the call.
// NOTE(review): sp[1] assumes the ssh-keygen output contains at least one
// space. Shorter output would panic with an index out of range. Consider
// strings.Fields plus a length check that returns an error.
642 func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
643 ai.provider.stopWg.Add(1)
644 defer ai.provider.stopWg.Done()
646 remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
650 tg := tags["ssh-pubkey-fingerprint"]
652 if remoteFingerprint == tg {
655 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
658 nodetokenTag := tags["node-token"]
659 if nodetokenTag == "" {
660 return fmt.Errorf("Missing node token tag")
663 sess, err := client.NewSession()
668 nodetokenbytes, err := sess.Output("cat /home/" + ai.provider.azconfig.AdminUsername + "/node-token")
673 nodetoken := strings.TrimSpace(string(nodetokenbytes))
675 expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
677 if strings.TrimSpace(nodetoken) != expectedToken {
678 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
681 sess, err = client.NewSession()
686 keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
691 sp := strings.Split(string(keyfingerprintbytes), " ")
693 if remoteFingerprint != sp[1] {
694 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
697 tags["ssh-pubkey-fingerprint"] = sp[1]
698 delete(tags, "node-token")