1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
21 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
22 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
23 "github.com/Azure/azure-sdk-for-go/storage"
24 "github.com/Azure/go-autorest/autorest"
25 "github.com/Azure/go-autorest/autorest/azure"
26 "github.com/Azure/go-autorest/autorest/azure/auth"
27 "github.com/Azure/go-autorest/autorest/to"
28 "github.com/jmcvetta/randutil"
29 "github.com/mitchellh/mapstructure"
30 "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/ssh"
// AzureInstanceSetConfig holds the driver settings that
// NewAzureInstanceSet decodes (via mapstructure) from the config map.
// NOTE(review): only some fields are visible in this view; code in this
// file also reads SubscriptionID, ClientID, ClientSecret, TenantID,
// ResourceGroup, Location, Subnet, StorageAccount and BlobContainer
// from it — confirm the full field list against the complete file.
34 type AzureInstanceSetConfig struct {
// CloudEnvironment is passed to azure.EnvironmentFromName to select the
// Azure cloud whose resource-manager, Active Directory and blob-storage
// endpoints are used.
39 CloudEnvironment string
// DeleteDanglingResourcesAfter is the age threshold for garbage
// collection: unattached NICs (by their "created-at" tag) and unleased
// disk blobs (by last-modified time) with this dispatcher's name prefix
// are deleted once they are older than this.
46 DeleteDanglingResourcesAfter arvados.Duration
49 type VirtualMachinesClientWrapper interface {
50 CreateOrUpdate(ctx context.Context,
51 resourceGroupName string,
53 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
54 Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
55 ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
58 type VirtualMachinesClientImpl struct {
59 inner compute.VirtualMachinesClient
62 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
63 resourceGroupName string,
65 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
67 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
69 return compute.VirtualMachine{}, WrapAzureError(err)
71 future.WaitForCompletionRef(ctx, cl.inner.Client)
72 r, err := future.Result(cl.inner)
73 return r, WrapAzureError(err)
76 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
77 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
79 return nil, WrapAzureError(err)
81 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
82 return future.Response(), WrapAzureError(err)
85 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
86 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
87 return r, WrapAzureError(err)
90 type InterfacesClientWrapper interface {
91 CreateOrUpdate(ctx context.Context,
92 resourceGroupName string,
93 networkInterfaceName string,
94 parameters network.Interface) (result network.Interface, err error)
95 Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
96 ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
99 type InterfacesClientImpl struct {
100 inner network.InterfacesClient
103 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
104 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
106 return nil, WrapAzureError(err)
108 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
109 return future.Response(), WrapAzureError(err)
112 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
113 resourceGroupName string,
114 networkInterfaceName string,
115 parameters network.Interface) (result network.Interface, err error) {
117 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
119 return network.Interface{}, WrapAzureError(err)
121 future.WaitForCompletionRef(ctx, cl.inner.Client)
122 r, err := future.Result(cl.inner)
123 return r, WrapAzureError(err)
126 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
127 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
128 return r, WrapAzureError(err)
// quotaRe matches (case-insensitively) Azure service error codes or
// messages mentioning "exceed", "quota" or "limit". WrapAzureError uses
// it to recognize quota errors.
var (
	quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
)
133 type AzureRateLimitError struct {
135 earliestRetry time.Time
138 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
139 return ar.earliestRetry
142 type AzureQuotaError struct {
146 func (ar *AzureQuotaError) IsQuotaError() bool {
150 func WrapAzureError(err error) error {
151 de, ok := err.(autorest.DetailedError)
155 rq, ok := de.Original.(*azure.RequestError)
159 if rq.Response == nil {
162 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
164 ra := rq.Response.Header["Retry-After"][0]
165 earliestRetry, parseErr := http.ParseTime(ra)
167 // Could not parse as a timestamp, must be number of seconds
168 dur, parseErr := strconv.ParseInt(ra, 10, 64)
170 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
172 // Couldn't make sense of retry-after,
173 // so set retry to 20 seconds
174 earliestRetry = time.Now().Add(20 * time.Second)
177 return &AzureRateLimitError{*rq, earliestRetry}
179 if rq.ServiceError == nil {
182 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
183 return &AzureQuotaError{*rq}
188 type AzureInstanceSet struct {
189 azconfig AzureInstanceSetConfig
190 vmClient VirtualMachinesClientWrapper
191 netClient InterfacesClientWrapper
192 storageAcctClient storageacct.AccountsClient
193 azureEnv azure.Environment
194 interfaces map[string]network.Interface
198 stopFunc context.CancelFunc
199 stopWg sync.WaitGroup
200 deleteNIC chan string
201 deleteBlob chan storage.Blob
202 logger logrus.FieldLogger
205 func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
206 azcfg := AzureInstanceSetConfig{}
208 decoderConfig := mapstructure.DecoderConfig{
209 DecodeHook: arvados.DurationMapStructureDecodeHook(),
212 decoder, err := mapstructure.NewDecoder(&decoderConfig)
216 if err = decoder.Decode(config); err != nil {
220 ap := AzureInstanceSet{logger: logger}
221 err = ap.setup(azcfg, string(dispatcherID))
228 func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
230 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
231 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
232 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
234 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
239 authorizer, err := auth.ClientCredentialsConfig{
240 ClientID: az.azconfig.ClientID,
241 ClientSecret: az.azconfig.ClientSecret,
242 TenantID: az.azconfig.TenantID,
243 Resource: az.azureEnv.ResourceManagerEndpoint,
244 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
250 vmClient.Authorizer = authorizer
251 netClient.Authorizer = authorizer
252 storageAcctClient.Authorizer = authorizer
254 az.vmClient = &VirtualMachinesClientImpl{vmClient}
255 az.netClient = &InterfacesClientImpl{netClient}
256 az.storageAcctClient = storageAcctClient
258 az.dispatcherID = dispatcherID
259 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
261 az.ctx, az.stopFunc = context.WithCancel(context.Background())
264 defer az.stopWg.Done()
266 tk := time.NewTicker(5 * time.Minute)
269 case <-az.ctx.Done():
278 az.deleteNIC = make(chan string)
279 az.deleteBlob = make(chan storage.Blob)
281 for i := 0; i < 4; i++ {
284 nicname, ok := <-az.deleteNIC
288 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
290 az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
292 az.logger.Printf("Deleted NIC %v", nicname)
298 blob, ok := <-az.deleteBlob
302 err := blob.Delete(nil)
304 az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
306 az.logger.Printf("Deleted blob %v", blob.Name)
315 func (az *AzureInstanceSet) Create(
316 instanceType arvados.InstanceType,
317 imageID cloud.ImageID,
318 newTags cloud.InstanceTags,
319 publicKey ssh.PublicKey) (cloud.Instance, error) {
322 defer az.stopWg.Done()
324 if len(newTags["node-token"]) == 0 {
325 return nil, fmt.Errorf("Must provide tag 'node-token'")
328 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
333 name = az.namePrefix + name
335 timestamp := time.Now().Format(time.RFC3339Nano)
337 tags := make(map[string]*string)
338 tags["created-at"] = ×tamp
339 for k, v := range newTags {
341 tags["dispatch-"+k] = &newstr
344 tags["dispatch-instance-type"] = &instanceType.Name
346 nicParameters := network.Interface{
347 Location: &az.azconfig.Location,
349 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
350 IPConfigurations: &[]network.InterfaceIPConfiguration{
351 network.InterfaceIPConfiguration{
352 Name: to.StringPtr("ip1"),
353 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
354 Subnet: &network.Subnet{
355 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
356 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
357 az.azconfig.SubscriptionID,
358 az.azconfig.ResourceGroup,
360 az.azconfig.Subnet)),
362 PrivateIPAllocationMethod: network.Dynamic,
368 nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
370 return nil, WrapAzureError(err)
373 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
374 az.azconfig.StorageAccount,
375 az.azureEnv.StorageEndpointSuffix,
376 az.azconfig.BlobContainer,
379 customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
380 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
382 vmParameters := compute.VirtualMachine{
383 Location: &az.azconfig.Location,
385 VirtualMachineProperties: &compute.VirtualMachineProperties{
386 HardwareProfile: &compute.HardwareProfile{
387 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
389 StorageProfile: &compute.StorageProfile{
390 OsDisk: &compute.OSDisk{
391 OsType: compute.Linux,
392 Name: to.StringPtr(name + "-os"),
393 CreateOption: compute.FromImage,
394 Image: &compute.VirtualHardDisk{
395 URI: to.StringPtr(string(imageID)),
397 Vhd: &compute.VirtualHardDisk{
402 NetworkProfile: &compute.NetworkProfile{
403 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
404 compute.NetworkInterfaceReference{
406 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
407 Primary: to.BoolPtr(true),
412 OsProfile: &compute.OSProfile{
414 AdminUsername: to.StringPtr("crunch"),
415 LinuxConfiguration: &compute.LinuxConfiguration{
416 DisablePasswordAuthentication: to.BoolPtr(true),
417 SSH: &compute.SSHConfiguration{
418 PublicKeys: &[]compute.SSHPublicKey{
419 compute.SSHPublicKey{
420 Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
421 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
426 CustomData: &customData,
431 vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
433 return nil, WrapAzureError(err)
436 return &AzureInstance{
443 func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
445 defer az.stopWg.Done()
447 interfaces, err := az.ManageNics()
452 result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
454 return nil, WrapAzureError(err)
457 instances := make([]cloud.Instance, 0)
459 for ; result.NotDone(); err = result.Next() {
461 return nil, WrapAzureError(err)
463 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
464 instances = append(instances, &AzureInstance{
467 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
470 return instances, nil
473 // ManageNics returns a list of Azure network interface resources.
474 // Also performs garbage collection of NICs which have "namePrefix", are
475 // not associated with a virtual machine and have a "create-at" time
476 // more than DeleteDanglingResourcesAfter (to prevent racing and
477 // deleting newly created NICs) in the past are deleted.
478 func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
480 defer az.stopWg.Done()
482 result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
484 return nil, WrapAzureError(err)
487 interfaces := make(map[string]network.Interface)
489 timestamp := time.Now()
490 for ; result.NotDone(); err = result.Next() {
492 az.logger.WithError(err).Warnf("Error listing nics")
493 return interfaces, nil
495 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
496 if result.Value().VirtualMachine != nil {
497 interfaces[*result.Value().ID] = result.Value()
499 if result.Value().Tags["created-at"] != nil {
500 createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
502 if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
503 az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
504 az.deleteNIC <- *result.Value().Name
511 return interfaces, nil
514 // ManageBlobs garbage collects blobs (VM disk images) in the
515 // configured storage account container. It will delete blobs which
516 // have "namePrefix", are "available" (which means they are not
517 // leased to a VM) and haven't been modified for
518 // DeleteDanglingResourcesAfter seconds.
519 func (az *AzureInstanceSet) ManageBlobs() {
520 result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
522 az.logger.WithError(err).Warn("Couldn't get account keys")
526 key1 := *(*result.Keys)[0].Value
527 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
529 az.logger.WithError(err).Warn("Couldn't make client")
533 blobsvc := client.GetBlobService()
534 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
536 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
537 timestamp := time.Now()
540 response, err := blobcont.ListBlobs(page)
542 az.logger.WithError(err).Warn("Error listing blobs")
545 for _, b := range response.Blobs {
546 age := timestamp.Sub(time.Time(b.Properties.LastModified))
547 if b.Properties.BlobType == storage.BlobTypePage &&
548 b.Properties.LeaseState == "available" &&
549 b.Properties.LeaseStatus == "unlocked" &&
550 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
552 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
556 if response.NextMarker != "" {
557 page.Marker = response.NextMarker
// Stop shuts down the instance set's background activity.
// NOTE(review): the body of Stop is not visible in this view. Given
// what setup starts, it presumably calls az.stopFunc to cancel az.ctx,
// waits on az.stopWg, and closes az.deleteNIC/az.deleteBlob so the
// deletion workers exit. Confirm against the full file.
564 func (az *AzureInstanceSet) Stop() {
571 type AzureInstance struct {
572 provider *AzureInstanceSet
573 nic network.Interface
574 vm compute.VirtualMachine
577 func (ai *AzureInstance) ID() cloud.InstanceID {
578 return cloud.InstanceID(*ai.vm.ID)
// String returns a printable identifier for the instance.
// NOTE(review): the body is not visible in this view. It presumably
// returns the VM name (*ai.vm.Name). Confirm against the full file.
581 func (ai *AzureInstance) String() string {
585 func (ai *AzureInstance) ProviderType() string {
586 return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
589 func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
590 ai.provider.stopWg.Add(1)
591 defer ai.provider.stopWg.Done()
593 tags := make(map[string]*string)
595 for k, v := range ai.vm.Tags {
596 if !strings.HasPrefix(k, "dispatch-") {
600 for k, v := range newTags {
602 tags["dispatch-"+k] = &newstr
605 vmParameters := compute.VirtualMachine{
606 Location: &ai.provider.azconfig.Location,
609 vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
611 return WrapAzureError(err)
618 func (ai *AzureInstance) Tags() cloud.InstanceTags {
619 tags := make(map[string]string)
621 for k, v := range ai.vm.Tags {
622 if strings.HasPrefix(k, "dispatch-") {
630 func (ai *AzureInstance) Destroy() error {
631 ai.provider.stopWg.Add(1)
632 defer ai.provider.stopWg.Done()
634 _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
635 return WrapAzureError(err)
638 func (ai *AzureInstance) Address() string {
639 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
642 func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
643 ai.provider.stopWg.Add(1)
644 defer ai.provider.stopWg.Done()
646 remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
650 tg := tags["ssh-pubkey-fingerprint"]
652 if remoteFingerprint == tg {
655 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
659 nodetokenTag := tags["node-token"]
660 if nodetokenTag == "" {
661 return fmt.Errorf("Missing node token tag")
664 sess, err := client.NewSession()
669 nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
674 nodetoken := strings.TrimSpace(string(nodetokenbytes))
676 expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
678 if strings.TrimSpace(nodetoken) != expectedToken {
679 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
682 sess, err = client.NewSession()
687 keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
692 sp := strings.Split(string(keyfingerprintbytes), " ")
694 if remoteFingerprint != sp[1] {
695 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
698 tags["ssh-pubkey-fingerprint"] = sp[1]
699 delete(tags, "node-token")