1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.curoverse.com/arvados.git/lib/cloud"
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
22 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
23 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
24 "github.com/Azure/azure-sdk-for-go/storage"
25 "github.com/Azure/go-autorest/autorest"
26 "github.com/Azure/go-autorest/autorest/azure"
27 "github.com/Azure/go-autorest/autorest/azure/auth"
28 "github.com/Azure/go-autorest/autorest/to"
29 "github.com/jmcvetta/randutil"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
34 // Driver is the azure implementation of the cloud.Driver interface.
35 var Driver = cloud.DriverFunc(newAzureInstanceSet)
// azureInstanceSetConfig is the driver configuration that
// newAzureInstanceSet decodes from JSON.
// NOTE(review): only some fields are visible in this excerpt. Other fields
// referenced elsewhere in this file include SubscriptionID, ClientID,
// ClientSecret, TenantID, ResourceGroup, Location, Subnet, StorageAccount
// and BlobContainer. Confirm the full field list and any JSON tags.
37 type azureInstanceSetConfig struct {
// CloudEnvironment is the Azure cloud name passed to
// azure.EnvironmentFromName in setup.
42 CloudEnvironment string
// DeleteDanglingResourcesAfter is the age beyond which manageNics and
// manageBlobs treat an unattached NIC or an unleased disk blob with this
// dispatcher's name prefix as garbage.
49 DeleteDanglingResourcesAfter arvados.Duration
52 type virtualMachinesClientWrapper interface {
53 createOrUpdate(ctx context.Context,
54 resourceGroupName string,
56 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
57 delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
58 listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
61 type virtualMachinesClientImpl struct {
62 inner compute.VirtualMachinesClient
65 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
66 resourceGroupName string,
68 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
70 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
72 return compute.VirtualMachine{}, wrapAzureError(err)
74 future.WaitForCompletionRef(ctx, cl.inner.Client)
75 r, err := future.Result(cl.inner)
76 return r, wrapAzureError(err)
79 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
80 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
82 return nil, wrapAzureError(err)
84 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
85 return future.Response(), wrapAzureError(err)
88 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
89 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
90 return r, wrapAzureError(err)
93 type interfacesClientWrapper interface {
94 createOrUpdate(ctx context.Context,
95 resourceGroupName string,
96 networkInterfaceName string,
97 parameters network.Interface) (result network.Interface, err error)
98 delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
99 listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
102 type interfacesClientImpl struct {
103 inner network.InterfacesClient
106 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
107 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
109 return nil, wrapAzureError(err)
111 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
112 return future.Response(), wrapAzureError(err)
115 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
116 resourceGroupName string,
117 networkInterfaceName string,
118 parameters network.Interface) (result network.Interface, err error) {
120 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
122 return network.Interface{}, wrapAzureError(err)
124 future.WaitForCompletionRef(ctx, cl.inner.Client)
125 r, err := future.Result(cl.inner)
126 return r, wrapAzureError(err)
129 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
130 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
131 return r, wrapAzureError(err)
// quotaRe matches, case-insensitively, any text containing "exceed",
// "quota" or "limit". wrapAzureError applies it to Azure service error
// codes and messages to recognise quota errors.
var quotaRe = regexp.MustCompile("(?i:exceed|quota|limit)")
// azureRateLimitError is returned by wrapAzureError when Azure throttles a
// request (HTTP 429 or a Retry-After header). wrapAzureError builds it
// positionally from a copy of the azure.RequestError and the computed
// earliest retry time.
// NOTE(review): the field declarations are not visible in this excerpt.
136 type azureRateLimitError struct {
// EarliestRetry reports the earliest time at which the throttled request
// may be retried.
// NOTE(review): body not visible in this excerpt. It presumably returns
// the time wrapAzureError stored when building the error.
141 func (ar *azureRateLimitError) EarliestRetry() time.Time {
// azureQuotaError is returned by wrapAzureError when the Azure service
// error code or message matches quotaRe. wrapAzureError builds it from a
// copy of the azure.RequestError.
// NOTE(review): the field declarations are not visible in this excerpt.
145 type azureQuotaError struct {
// IsQuotaError identifies this error as a quota error.
// NOTE(review): body not visible in this excerpt; presumably it returns
// true.
149 func (ar *azureQuotaError) IsQuotaError() bool {
153 func wrapAzureError(err error) error {
154 de, ok := err.(autorest.DetailedError)
158 rq, ok := de.Original.(*azure.RequestError)
162 if rq.Response == nil {
165 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
167 ra := rq.Response.Header["Retry-After"][0]
168 earliestRetry, parseErr := http.ParseTime(ra)
170 // Could not parse as a timestamp, must be number of seconds
171 dur, parseErr := strconv.ParseInt(ra, 10, 64)
173 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
175 // Couldn't make sense of retry-after,
176 // so set retry to 20 seconds
177 earliestRetry = time.Now().Add(20 * time.Second)
180 return &azureRateLimitError{*rq, earliestRetry}
182 if rq.ServiceError == nil {
185 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
186 return &azureQuotaError{*rq}
// azureInstanceSet is the azure instance set built by newAzureInstanceSet.
// It holds the decoded configuration, the Azure API clients created in
// setup, and the background machinery (context, WaitGroup and deletion
// channels) used to garbage-collect NICs and disk blobs.
// NOTE(review): setup also assigns az.dispatcherID, az.namePrefix and
// az.ctx, but those field declarations are not visible in this excerpt.
191 type azureInstanceSet struct {
192 azconfig azureInstanceSetConfig
// vmClient and netClient are interfaces. setup wraps the real SDK
// clients in virtualMachinesClientImpl / interfacesClientImpl.
193 vmClient virtualMachinesClientWrapper
194 netClient interfacesClientWrapper
// storageAcctClient is used by manageBlobs to fetch storage account keys.
195 storageAcctClient storageacct.AccountsClient
// azureEnv is resolved from azconfig.CloudEnvironment in setup and
// supplies the AD, resource-manager and storage endpoints.
196 azureEnv azure.Environment
// NOTE(review): interfaces is not referenced in the visible code
// (manageNics builds a local map instead) — confirm whether it is used.
197 interfaces map[string]network.Interface
// stopFunc cancels the context created in setup. stopWg tracks in-flight
// API calls (Create, Instances, SetTags, ...) and background goroutines.
201 stopFunc context.CancelFunc
202 stopWg sync.WaitGroup
// deleteNIC and deleteBlob are unbuffered channels consumed by the
// deletion workers started in setup.
203 deleteNIC chan string
204 deleteBlob chan storage.Blob
205 logger logrus.FieldLogger
// newAzureInstanceSet is the constructor wrapped by Driver. It decodes the
// JSON driver configuration into an azureInstanceSetConfig and initializes
// an azureInstanceSet, using the given logger, via setup, passing
// dispatcherID as a string.
// NOTE(review): the error checks after Unmarshal and setup, and the final
// return, are not visible in this excerpt.
208 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
209 azcfg := azureInstanceSetConfig{}
210 err = json.Unmarshal(config, &azcfg)
215 ap := azureInstanceSet{logger: logger}
216 err = ap.setup(azcfg, string(dispatcherID))
// setup initializes az from azcfg. As far as this excerpt shows, it:
//   - creates VM, network-interface and storage-account API clients for the
//     configured subscription, authorized with service-principal client
//     credentials against the endpoints of the configured Azure environment;
//   - records dispatcherID and derives namePrefix ("compute-<id>-");
//   - creates az.ctx (cancelled via az.stopFunc) and a background goroutine
//     driven by a 5-minute ticker that exits when az.ctx is done;
//   - creates the unbuffered deleteNIC/deleteBlob channels and, in a loop of
//     4 iterations, workers that receive from them and delete the named NIC
//     or blob, logging the outcome.
// NOTE(review): several lines (error checks, goroutine launches, the
// ticker's work) are not visible here. Presumably the ticker goroutine
// calls manageBlobs — confirm.
223 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
// NOTE(review): these read az.azconfig rather than azcfg, so azcfg must
// already have been copied into az.azconfig (that line is not visible here).
225 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
226 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
227 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
// Resolve the endpoints (resource manager, AD, storage suffix) of the named
// Azure cloud.
229 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
234 authorizer, err := auth.ClientCredentialsConfig{
235 ClientID: az.azconfig.ClientID,
236 ClientSecret: az.azconfig.ClientSecret,
237 TenantID: az.azconfig.TenantID,
238 Resource: az.azureEnv.ResourceManagerEndpoint,
239 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
// All three clients share the same authorizer.
245 vmClient.Authorizer = authorizer
246 netClient.Authorizer = authorizer
247 storageAcctClient.Authorizer = authorizer
// The rest of the set talks to the VM/NIC clients through the wrapper
// interfaces.
249 az.vmClient = &virtualMachinesClientImpl{vmClient}
250 az.netClient = &interfacesClientImpl{netClient}
251 az.storageAcctClient = storageAcctClient
253 az.dispatcherID = dispatcherID
// Create names resources with this prefix, and Instances, manageNics and
// manageBlobs use it to recognise this dispatcher's resources.
254 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
// az.ctx is passed to the Azure API calls made by this set. stopFunc
// cancels it.
256 az.ctx, az.stopFunc = context.WithCancel(context.Background())
// The enclosing background goroutine (its launch is not visible here)
// signals stopWg when it exits.
259 defer az.stopWg.Done()
261 tk := time.NewTicker(5 * time.Minute)
// Exit when the instance set's context is cancelled.
264 case <-az.ctx.Done():
// Unbuffered: a send (e.g. from manageNics) blocks until a deletion worker
// receives it.
// NOTE(review): the ticker goroutine above is already running when these
// fields are assigned. If it reads them, the access is unsynchronized
// (benign in practice given the 5-minute first tick).
273 az.deleteNIC = make(chan string)
274 az.deleteBlob = make(chan storage.Blob)
276 for i := 0; i < 4; i++ {
// ok is false once deleteNIC has been closed.
279 nicname, ok := <-az.deleteNIC
// NOTE(review): uses context.Background() rather than az.ctx, so cancelling
// the set's context does not interrupt an in-progress NIC deletion —
// confirm this is intentional.
283 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
285 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
287 az.logger.Printf("Deleted NIC %v", nicname)
// ok is false once deleteBlob has been closed.
293 blob, ok := <-az.deleteBlob
// nil: no extra delete options.
297 err := blob.Delete(nil)
299 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
301 az.logger.Printf("Deleted blob %v", blob.Name)
// Create provisions a new VM of instanceType booted from the VHD image
// imageID. As far as this excerpt shows, it:
//   - requires a non-empty "node-token" entry in newTags;
//   - names the VM namePrefix + 15 random characters from [a-z0-9];
//   - builds a tag map with "created-at" (RFC3339Nano), each newTags entry
//     as "dispatch-<key>", and "dispatch-instance-type" (presumably applied
//     to the NIC and VM on lines not visible here);
//   - creates a NIC named "<name>-nic" with a dynamically allocated private
//     IP on the configured subnet;
//   - creates the VM. Its custom data writes "<name>-<node-token>" to
//     /home/crunch/node-token, and its admin user "crunch" accepts publicKey
//     with password login disabled.
// The call is tracked by stopWg (the matching Add is not visible here).
// NOTE(review): if VM creation fails, the NIC created earlier is not removed
// here. manageNics garbage-collects unattached NICs once they are older than
// DeleteDanglingResourcesAfter.
310 func (az *azureInstanceSet) Create(
311 instanceType arvados.InstanceType,
312 imageID cloud.ImageID,
313 newTags cloud.InstanceTags,
314 publicKey ssh.PublicKey) (cloud.Instance, error) {
317 defer az.stopWg.Done()
// The node token is embedded in the VM's custom data below. VerifyHostKey
// later compares it with the file written on the node.
319 if len(newTags["node-token"]) == 0 {
320 return nil, fmt.Errorf("Must provide tag 'node-token'")
323 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
328 name = az.namePrefix + name
// manageNics parses this back with time.RFC3339Nano to decide when an
// unattached NIC is old enough to delete.
330 timestamp := time.Now().Format(time.RFC3339Nano)
332 tags := make(map[string]*string)
// NOTE(review): "×tamp" looks like "&timestamp" corrupted by HTML-entity
// decoding ("&times" became "×"). As written this does not compile; it
// should read &timestamp.
333 tags["created-at"] = ×tamp
334 for k, v := range newTags {
336 tags["dispatch-"+k] = &newstr
// Points into this call's copy of instanceType.
339 tags["dispatch-instance-type"] = &instanceType.Name
341 nicParameters := network.Interface{
342 Location: &az.azconfig.Location,
344 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
345 IPConfigurations: &[]network.InterfaceIPConfiguration{
346 network.InterfaceIPConfiguration{
347 Name: to.StringPtr("ip1"),
348 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
// Subnet resource ID built from subscription, resource group, (presumably)
// the virtual network name, and the subnet name.
349 Subnet: &network.Subnet{
350 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
351 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
352 az.azconfig.SubscriptionID,
353 az.azconfig.ResourceGroup,
355 az.azconfig.Subnet)),
357 PrivateIPAllocationMethod: network.Dynamic,
// The NIC is created first. The VM's NetworkProfile below refers to a
// network interface.
363 nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
365 return nil, wrapAzureError(err)
// URL of the VM's OS disk VHD in the configured storage account container.
368 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
369 az.azconfig.StorageAccount,
370 az.azureEnv.StorageEndpointSuffix,
371 az.azconfig.BlobContainer,
// Custom data is passed base64-encoded. The script records
// "<name>-<node-token>" for VerifyHostKey to check.
374 customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
375 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
377 vmParameters := compute.VirtualMachine{
378 Location: &az.azconfig.Location,
380 VirtualMachineProperties: &compute.VirtualMachineProperties{
381 HardwareProfile: &compute.HardwareProfile{
// The Azure VM size comes straight from the configured instance type.
382 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
384 StorageProfile: &compute.StorageProfile{
385 OsDisk: &compute.OSDisk{
386 OsType: compute.Linux,
387 Name: to.StringPtr(name + "-os"),
388 CreateOption: compute.FromImage,
// Boot from the image VHD whose URI is imageID.
389 Image: &compute.VirtualHardDisk{
390 URI: to.StringPtr(string(imageID)),
392 Vhd: &compute.VirtualHardDisk{
397 NetworkProfile: &compute.NetworkProfile{
398 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
399 compute.NetworkInterfaceReference{
401 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
402 Primary: to.BoolPtr(true),
407 OsProfile: &compute.OSProfile{
409 AdminUsername: to.StringPtr("crunch"),
410 LinuxConfiguration: &compute.LinuxConfiguration{
411 DisablePasswordAuthentication: to.BoolPtr(true),
412 SSH: &compute.SSHConfiguration{
413 PublicKeys: &[]compute.SSHPublicKey{
414 compute.SSHPublicKey{
415 Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
416 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
421 CustomData: &customData,
// The vmClient implementation waits for the create operation to finish.
426 vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
428 return nil, wrapAzureError(err)
431 return &azureInstance{
// Instances returns the VMs in the configured resource group whose names
// begin with namePrefix. Each is paired with the NIC (from manageNics)
// whose ID matches the VM's first network interface reference. The
// cloud.InstanceTags argument is ignored.
// Calling this also runs manageNics' garbage collection of dangling NICs.
// The call is tracked by stopWg (the matching Add is not visible here).
438 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
440 defer az.stopWg.Done()
442 interfaces, err := az.manageNics()
447 result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
449 return nil, wrapAzureError(err)
452 instances := make([]cloud.Instance, 0)
454 for ; result.NotDone(); err = result.Next() {
456 return nil, wrapAzureError(err)
458 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
459 instances = append(instances, &azureInstance{
// NOTE(review): assumes NetworkProfile and NetworkInterfaces are non-nil and
// non-empty. If the NIC ID is not in the map (manageNics only includes NICs
// already attached to a VM), nic is the zero network.Interface, whose
// properties pointer is nil. Code reading ai.nic must allow for that.
462 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
465 return instances, nil
468 // manageNics lists the network interfaces in the resource group and
469 // returns those whose name has namePrefix and which are attached to a
470 // VM, keyed by NIC resource ID. It also garbage-collects NICs that have
471 // namePrefix, are not attached to a VM, and whose "created-at" tag is
472 // more than DeleteDanglingResourcesAfter in the past (the age threshold
// avoids racing with, and deleting, NICs that Create has only just made).
// The call is tracked by stopWg (the matching Add is not visible here).
473 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
475 defer az.stopWg.Done()
477 result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
479 return nil, wrapAzureError(err)
482 interfaces := make(map[string]network.Interface)
// A single reference time for every age check in this pass.
484 timestamp := time.Now()
485 for ; result.NotDone(); err = result.Next() {
// Best effort: a paging error is logged, and whatever has been gathered so
// far is returned with a nil error.
487 az.logger.WithError(err).Warnf("Error listing nics")
488 return interfaces, nil
490 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
// Attached NICs are returned. Instances looks them up by the NIC ID in
// each VM's network profile.
491 if result.Value().VirtualMachine != nil {
492 interfaces[*result.Value().ID] = result.Value()
// Create writes created-at in time.RFC3339Nano format.
494 if result.Value().Tags["created-at"] != nil {
495 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
497 if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
// NOTE(review): %v of the configured duration may already include units,
// making the trailing " s" in this message redundant.
498 az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
// Blocking send: deleteNIC is unbuffered, so this waits until one of the
// deletion workers started in setup receives the name.
499 az.deleteNIC <- *result.Value().Name
506 return interfaces, nil
509 // manageBlobs garbage-collects blobs (VM disk images) in the
510 // configured storage account container. It selects page blobs whose
511 // name has namePrefix, whose lease state is "available" and lease status
512 // "unlocked" (i.e. not leased to a VM), and which have not been modified
513 // for longer than DeleteDanglingResourcesAfter.
// Failures are logged as warnings rather than returned.
514 func (az *azureInstanceSet) manageBlobs() {
515 result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
517 az.logger.WithError(err).Warn("Couldn't get account keys")
// NOTE(review): assumes ListKeys returned at least one key. A nil or empty
// Keys slice would panic here.
521 key1 := *(*result.Keys)[0].Value
522 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
524 az.logger.WithError(err).Warn("Couldn't make client")
528 blobsvc := client.GetBlobService()
529 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
// List only this dispatcher's blobs (by name prefix), one page at a time.
531 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
// A single reference time for every age check in this pass.
532 timestamp := time.Now()
535 response, err := blobcont.ListBlobs(page)
537 az.logger.WithError(err).Warn("Error listing blobs")
540 for _, b := range response.Blobs {
541 age := timestamp.Sub(time.Time(b.Properties.LastModified))
542 if b.Properties.BlobType == storage.BlobTypePage &&
543 b.Properties.LeaseState == "available" &&
544 b.Properties.LeaseStatus == "unlocked" &&
545 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
// Presumably the blob is then queued on deleteBlob for the deletion workers
// (that line is not visible here).
547 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
// Continue with the next page, if any.
551 if response.NextMarker != "" {
552 page.Marker = response.NextMarker
// Stop is the instance set's shutdown method.
// NOTE(review): the body is not visible in this excerpt. Presumably it
// calls az.stopFunc, waits on az.stopWg, and closes deleteNIC/deleteBlob so
// the deletion workers started in setup exit — confirm.
559 func (az *azureInstanceSet) Stop() {
566 type azureInstance struct {
567 provider *azureInstanceSet
568 nic network.Interface
569 vm compute.VirtualMachine
572 func (ai *azureInstance) ID() cloud.InstanceID {
573 return cloud.InstanceID(*ai.vm.ID)
// String implements fmt.Stringer for azureInstance.
// NOTE(review): the body is not visible in this excerpt.
576 func (ai *azureInstance) String() string {
580 func (ai *azureInstance) ProviderType() string {
581 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
// SetTags replaces the instance's "dispatch-" tags with newTags (each key
// stored as "dispatch-<key>"), keeping tags without that prefix, and sends
// the update to Azure via vmClient.createOrUpdate. The call is tracked by
// stopWg.
// NOTE(review): several lines (copying the kept tags, declaring newstr,
// attaching tags to vmParameters, storing the returned VM) are not visible
// in this excerpt. "Keeping" and "sends the update" describe the evident
// intent — confirm.
584 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
585 ai.provider.stopWg.Add(1)
586 defer ai.provider.stopWg.Done()
588 tags := make(map[string]*string)
590 for k, v := range ai.vm.Tags {
591 if !strings.HasPrefix(k, "dispatch-") {
595 for k, v := range newTags {
// Presumably newstr is a per-iteration copy of v, so each map entry gets its
// own pointer.
597 tags["dispatch-"+k] = &newstr
// A minimal VM resource: only Location is visible here (presumably plus the
// tags).
600 vmParameters := compute.VirtualMachine{
601 Location: &ai.provider.azconfig.Location,
604 vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
606 return wrapAzureError(err)
// Tags returns the instance's tags that carry the "dispatch-" prefix.
// NOTE(review): the loop body is not visible in this excerpt. Presumably it
// strips the prefix so keys match those passed to Create/SetTags.
613 func (ai *azureInstance) Tags() cloud.InstanceTags {
614 tags := make(map[string]string)
616 for k, v := range ai.vm.Tags {
617 if strings.HasPrefix(k, "dispatch-") {
625 func (ai *azureInstance) Destroy() error {
626 ai.provider.stopWg.Add(1)
627 defer ai.provider.stopWg.Done()
629 _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
630 return wrapAzureError(err)
633 func (ai *azureInstance) Address() string {
634 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
637 func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
638 ai.provider.stopWg.Add(1)
639 defer ai.provider.stopWg.Done()
641 remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
645 tg := tags["ssh-pubkey-fingerprint"]
647 if remoteFingerprint == tg {
650 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
653 nodetokenTag := tags["node-token"]
654 if nodetokenTag == "" {
655 return fmt.Errorf("Missing node token tag")
658 sess, err := client.NewSession()
663 nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
668 nodetoken := strings.TrimSpace(string(nodetokenbytes))
670 expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
672 if strings.TrimSpace(nodetoken) != expectedToken {
673 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
676 sess, err = client.NewSession()
681 keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
686 sp := strings.Split(string(keyfingerprintbytes), " ")
688 if remoteFingerprint != sp[1] {
689 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
692 tags["ssh-pubkey-fingerprint"] = sp[1]
693 delete(tags, "node-token")