14745: Makes the azure driver its own package
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "fmt"
11         "net/http"
12         "regexp"
13         "strconv"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/lib/cloud"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
21         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
22         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
23         "github.com/Azure/azure-sdk-for-go/storage"
24         "github.com/Azure/go-autorest/autorest"
25         "github.com/Azure/go-autorest/autorest/azure"
26         "github.com/Azure/go-autorest/autorest/azure/auth"
27         "github.com/Azure/go-autorest/autorest/to"
28         "github.com/jmcvetta/randutil"
29         "github.com/mitchellh/mapstructure"
30         "github.com/sirupsen/logrus"
31         "golang.org/x/crypto/ssh"
32 )
33
34 type AzureInstanceSetConfig struct {
35         SubscriptionID               string  `mapstructure:"subscription_id"`
36         ClientID                     string  `mapstructure:"key"`
37         ClientSecret                 string  `mapstructure:"secret"`
38         TenantID                     string  `mapstructure:"tenant_id"`
39         CloudEnv                     string  `mapstructure:"cloud_environment"`
40         ResourceGroup                string  `mapstructure:"resource_group"`
41         Location                     string  `mapstructure:"region"`
42         Network                      string  `mapstructure:"network"`
43         Subnet                       string  `mapstructure:"subnet"`
44         StorageAccount               string  `mapstructure:"storage_account"`
45         BlobContainer                string  `mapstructure:"blob_container"`
46         Image                        string  `mapstructure:"image"`
47         DeleteDanglingResourcesAfter float64 `mapstructure:"delete_dangling_resources_after"`
48 }
49
50 type VirtualMachinesClientWrapper interface {
51         CreateOrUpdate(ctx context.Context,
52                 resourceGroupName string,
53                 VMName string,
54                 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
55         Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
56         ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
57 }
58
59 type VirtualMachinesClientImpl struct {
60         inner compute.VirtualMachinesClient
61 }
62
63 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
64         resourceGroupName string,
65         VMName string,
66         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
67
68         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
69         if err != nil {
70                 return compute.VirtualMachine{}, WrapAzureError(err)
71         }
72         future.WaitForCompletionRef(ctx, cl.inner.Client)
73         r, err := future.Result(cl.inner)
74         return r, WrapAzureError(err)
75 }
76
77 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
78         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
79         if err != nil {
80                 return nil, WrapAzureError(err)
81         }
82         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
83         return future.Response(), WrapAzureError(err)
84 }
85
86 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
87         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
88         return r, WrapAzureError(err)
89 }
90
91 type InterfacesClientWrapper interface {
92         CreateOrUpdate(ctx context.Context,
93                 resourceGroupName string,
94                 networkInterfaceName string,
95                 parameters network.Interface) (result network.Interface, err error)
96         Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
97         ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
98 }
99
100 type InterfacesClientImpl struct {
101         inner network.InterfacesClient
102 }
103
104 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
105         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
106         if err != nil {
107                 return nil, WrapAzureError(err)
108         }
109         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
110         return future.Response(), WrapAzureError(err)
111 }
112
113 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
114         resourceGroupName string,
115         networkInterfaceName string,
116         parameters network.Interface) (result network.Interface, err error) {
117
118         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
119         if err != nil {
120                 return network.Interface{}, WrapAzureError(err)
121         }
122         future.WaitForCompletionRef(ctx, cl.inner.Client)
123         r, err := future.Result(cl.inner)
124         return r, WrapAzureError(err)
125 }
126
127 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
128         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
129         return r, WrapAzureError(err)
130 }
131
132 var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
133
134 type AzureRateLimitError struct {
135         azure.RequestError
136         earliestRetry time.Time
137 }
138
139 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
140         return ar.earliestRetry
141 }
142
143 type AzureQuotaError struct {
144         azure.RequestError
145 }
146
147 func (ar *AzureQuotaError) IsQuotaError() bool {
148         return true
149 }
150
151 func WrapAzureError(err error) error {
152         de, ok := err.(autorest.DetailedError)
153         if !ok {
154                 return err
155         }
156         rq, ok := de.Original.(*azure.RequestError)
157         if !ok {
158                 return err
159         }
160         if rq.Response == nil {
161                 return err
162         }
163         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
164                 // API throttling
165                 ra := rq.Response.Header["Retry-After"][0]
166                 earliestRetry, parseErr := http.ParseTime(ra)
167                 if parseErr != nil {
168                         // Could not parse as a timestamp, must be number of seconds
169                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
170                         if parseErr == nil {
171                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
172                         } else {
173                                 // Couldn't make sense of retry-after,
174                                 // so set retry to 20 seconds
175                                 earliestRetry = time.Now().Add(20 * time.Second)
176                         }
177                 }
178                 return &AzureRateLimitError{*rq, earliestRetry}
179         }
180         if rq.ServiceError == nil {
181                 return err
182         }
183         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
184                 return &AzureQuotaError{*rq}
185         }
186         return err
187 }
188
189 type AzureInstanceSet struct {
190         azconfig          AzureInstanceSetConfig
191         vmClient          VirtualMachinesClientWrapper
192         netClient         InterfacesClientWrapper
193         storageAcctClient storageacct.AccountsClient
194         azureEnv          azure.Environment
195         interfaces        map[string]network.Interface
196         dispatcherID      string
197         namePrefix        string
198         ctx               context.Context
199         stopFunc          context.CancelFunc
200         stopWg            sync.WaitGroup
201         deleteNIC         chan string
202         deleteBlob        chan storage.Blob
203         logger            logrus.FieldLogger
204 }
205
206 func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
207         azcfg := AzureInstanceSetConfig{}
208         if err = mapstructure.Decode(config, &azcfg); err != nil {
209                 return nil, err
210         }
211         ap := AzureInstanceSet{logger: logger}
212         err = ap.setup(azcfg, string(dispatcherID))
213         if err != nil {
214                 return nil, err
215         }
216         return &ap, nil
217 }
218
219 func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
220         az.azconfig = azcfg
221         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
222         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
223         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
224
225         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
226         if err != nil {
227                 return err
228         }
229
230         authorizer, err := auth.ClientCredentialsConfig{
231                 ClientID:     az.azconfig.ClientID,
232                 ClientSecret: az.azconfig.ClientSecret,
233                 TenantID:     az.azconfig.TenantID,
234                 Resource:     az.azureEnv.ResourceManagerEndpoint,
235                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
236         }.Authorizer()
237         if err != nil {
238                 return err
239         }
240
241         vmClient.Authorizer = authorizer
242         netClient.Authorizer = authorizer
243         storageAcctClient.Authorizer = authorizer
244
245         az.vmClient = &VirtualMachinesClientImpl{vmClient}
246         az.netClient = &InterfacesClientImpl{netClient}
247         az.storageAcctClient = storageAcctClient
248
249         az.dispatcherID = dispatcherID
250         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
251
252         az.ctx, az.stopFunc = context.WithCancel(context.Background())
253         go func() {
254                 az.stopWg.Add(1)
255                 defer az.stopWg.Done()
256
257                 tk := time.NewTicker(5 * time.Minute)
258                 for {
259                         select {
260                         case <-az.ctx.Done():
261                                 tk.Stop()
262                                 return
263                         case <-tk.C:
264                                 az.ManageBlobs()
265                         }
266                 }
267         }()
268
269         az.deleteNIC = make(chan string)
270         az.deleteBlob = make(chan storage.Blob)
271
272         for i := 0; i < 4; i += 1 {
273                 go func() {
274                         for {
275                                 nicname, ok := <-az.deleteNIC
276                                 if !ok {
277                                         return
278                                 }
279                                 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
280                                 if delerr != nil {
281                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
282                                 } else {
283                                         az.logger.Printf("Deleted NIC %v", nicname)
284                                 }
285                         }
286                 }()
287                 go func() {
288                         for {
289                                 blob, ok := <-az.deleteBlob
290                                 if !ok {
291                                         return
292                                 }
293                                 err := blob.Delete(nil)
294                                 if err != nil {
295                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
296                                 } else {
297                                         az.logger.Printf("Deleted blob %v", blob.Name)
298                                 }
299                         }
300                 }()
301         }
302
303         return nil
304 }
305
306 func (az *AzureInstanceSet) Create(
307         instanceType arvados.InstanceType,
308         imageId cloud.ImageID,
309         newTags cloud.InstanceTags,
310         publicKey ssh.PublicKey) (cloud.Instance, error) {
311
312         az.stopWg.Add(1)
313         defer az.stopWg.Done()
314
315         if len(newTags["node-token"]) == 0 {
316                 return nil, fmt.Errorf("Must provide tag 'node-token'")
317         }
318
319         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
320         if err != nil {
321                 return nil, err
322         }
323
324         name = az.namePrefix + name
325
326         timestamp := time.Now().Format(time.RFC3339Nano)
327
328         tags := make(map[string]*string)
329         tags["created-at"] = &timestamp
330         for k, v := range newTags {
331                 newstr := v
332                 tags["dispatch-"+k] = &newstr
333         }
334
335         tags["dispatch-instance-type"] = &instanceType.Name
336
337         nicParameters := network.Interface{
338                 Location: &az.azconfig.Location,
339                 Tags:     tags,
340                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
341                         IPConfigurations: &[]network.InterfaceIPConfiguration{
342                                 network.InterfaceIPConfiguration{
343                                         Name: to.StringPtr("ip1"),
344                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
345                                                 Subnet: &network.Subnet{
346                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
347                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
348                                                                 az.azconfig.SubscriptionID,
349                                                                 az.azconfig.ResourceGroup,
350                                                                 az.azconfig.Network,
351                                                                 az.azconfig.Subnet)),
352                                                 },
353                                                 PrivateIPAllocationMethod: network.Dynamic,
354                                         },
355                                 },
356                         },
357                 },
358         }
359         nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
360         if err != nil {
361                 return nil, WrapAzureError(err)
362         }
363
364         instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
365                 az.azconfig.StorageAccount,
366                 az.azureEnv.StorageEndpointSuffix,
367                 az.azconfig.BlobContainer,
368                 name)
369
370         customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
371 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
372
373         vmParameters := compute.VirtualMachine{
374                 Location: &az.azconfig.Location,
375                 Tags:     tags,
376                 VirtualMachineProperties: &compute.VirtualMachineProperties{
377                         HardwareProfile: &compute.HardwareProfile{
378                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
379                         },
380                         StorageProfile: &compute.StorageProfile{
381                                 OsDisk: &compute.OSDisk{
382                                         OsType:       compute.Linux,
383                                         Name:         to.StringPtr(name + "-os"),
384                                         CreateOption: compute.FromImage,
385                                         Image: &compute.VirtualHardDisk{
386                                                 URI: to.StringPtr(string(imageId)),
387                                         },
388                                         Vhd: &compute.VirtualHardDisk{
389                                                 URI: &instance_vhd,
390                                         },
391                                 },
392                         },
393                         NetworkProfile: &compute.NetworkProfile{
394                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
395                                         compute.NetworkInterfaceReference{
396                                                 ID: nic.ID,
397                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
398                                                         Primary: to.BoolPtr(true),
399                                                 },
400                                         },
401                                 },
402                         },
403                         OsProfile: &compute.OSProfile{
404                                 ComputerName:  &name,
405                                 AdminUsername: to.StringPtr("crunch"),
406                                 LinuxConfiguration: &compute.LinuxConfiguration{
407                                         DisablePasswordAuthentication: to.BoolPtr(true),
408                                         SSH: &compute.SSHConfiguration{
409                                                 PublicKeys: &[]compute.SSHPublicKey{
410                                                         compute.SSHPublicKey{
411                                                                 Path:    to.StringPtr("/home/crunch/.ssh/authorized_keys"),
412                                                                 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
413                                                         },
414                                                 },
415                                         },
416                                 },
417                                 CustomData: &customData,
418                         },
419                 },
420         }
421
422         vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
423         if err != nil {
424                 return nil, WrapAzureError(err)
425         }
426
427         return &AzureInstance{
428                 provider: az,
429                 nic:      nic,
430                 vm:       vm,
431         }, nil
432 }
433
434 func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
435         az.stopWg.Add(1)
436         defer az.stopWg.Done()
437
438         interfaces, err := az.ManageNics()
439         if err != nil {
440                 return nil, err
441         }
442
443         result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
444         if err != nil {
445                 return nil, WrapAzureError(err)
446         }
447
448         instances := make([]cloud.Instance, 0)
449
450         for ; result.NotDone(); err = result.Next() {
451                 if err != nil {
452                         return nil, WrapAzureError(err)
453                 }
454                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
455                         instances = append(instances, &AzureInstance{
456                                 provider: az,
457                                 vm:       result.Value(),
458                                 nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
459                 }
460         }
461         return instances, nil
462 }
463
464 // ManageNics returns a list of Azure network interface resources.
465 // Also performs garbage collection of NICs which have "namePrefix", are
466 // not associated with a virtual machine and have a "create-at" time
467 // more than DeleteDanglingResourcesAfter (to prevent racing and
468 // deleting newly created NICs) in the past are deleted.
469 func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
470         az.stopWg.Add(1)
471         defer az.stopWg.Done()
472
473         result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
474         if err != nil {
475                 return nil, WrapAzureError(err)
476         }
477
478         interfaces := make(map[string]network.Interface)
479
480         timestamp := time.Now()
481         for ; result.NotDone(); err = result.Next() {
482                 if err != nil {
483                         az.logger.WithError(err).Warnf("Error listing nics")
484                         return interfaces, nil
485                 }
486                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
487                         if result.Value().VirtualMachine != nil {
488                                 interfaces[*result.Value().ID] = result.Value()
489                         } else {
490                                 if result.Value().Tags["created-at"] != nil {
491                                         created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
492                                         if err == nil {
493                                                 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
494                                                         az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
495                                                         az.deleteNIC <- *result.Value().Name
496                                                 }
497                                         }
498                                 }
499                         }
500                 }
501         }
502         return interfaces, nil
503 }
504
505 // ManageBlobs garbage collects blobs (VM disk images) in the
506 // configured storage account container.  It will delete blobs which
507 // have "namePrefix", are "available" (which means they are not
508 // leased to a VM) and haven't been modified for
509 // DeleteDanglingResourcesAfter seconds.
510 func (az *AzureInstanceSet) ManageBlobs() {
511         result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
512         if err != nil {
513                 az.logger.WithError(err).Warn("Couldn't get account keys")
514                 return
515         }
516
517         key1 := *(*result.Keys)[0].Value
518         client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
519         if err != nil {
520                 az.logger.WithError(err).Warn("Couldn't make client")
521                 return
522         }
523
524         blobsvc := client.GetBlobService()
525         blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
526
527         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
528         timestamp := time.Now()
529
530         for {
531                 response, err := blobcont.ListBlobs(page)
532                 if err != nil {
533                         az.logger.WithError(err).Warn("Error listing blobs")
534                         return
535                 }
536                 for _, b := range response.Blobs {
537                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
538                         if b.Properties.BlobType == storage.BlobTypePage &&
539                                 b.Properties.LeaseState == "available" &&
540                                 b.Properties.LeaseStatus == "unlocked" &&
541                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
542
543                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
544                                 az.deleteBlob <- b
545                         }
546                 }
547                 if response.NextMarker != "" {
548                         page.Marker = response.NextMarker
549                 } else {
550                         break
551                 }
552         }
553 }
554
555 func (az *AzureInstanceSet) Stop() {
556         az.stopFunc()
557         az.stopWg.Wait()
558         close(az.deleteNIC)
559         close(az.deleteBlob)
560 }
561
562 type AzureInstance struct {
563         provider *AzureInstanceSet
564         nic      network.Interface
565         vm       compute.VirtualMachine
566 }
567
568 func (ai *AzureInstance) ID() cloud.InstanceID {
569         return cloud.InstanceID(*ai.vm.ID)
570 }
571
572 func (ai *AzureInstance) String() string {
573         return *ai.vm.Name
574 }
575
576 func (ai *AzureInstance) ProviderType() string {
577         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
578 }
579
580 func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
581         ai.provider.stopWg.Add(1)
582         defer ai.provider.stopWg.Done()
583
584         tags := make(map[string]*string)
585
586         for k, v := range ai.vm.Tags {
587                 if !strings.HasPrefix(k, "dispatch-") {
588                         tags[k] = v
589                 }
590         }
591         for k, v := range newTags {
592                 newstr := v
593                 tags["dispatch-"+k] = &newstr
594         }
595
596         vmParameters := compute.VirtualMachine{
597                 Location: &ai.provider.azconfig.Location,
598                 Tags:     tags,
599         }
600         vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
601         if err != nil {
602                 return WrapAzureError(err)
603         }
604         ai.vm = vm
605
606         return nil
607 }
608
609 func (ai *AzureInstance) Tags() cloud.InstanceTags {
610         tags := make(map[string]string)
611
612         for k, v := range ai.vm.Tags {
613                 if strings.HasPrefix(k, "dispatch-") {
614                         tags[k[9:]] = *v
615                 }
616         }
617
618         return tags
619 }
620
621 func (ai *AzureInstance) Destroy() error {
622         ai.provider.stopWg.Add(1)
623         defer ai.provider.stopWg.Done()
624
625         _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
626         return WrapAzureError(err)
627 }
628
629 func (ai *AzureInstance) Address() string {
630         return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
631 }
632
633 func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
634         ai.provider.stopWg.Add(1)
635         defer ai.provider.stopWg.Done()
636
637         remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
638
639         tags := ai.Tags()
640
641         tg := tags["ssh-pubkey-fingerprint"]
642         if tg != "" {
643                 if remoteFingerprint == tg {
644                         return nil
645                 } else {
646                         return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
647                 }
648         }
649
650         nodetokenTag := tags["node-token"]
651         if nodetokenTag == "" {
652                 return fmt.Errorf("Missing node token tag")
653         }
654
655         sess, err := client.NewSession()
656         if err != nil {
657                 return err
658         }
659
660         nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
661         if err != nil {
662                 return err
663         }
664
665         nodetoken := strings.TrimSpace(string(nodetokenbytes))
666
667         expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
668
669         if strings.TrimSpace(nodetoken) != expectedToken {
670                 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
671         }
672
673         sess, err = client.NewSession()
674         if err != nil {
675                 return err
676         }
677
678         keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
679         if err != nil {
680                 return err
681         }
682
683         sp := strings.Split(string(keyfingerprintbytes), " ")
684
685         if remoteFingerprint != sp[1] {
686                 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
687         }
688
689         tags["ssh-pubkey-fingerprint"] = sp[1]
690         delete(tags, "node-token")
691         ai.SetTags(tags)
692         return nil
693 }