1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
21 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
22 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
23 "github.com/Azure/azure-sdk-for-go/storage"
24 "github.com/Azure/go-autorest/autorest"
25 "github.com/Azure/go-autorest/autorest/azure"
26 "github.com/Azure/go-autorest/autorest/azure/auth"
27 "github.com/Azure/go-autorest/autorest/to"
28 "github.com/jmcvetta/randutil"
29 "github.com/mitchellh/mapstructure"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
// AzureInstanceSetConfig holds the driver settings that
// NewAzureInstanceSet decodes from its configuration map with
// mapstructure. Each field is tagged with the map key it is read from.
//
// DeleteDanglingResourcesAfter is a number of seconds: unattached
// NICs and unleased blobs older than this are queued for deletion.
type AzureInstanceSetConfig struct {
	SubscriptionID               string  `mapstructure:"SubscriptionID"`
	ClientID                     string  `mapstructure:"ClientID"`
	ClientSecret                 string  `mapstructure:"ClientSecret"`
	TenantID                     string  `mapstructure:"TenantID"`
	CloudEnvironment             string  `mapstructure:"CloudEnvironment"`
	ResourceGroup                string  `mapstructure:"ResourceGroup"`
	Location                     string  `mapstructure:"Location"`
	Network                      string  `mapstructure:"Network"`
	Subnet                       string  `mapstructure:"Subnet"`
	StorageAccount               string  `mapstructure:"StorageAccount"`
	BlobContainer                string  `mapstructure:"BlobContainer"`
	DeleteDanglingResourcesAfter float64 `mapstructure:"DeleteDanglingResourcesAfter"`
}
49 type VirtualMachinesClientWrapper interface {
50 CreateOrUpdate(ctx context.Context,
51 resourceGroupName string,
53 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
54 Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
55 ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
58 type VirtualMachinesClientImpl struct {
59 inner compute.VirtualMachinesClient
62 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
63 resourceGroupName string,
65 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
67 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
69 return compute.VirtualMachine{}, WrapAzureError(err)
71 future.WaitForCompletionRef(ctx, cl.inner.Client)
72 r, err := future.Result(cl.inner)
73 return r, WrapAzureError(err)
76 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
77 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
79 return nil, WrapAzureError(err)
81 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
82 return future.Response(), WrapAzureError(err)
85 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
86 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
87 return r, WrapAzureError(err)
90 type InterfacesClientWrapper interface {
91 CreateOrUpdate(ctx context.Context,
92 resourceGroupName string,
93 networkInterfaceName string,
94 parameters network.Interface) (result network.Interface, err error)
95 Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
96 ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
99 type InterfacesClientImpl struct {
100 inner network.InterfacesClient
103 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
104 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
106 return nil, WrapAzureError(err)
108 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
109 return future.Response(), WrapAzureError(err)
112 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
113 resourceGroupName string,
114 networkInterfaceName string,
115 parameters network.Interface) (result network.Interface, err error) {
117 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
119 return network.Interface{}, WrapAzureError(err)
121 future.WaitForCompletionRef(ctx, cl.inner.Client)
122 r, err := future.Result(cl.inner)
123 return r, WrapAzureError(err)
126 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
127 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
128 return r, WrapAzureError(err)
// quotaRe matches "exceed", "quota" or "limit" in any letter case.
// WrapAzureError applies it to a service error's code and message to
// decide whether the failure should be reported as a quota error.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
133 type AzureRateLimitError struct {
135 earliestRetry time.Time
138 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
139 return ar.earliestRetry
142 type AzureQuotaError struct {
146 func (ar *AzureQuotaError) IsQuotaError() bool {
150 func WrapAzureError(err error) error {
151 de, ok := err.(autorest.DetailedError)
155 rq, ok := de.Original.(*azure.RequestError)
159 if rq.Response == nil {
162 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
164 ra := rq.Response.Header["Retry-After"][0]
165 earliestRetry, parseErr := http.ParseTime(ra)
167 // Could not parse as a timestamp, must be number of seconds
168 dur, parseErr := strconv.ParseInt(ra, 10, 64)
170 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
172 // Couldn't make sense of retry-after,
173 // so set retry to 20 seconds
174 earliestRetry = time.Now().Add(20 * time.Second)
177 return &AzureRateLimitError{*rq, earliestRetry}
179 if rq.ServiceError == nil {
182 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
183 return &AzureQuotaError{*rq}
188 type AzureInstanceSet struct {
189 azconfig AzureInstanceSetConfig
190 vmClient VirtualMachinesClientWrapper
191 netClient InterfacesClientWrapper
192 storageAcctClient storageacct.AccountsClient
193 azureEnv azure.Environment
194 interfaces map[string]network.Interface
198 stopFunc context.CancelFunc
199 stopWg sync.WaitGroup
200 deleteNIC chan string
201 deleteBlob chan storage.Blob
202 logger logrus.FieldLogger
205 func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
206 azcfg := AzureInstanceSetConfig{}
207 if err = mapstructure.Decode(config, &azcfg); err != nil {
210 ap := AzureInstanceSet{logger: logger}
211 err = ap.setup(azcfg, string(dispatcherID))
218 func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
220 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
221 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
222 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
224 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
229 authorizer, err := auth.ClientCredentialsConfig{
230 ClientID: az.azconfig.ClientID,
231 ClientSecret: az.azconfig.ClientSecret,
232 TenantID: az.azconfig.TenantID,
233 Resource: az.azureEnv.ResourceManagerEndpoint,
234 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
240 vmClient.Authorizer = authorizer
241 netClient.Authorizer = authorizer
242 storageAcctClient.Authorizer = authorizer
244 az.vmClient = &VirtualMachinesClientImpl{vmClient}
245 az.netClient = &InterfacesClientImpl{netClient}
246 az.storageAcctClient = storageAcctClient
248 az.dispatcherID = dispatcherID
249 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
251 az.ctx, az.stopFunc = context.WithCancel(context.Background())
254 defer az.stopWg.Done()
256 tk := time.NewTicker(5 * time.Minute)
259 case <-az.ctx.Done():
268 az.deleteNIC = make(chan string)
269 az.deleteBlob = make(chan storage.Blob)
271 for i := 0; i < 4; i++ {
274 nicname, ok := <-az.deleteNIC
278 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
280 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
282 az.logger.Printf("Deleted NIC %v", nicname)
288 blob, ok := <-az.deleteBlob
292 err := blob.Delete(nil)
294 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
296 az.logger.Printf("Deleted blob %v", blob.Name)
305 func (az *AzureInstanceSet) Create(
306 instanceType arvados.InstanceType,
307 imageID cloud.ImageID,
308 newTags cloud.InstanceTags,
309 publicKey ssh.PublicKey) (cloud.Instance, error) {
312 defer az.stopWg.Done()
314 if len(newTags["node-token"]) == 0 {
315 return nil, fmt.Errorf("Must provide tag 'node-token'")
318 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
323 name = az.namePrefix + name
325 timestamp := time.Now().Format(time.RFC3339Nano)
327 tags := make(map[string]*string)
328 tags["created-at"] = ×tamp
329 for k, v := range newTags {
331 tags["dispatch-"+k] = &newstr
334 tags["dispatch-instance-type"] = &instanceType.Name
336 nicParameters := network.Interface{
337 Location: &az.azconfig.Location,
339 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
340 IPConfigurations: &[]network.InterfaceIPConfiguration{
341 network.InterfaceIPConfiguration{
342 Name: to.StringPtr("ip1"),
343 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
344 Subnet: &network.Subnet{
345 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
346 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
347 az.azconfig.SubscriptionID,
348 az.azconfig.ResourceGroup,
350 az.azconfig.Subnet)),
352 PrivateIPAllocationMethod: network.Dynamic,
358 nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
360 return nil, WrapAzureError(err)
363 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
364 az.azconfig.StorageAccount,
365 az.azureEnv.StorageEndpointSuffix,
366 az.azconfig.BlobContainer,
369 customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
370 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
372 vmParameters := compute.VirtualMachine{
373 Location: &az.azconfig.Location,
375 VirtualMachineProperties: &compute.VirtualMachineProperties{
376 HardwareProfile: &compute.HardwareProfile{
377 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
379 StorageProfile: &compute.StorageProfile{
380 OsDisk: &compute.OSDisk{
381 OsType: compute.Linux,
382 Name: to.StringPtr(name + "-os"),
383 CreateOption: compute.FromImage,
384 Image: &compute.VirtualHardDisk{
385 URI: to.StringPtr(string(imageID)),
387 Vhd: &compute.VirtualHardDisk{
392 NetworkProfile: &compute.NetworkProfile{
393 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
394 compute.NetworkInterfaceReference{
396 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
397 Primary: to.BoolPtr(true),
402 OsProfile: &compute.OSProfile{
404 AdminUsername: to.StringPtr("crunch"),
405 LinuxConfiguration: &compute.LinuxConfiguration{
406 DisablePasswordAuthentication: to.BoolPtr(true),
407 SSH: &compute.SSHConfiguration{
408 PublicKeys: &[]compute.SSHPublicKey{
409 compute.SSHPublicKey{
410 Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
411 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
416 CustomData: &customData,
421 vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
423 return nil, WrapAzureError(err)
426 return &AzureInstance{
433 func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
435 defer az.stopWg.Done()
437 interfaces, err := az.ManageNics()
442 result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
444 return nil, WrapAzureError(err)
447 instances := make([]cloud.Instance, 0)
449 for ; result.NotDone(); err = result.Next() {
451 return nil, WrapAzureError(err)
453 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
454 instances = append(instances, &AzureInstance{
457 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
460 return instances, nil
463 // ManageNics returns a list of Azure network interface resources.
464 // Also performs garbage collection of NICs which have "namePrefix", are
465 // not associated with a virtual machine and have a "create-at" time
466 // more than DeleteDanglingResourcesAfter (to prevent racing and
467 // deleting newly created NICs) in the past are deleted.
468 func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
470 defer az.stopWg.Done()
472 result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
474 return nil, WrapAzureError(err)
477 interfaces := make(map[string]network.Interface)
479 timestamp := time.Now()
480 for ; result.NotDone(); err = result.Next() {
482 az.logger.WithError(err).Warnf("Error listing nics")
483 return interfaces, nil
485 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
486 if result.Value().VirtualMachine != nil {
487 interfaces[*result.Value().ID] = result.Value()
489 if result.Value().Tags["created-at"] != nil {
490 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
492 if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
493 az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
494 az.deleteNIC <- *result.Value().Name
501 return interfaces, nil
504 // ManageBlobs garbage collects blobs (VM disk images) in the
505 // configured storage account container. It will delete blobs which
506 // have "namePrefix", are "available" (which means they are not
507 // leased to a VM) and haven't been modified for
508 // DeleteDanglingResourcesAfter seconds.
509 func (az *AzureInstanceSet) ManageBlobs() {
510 result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
512 az.logger.WithError(err).Warn("Couldn't get account keys")
516 key1 := *(*result.Keys)[0].Value
517 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
519 az.logger.WithError(err).Warn("Couldn't make client")
523 blobsvc := client.GetBlobService()
524 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
526 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
527 timestamp := time.Now()
530 response, err := blobcont.ListBlobs(page)
532 az.logger.WithError(err).Warn("Error listing blobs")
535 for _, b := range response.Blobs {
536 age := timestamp.Sub(time.Time(b.Properties.LastModified))
537 if b.Properties.BlobType == storage.BlobTypePage &&
538 b.Properties.LeaseState == "available" &&
539 b.Properties.LeaseStatus == "unlocked" &&
540 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
542 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
546 if response.NextMarker != "" {
547 page.Marker = response.NextMarker
554 func (az *AzureInstanceSet) Stop() {
561 type AzureInstance struct {
562 provider *AzureInstanceSet
563 nic network.Interface
564 vm compute.VirtualMachine
567 func (ai *AzureInstance) ID() cloud.InstanceID {
568 return cloud.InstanceID(*ai.vm.ID)
571 func (ai *AzureInstance) String() string {
575 func (ai *AzureInstance) ProviderType() string {
576 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
579 func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
580 ai.provider.stopWg.Add(1)
581 defer ai.provider.stopWg.Done()
583 tags := make(map[string]*string)
585 for k, v := range ai.vm.Tags {
586 if !strings.HasPrefix(k, "dispatch-") {
590 for k, v := range newTags {
592 tags["dispatch-"+k] = &newstr
595 vmParameters := compute.VirtualMachine{
596 Location: &ai.provider.azconfig.Location,
599 vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
601 return WrapAzureError(err)
608 func (ai *AzureInstance) Tags() cloud.InstanceTags {
609 tags := make(map[string]string)
611 for k, v := range ai.vm.Tags {
612 if strings.HasPrefix(k, "dispatch-") {
620 func (ai *AzureInstance) Destroy() error {
621 ai.provider.stopWg.Add(1)
622 defer ai.provider.stopWg.Done()
624 _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
625 return WrapAzureError(err)
628 func (ai *AzureInstance) Address() string {
629 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
632 func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
633 ai.provider.stopWg.Add(1)
634 defer ai.provider.stopWg.Done()
636 remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
640 tg := tags["ssh-pubkey-fingerprint"]
642 if remoteFingerprint == tg {
645 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
649 nodetokenTag := tags["node-token"]
650 if nodetokenTag == "" {
651 return fmt.Errorf("Missing node token tag")
654 sess, err := client.NewSession()
659 nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
664 nodetoken := strings.TrimSpace(string(nodetokenbytes))
666 expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
668 if strings.TrimSpace(nodetoken) != expectedToken {
669 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
672 sess, err = client.NewSession()
677 keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
682 sp := strings.Split(string(keyfingerprintbytes), " ")
684 if remoteFingerprint != sp[1] {
685 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
688 tags["ssh-pubkey-fingerprint"] = sp[1]
689 delete(tags, "node-token")