1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
21 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
22 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
23 "github.com/Azure/azure-sdk-for-go/storage"
24 "github.com/Azure/go-autorest/autorest"
25 "github.com/Azure/go-autorest/autorest/azure"
26 "github.com/Azure/go-autorest/autorest/azure/auth"
27 "github.com/Azure/go-autorest/autorest/to"
28 "github.com/jmcvetta/randutil"
29 "github.com/mitchellh/mapstructure"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
// AzureInstanceSetConfig holds the Azure driver settings that
// NewAzureInstanceSet decodes (with mapstructure) from its config map.
// The mapstructure tag on each field is the configuration key.
type AzureInstanceSetConfig struct {
	// Service principal credentials and the subscription they act in.
	SubscriptionID string `mapstructure:"subscription_id"`
	ClientID       string `mapstructure:"key"`
	ClientSecret   string `mapstructure:"secret"`
	TenantID       string `mapstructure:"tenant_id"`

	// CloudEnv names the Azure cloud; it is resolved with
	// azure.EnvironmentFromName.
	CloudEnv string `mapstructure:"cloud_environment"`

	// Placement of the resources the driver creates. Subnet is the last
	// element of the subnet resource ID given to each new NIC.
	ResourceGroup string `mapstructure:"resource_group"`
	Location      string `mapstructure:"region"`
	Network       string `mapstructure:"network"`
	Subnet        string `mapstructure:"subnet"`

	// Storage account and container that hold the VMs' OS disk blobs.
	StorageAccount string `mapstructure:"storage_account"`
	BlobContainer  string `mapstructure:"blob_container"`
	Image          string `mapstructure:"image"`

	// DeleteDanglingResourcesAfter is an age in seconds. Unattached NICs
	// and unleased blobs that carry this instance set's name prefix are
	// garbage collected once they are older than this.
	DeleteDanglingResourcesAfter float64 `mapstructure:"delete_dangling_resources_after"`
}
50 type VirtualMachinesClientWrapper interface {
51 CreateOrUpdate(ctx context.Context,
52 resourceGroupName string,
54 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
55 Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
56 ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
59 type VirtualMachinesClientImpl struct {
60 inner compute.VirtualMachinesClient
63 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
64 resourceGroupName string,
66 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
68 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
70 return compute.VirtualMachine{}, WrapAzureError(err)
72 future.WaitForCompletionRef(ctx, cl.inner.Client)
73 r, err := future.Result(cl.inner)
74 return r, WrapAzureError(err)
77 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
78 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
80 return nil, WrapAzureError(err)
82 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
83 return future.Response(), WrapAzureError(err)
86 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
87 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
88 return r, WrapAzureError(err)
91 type InterfacesClientWrapper interface {
92 CreateOrUpdate(ctx context.Context,
93 resourceGroupName string,
94 networkInterfaceName string,
95 parameters network.Interface) (result network.Interface, err error)
96 Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
97 ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
100 type InterfacesClientImpl struct {
101 inner network.InterfacesClient
104 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
105 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
107 return nil, WrapAzureError(err)
109 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
110 return future.Response(), WrapAzureError(err)
113 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
114 resourceGroupName string,
115 networkInterfaceName string,
116 parameters network.Interface) (result network.Interface, err error) {
118 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
120 return network.Interface{}, WrapAzureError(err)
122 future.WaitForCompletionRef(ctx, cl.inner.Client)
123 r, err := future.Result(cl.inner)
124 return r, WrapAzureError(err)
127 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
128 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
129 return r, WrapAzureError(err)
var (
	// quotaRe matches, case-insensitively, Azure service error codes or
	// messages that mention exceeding a quota or limit. WrapAzureError
	// uses it to classify errors as *AzureQuotaError.
	quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
)
134 type AzureRateLimitError struct {
136 earliestRetry time.Time
139 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
140 return ar.earliestRetry
143 type AzureQuotaError struct {
// IsQuotaError reports whether this error is a quota error.
// NOTE(review): the body is not visible in this excerpt. WrapAzureError only
// builds an AzureQuotaError for quota/limit service errors, so this
// presumably returns true unconditionally — confirm.
147 func (ar *AzureQuotaError) IsQuotaError() bool {
151 func WrapAzureError(err error) error {
152 de, ok := err.(autorest.DetailedError)
156 rq, ok := de.Original.(*azure.RequestError)
160 if rq.Response == nil {
163 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
165 ra := rq.Response.Header["Retry-After"][0]
166 earliestRetry, parseErr := http.ParseTime(ra)
168 // Could not parse as a timestamp, must be number of seconds
169 dur, parseErr := strconv.ParseInt(ra, 10, 64)
171 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
173 // Couldn't make sense of retry-after,
174 // so set retry to 20 seconds
175 earliestRetry = time.Now().Add(20 * time.Second)
178 return &AzureRateLimitError{*rq, earliestRetry}
180 if rq.ServiceError == nil {
183 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
184 return &AzureQuotaError{*rq}
189 type AzureInstanceSet struct {
190 azconfig AzureInstanceSetConfig
191 vmClient VirtualMachinesClientWrapper
192 netClient InterfacesClientWrapper
193 storageAcctClient storageacct.AccountsClient
194 azureEnv azure.Environment
195 interfaces map[string]network.Interface
199 stopFunc context.CancelFunc
200 stopWg sync.WaitGroup
201 deleteNIC chan string
202 deleteBlob chan storage.Blob
203 logger logrus.FieldLogger
206 func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
207 azcfg := AzureInstanceSetConfig{}
208 if err = mapstructure.Decode(config, &azcfg); err != nil {
211 ap := AzureInstanceSet{logger: logger}
212 err = ap.setup(azcfg, string(dispatcherID))
219 func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
221 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
222 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
223 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
225 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
230 authorizer, err := auth.ClientCredentialsConfig{
231 ClientID: az.azconfig.ClientID,
232 ClientSecret: az.azconfig.ClientSecret,
233 TenantID: az.azconfig.TenantID,
234 Resource: az.azureEnv.ResourceManagerEndpoint,
235 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
241 vmClient.Authorizer = authorizer
242 netClient.Authorizer = authorizer
243 storageAcctClient.Authorizer = authorizer
245 az.vmClient = &VirtualMachinesClientImpl{vmClient}
246 az.netClient = &InterfacesClientImpl{netClient}
247 az.storageAcctClient = storageAcctClient
249 az.dispatcherID = dispatcherID
250 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
252 az.ctx, az.stopFunc = context.WithCancel(context.Background())
255 defer az.stopWg.Done()
257 tk := time.NewTicker(5 * time.Minute)
260 case <-az.ctx.Done():
269 az.deleteNIC = make(chan string)
270 az.deleteBlob = make(chan storage.Blob)
272 for i := 0; i < 4; i += 1 {
275 nicname, ok := <-az.deleteNIC
279 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
281 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
283 az.logger.Printf("Deleted NIC %v", nicname)
289 blob, ok := <-az.deleteBlob
293 err := blob.Delete(nil)
295 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
297 az.logger.Printf("Deleted blob %v", blob.Name)
306 func (az *AzureInstanceSet) Create(
307 instanceType arvados.InstanceType,
308 imageId cloud.ImageID,
309 newTags cloud.InstanceTags,
310 publicKey ssh.PublicKey) (cloud.Instance, error) {
313 defer az.stopWg.Done()
315 if len(newTags["node-token"]) == 0 {
316 return nil, fmt.Errorf("Must provide tag 'node-token'")
319 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
324 name = az.namePrefix + name
326 timestamp := time.Now().Format(time.RFC3339Nano)
328 tags := make(map[string]*string)
329 tags["created-at"] = ×tamp
330 for k, v := range newTags {
332 tags["dispatch-"+k] = &newstr
335 tags["dispatch-instance-type"] = &instanceType.Name
337 nicParameters := network.Interface{
338 Location: &az.azconfig.Location,
340 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
341 IPConfigurations: &[]network.InterfaceIPConfiguration{
342 network.InterfaceIPConfiguration{
343 Name: to.StringPtr("ip1"),
344 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
345 Subnet: &network.Subnet{
346 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
347 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
348 az.azconfig.SubscriptionID,
349 az.azconfig.ResourceGroup,
351 az.azconfig.Subnet)),
353 PrivateIPAllocationMethod: network.Dynamic,
359 nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
361 return nil, WrapAzureError(err)
364 instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
365 az.azconfig.StorageAccount,
366 az.azureEnv.StorageEndpointSuffix,
367 az.azconfig.BlobContainer,
370 customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
371 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
373 vmParameters := compute.VirtualMachine{
374 Location: &az.azconfig.Location,
376 VirtualMachineProperties: &compute.VirtualMachineProperties{
377 HardwareProfile: &compute.HardwareProfile{
378 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
380 StorageProfile: &compute.StorageProfile{
381 OsDisk: &compute.OSDisk{
382 OsType: compute.Linux,
383 Name: to.StringPtr(name + "-os"),
384 CreateOption: compute.FromImage,
385 Image: &compute.VirtualHardDisk{
386 URI: to.StringPtr(string(imageId)),
388 Vhd: &compute.VirtualHardDisk{
393 NetworkProfile: &compute.NetworkProfile{
394 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
395 compute.NetworkInterfaceReference{
397 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
398 Primary: to.BoolPtr(true),
403 OsProfile: &compute.OSProfile{
405 AdminUsername: to.StringPtr("crunch"),
406 LinuxConfiguration: &compute.LinuxConfiguration{
407 DisablePasswordAuthentication: to.BoolPtr(true),
408 SSH: &compute.SSHConfiguration{
409 PublicKeys: &[]compute.SSHPublicKey{
410 compute.SSHPublicKey{
411 Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
412 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
417 CustomData: &customData,
422 vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
424 return nil, WrapAzureError(err)
427 return &AzureInstance{
434 func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
436 defer az.stopWg.Done()
438 interfaces, err := az.ManageNics()
443 result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
445 return nil, WrapAzureError(err)
448 instances := make([]cloud.Instance, 0)
450 for ; result.NotDone(); err = result.Next() {
452 return nil, WrapAzureError(err)
454 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
455 instances = append(instances, &AzureInstance{
458 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
461 return instances, nil
464 // ManageNics lists the network interfaces in the configured resource
465 // group and returns those whose names start with namePrefix and which are
466 // attached to a virtual machine, keyed by interface ID. It also garbage
467 // collects prefixed NICs that are not attached to a virtual machine and
468 // whose "created-at" tag is more than DeleteDanglingResourcesAfter seconds old.
//
// The age threshold avoids racing with (and deleting) NICs that were only
// just created. If advancing through the listing fails, a warning is logged
// and the interfaces collected so far are returned with a nil error.
469 func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
471 defer az.stopWg.Done()
473 result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
475 return nil, WrapAzureError(err)
478 interfaces := make(map[string]network.Interface)
// One reference time for every age comparison in this pass.
480 timestamp := time.Now()
// The post statement stores result.Next()'s error in err; the next
// iteration reports it (warning + partial result).
481 for ; result.NotDone(); err = result.Next() {
483 az.logger.WithError(err).Warnf("Error listing nics")
484 return interfaces, nil
486 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
487 if result.Value().VirtualMachine != nil {
488 interfaces[*result.Value().ID] = result.Value()
490 if result.Value().Tags["created-at"] != nil {
// NOTE(review): this := declares a new err that shadows the outer one.
491 created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
493 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
494 az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
// deleteNIC is unbuffered, so this send blocks until one of the
// deletion workers receives the name.
495 az.deleteNIC <- *result.Value().Name
502 return interfaces, nil
505 // ManageBlobs garbage collects blobs (VM OS disk images) in the
506 // configured storage account container. It selects blobs whose names
507 // start with namePrefix, that are page blobs with lease state "available"
508 // and lease status "unlocked" (i.e. not leased to a VM), and that have
509 // not been modified for more than DeleteDanglingResourcesAfter seconds.
//
// Failures to fetch the account keys, build the storage client, or list
// blobs are logged as warnings; ManageBlobs returns nothing.
// NOTE(review): the statement that queues a selected blob for deletion is
// not visible in this excerpt; presumably it sends to az.deleteBlob.
510 func (az *AzureInstanceSet) ManageBlobs() {
// The blob client authenticates with a storage account key, fetched here
// through the management API.
511 result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
513 az.logger.WithError(err).Warn("Couldn't get account keys")
// NOTE(review): assumes ListKeys returned at least one key; a nil or empty
// Keys list would panic here.
517 key1 := *(*result.Keys)[0].Value
518 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
520 az.logger.WithError(err).Warn("Couldn't make client")
524 blobsvc := client.GetBlobService()
525 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
// Only blobs carrying this instance set's name prefix are listed.
527 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
// One reference time for every age comparison in this pass.
528 timestamp := time.Now()
531 response, err := blobcont.ListBlobs(page)
533 az.logger.WithError(err).Warn("Error listing blobs")
536 for _, b := range response.Blobs {
537 age := timestamp.Sub(time.Time(b.Properties.LastModified))
538 if b.Properties.BlobType == storage.BlobTypePage &&
539 b.Properties.LeaseState == "available" &&
540 b.Properties.LeaseStatus == "unlocked" &&
541 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
543 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
// Listing is paginated; NextMarker, when non-empty, requests the next page.
547 if response.NextMarker != "" {
548 page.Marker = response.NextMarker
// Stop shuts down the instance set.
// NOTE(review): the body is not visible in this excerpt. Given what setup
// starts, it presumably cancels az.ctx via az.stopFunc, waits on
// az.stopWg, and closes deleteNIC/deleteBlob so the deletion workers
// exit — confirm.
555 func (az *AzureInstanceSet) Stop() {
562 type AzureInstance struct {
563 provider *AzureInstanceSet
564 nic network.Interface
565 vm compute.VirtualMachine
568 func (ai *AzureInstance) ID() cloud.InstanceID {
569 return cloud.InstanceID(*ai.vm.ID)
// String returns a printable description of the instance.
// NOTE(review): the body is not visible in this excerpt; it presumably
// returns the VM name — confirm.
572 func (ai *AzureInstance) String() string {
576 func (ai *AzureInstance) ProviderType() string {
577 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
// SetTags writes newTags to the instance's VM, each under "dispatch-"+key,
// through the provider's vmClient.CreateOrUpdate. Existing VM tags without
// the "dispatch-" prefix are selected for carrying over. The provider's
// stopWg is held for the duration of the call.
// NOTE(review): several statements (copying the kept tags, attaching the
// tags map to vmParameters, storing the updated VM) are not visible in this
// excerpt — confirm against the full source.
580 func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
581 ai.provider.stopWg.Add(1)
582 defer ai.provider.stopWg.Done()
584 tags := make(map[string]*string)
// Tags that the dispatcher does not own (no "dispatch-" prefix).
586 for k, v := range ai.vm.Tags {
587 if !strings.HasPrefix(k, "dispatch-") {
591 for k, v := range newTags {
// newstr is declared on a line not visible here; presumably a per-iteration
// copy of v, so that each entry points at its own string.
593 tags["dispatch-"+k] = &newstr
596 vmParameters := compute.VirtualMachine{
597 Location: &ai.provider.azconfig.Location,
600 vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
602 return WrapAzureError(err)
// Tags builds the instance's dispatcher-managed tags from the VM tags whose
// keys start with "dispatch-".
// NOTE(review): the loop body and return are not visible in this excerpt.
// Create and SetTags add the "dispatch-" prefix, so it is presumably
// stripped here and the *string values dereferenced — confirm.
609 func (ai *AzureInstance) Tags() cloud.InstanceTags {
610 tags := make(map[string]string)
612 for k, v := range ai.vm.Tags {
613 if strings.HasPrefix(k, "dispatch-") {
621 func (ai *AzureInstance) Destroy() error {
622 ai.provider.stopWg.Add(1)
623 defer ai.provider.stopWg.Done()
625 _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
626 return WrapAzureError(err)
629 func (ai *AzureInstance) Address() string {
630 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
633 func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
634 ai.provider.stopWg.Add(1)
635 defer ai.provider.stopWg.Done()
637 remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
641 tg := tags["ssh-pubkey-fingerprint"]
643 if remoteFingerprint == tg {
646 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
650 nodetokenTag := tags["node-token"]
651 if nodetokenTag == "" {
652 return fmt.Errorf("Missing node token tag")
655 sess, err := client.NewSession()
660 nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
665 nodetoken := strings.TrimSpace(string(nodetokenbytes))
667 expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
669 if strings.TrimSpace(nodetoken) != expectedToken {
670 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
673 sess, err = client.NewSession()
678 keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
683 sp := strings.Split(string(keyfingerprintbytes), " ")
685 if remoteFingerprint != sp[1] {
686 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
689 tags["ssh-pubkey-fingerprint"] = sp[1]
690 delete(tags, "node-token")