14745: Improves azure driver exported var comment
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "encoding/json"
11         "fmt"
12         "net/http"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/lib/cloud"
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
22         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
23         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
24         "github.com/Azure/azure-sdk-for-go/storage"
25         "github.com/Azure/go-autorest/autorest"
26         "github.com/Azure/go-autorest/autorest/azure"
27         "github.com/Azure/go-autorest/autorest/azure/auth"
28         "github.com/Azure/go-autorest/autorest/to"
29         "github.com/jmcvetta/randutil"
30         "github.com/sirupsen/logrus"
31         "golang.org/x/crypto/ssh"
32 )
33
// Driver is the azure implementation of the cloud.Driver interface.
// It constructs an azureInstanceSet from the driver's JSON configuration
// (decoded into azureInstanceSetConfig) via newAzureInstanceSet.
var Driver = cloud.DriverFunc(newAzureInstanceSet)
36
// azureInstanceSetConfig is the driver configuration, decoded from JSON
// by newAzureInstanceSet.
type azureInstanceSetConfig struct {
	// SubscriptionID is the Azure subscription used by all API clients
	// and in the subnet resource ID built by Create.
	SubscriptionID               string
	// ClientID, ClientSecret and TenantID are the service principal
	// credentials used to build the API authorizer in setup.
	ClientID                     string
	ClientSecret                 string
	TenantID                     string
	// CloudEnvironment is passed to azure.EnvironmentFromName to select
	// the Azure cloud (API and storage endpoints).
	CloudEnvironment             string
	// ResourceGroup holds the VMs, NICs and storage account.
	ResourceGroup                string
	// Location is the Azure region for new NICs and VMs.
	Location                     string
	// Network and Subnet name the virtual network/subnet new NICs join.
	Network                      string
	Subnet                       string
	// StorageAccount and BlobContainer locate the VMs' OS disk VHDs,
	// which manageBlobs also garbage-collects.
	StorageAccount               string
	BlobContainer                string
	// DeleteDanglingResourcesAfter is the minimum age before an
	// unattached NIC or unleased disk blob is deleted.
	DeleteDanglingResourcesAfter arvados.Duration
}
51
// virtualMachinesClientWrapper is the subset of VM API operations used by
// azureInstanceSet. virtualMachinesClientImpl implements it on top of
// compute.VirtualMachinesClient.
// NOTE(review): presumably an interface so tests can substitute a stub — confirm.
type virtualMachinesClientWrapper interface {
	// createOrUpdate creates or updates a VM and waits for completion.
	createOrUpdate(ctx context.Context,
		resourceGroupName string,
		VMName string,
		parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
	// delete deletes a VM and waits for completion.
	delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
	// listComplete returns an iterator over all VMs in the resource group.
	listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
60
// virtualMachinesClientImpl implements virtualMachinesClientWrapper by
// delegating to the Azure SDK VM client and wrapping its errors with
// wrapAzureError.
type virtualMachinesClientImpl struct {
	inner compute.VirtualMachinesClient
}
64
65 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
66         resourceGroupName string,
67         VMName string,
68         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
69
70         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
71         if err != nil {
72                 return compute.VirtualMachine{}, wrapAzureError(err)
73         }
74         future.WaitForCompletionRef(ctx, cl.inner.Client)
75         r, err := future.Result(cl.inner)
76         return r, wrapAzureError(err)
77 }
78
79 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
80         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
81         if err != nil {
82                 return nil, wrapAzureError(err)
83         }
84         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
85         return future.Response(), wrapAzureError(err)
86 }
87
88 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
89         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
90         return r, wrapAzureError(err)
91 }
92
// interfacesClientWrapper is the subset of network-interface (NIC) API
// operations used by azureInstanceSet. interfacesClientImpl implements it
// on top of network.InterfacesClient.
// NOTE(review): presumably an interface so tests can substitute a stub — confirm.
type interfacesClientWrapper interface {
	// createOrUpdate creates or updates a NIC and waits for completion.
	createOrUpdate(ctx context.Context,
		resourceGroupName string,
		networkInterfaceName string,
		parameters network.Interface) (result network.Interface, err error)
	// delete deletes a NIC and waits for completion.
	delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
	// listComplete returns an iterator over all NICs in the resource group.
	listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
101
// interfacesClientImpl implements interfacesClientWrapper by delegating
// to the Azure SDK NIC client and wrapping its errors with
// wrapAzureError.
type interfacesClientImpl struct {
	inner network.InterfacesClient
}
105
106 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
107         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
108         if err != nil {
109                 return nil, wrapAzureError(err)
110         }
111         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
112         return future.Response(), wrapAzureError(err)
113 }
114
115 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
116         resourceGroupName string,
117         networkInterfaceName string,
118         parameters network.Interface) (result network.Interface, err error) {
119
120         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
121         if err != nil {
122                 return network.Interface{}, wrapAzureError(err)
123         }
124         future.WaitForCompletionRef(ctx, cl.inner.Client)
125         r, err := future.Result(cl.inner)
126         return r, wrapAzureError(err)
127 }
128
129 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
130         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
131         return r, wrapAzureError(err)
132 }
133
// quotaRe matches (case-insensitively) "exceed", "quota" or "limit" in an
// Azure service error code or message; wrapAzureError uses it to classify
// errors as quota errors.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
135
136 type azureRateLimitError struct {
137         azure.RequestError
138         firstRetry time.Time
139 }
140
141 func (ar *azureRateLimitError) EarliestRetry() time.Time {
142         return ar.firstRetry
143 }
144
145 type azureQuotaError struct {
146         azure.RequestError
147 }
148
149 func (ar *azureQuotaError) IsQuotaError() bool {
150         return true
151 }
152
153 func wrapAzureError(err error) error {
154         de, ok := err.(autorest.DetailedError)
155         if !ok {
156                 return err
157         }
158         rq, ok := de.Original.(*azure.RequestError)
159         if !ok {
160                 return err
161         }
162         if rq.Response == nil {
163                 return err
164         }
165         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
166                 // API throttling
167                 ra := rq.Response.Header["Retry-After"][0]
168                 earliestRetry, parseErr := http.ParseTime(ra)
169                 if parseErr != nil {
170                         // Could not parse as a timestamp, must be number of seconds
171                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
172                         if parseErr == nil {
173                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
174                         } else {
175                                 // Couldn't make sense of retry-after,
176                                 // so set retry to 20 seconds
177                                 earliestRetry = time.Now().Add(20 * time.Second)
178                         }
179                 }
180                 return &azureRateLimitError{*rq, earliestRetry}
181         }
182         if rq.ServiceError == nil {
183                 return err
184         }
185         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
186                 return &azureQuotaError{*rq}
187         }
188         return err
189 }
190
// azureInstanceSet is the cloud.InstanceSet implementation for Azure.
// It is built by newAzureInstanceSet and initialized by setup.
type azureInstanceSet struct {
	// azconfig is the decoded driver configuration.
	azconfig          azureInstanceSetConfig
	// vmClient and netClient wrap the VM and NIC APIs.
	vmClient          virtualMachinesClientWrapper
	netClient         interfacesClientWrapper
	// storageAcctClient is used by manageBlobs to fetch storage keys.
	storageAcctClient storageacct.AccountsClient
	// azureEnv holds the endpoints of the configured Azure cloud.
	azureEnv          azure.Environment
	// NOTE(review): interfaces is not referenced in the code visible here.
	interfaces        map[string]network.Interface
	dispatcherID      string
	// namePrefix ("compute-<dispatcherID>-") prefixes the names of all
	// resources this set creates; it is how they are recognized later.
	namePrefix        string
	// ctx is cancelled (via stopFunc) by Stop.
	ctx               context.Context
	stopFunc          context.CancelFunc
	// stopWg tracks in-flight API calls and the blob GC goroutine; Stop
	// waits on it before closing the deletion queues.
	stopWg            sync.WaitGroup
	// deleteNIC and deleteBlob feed the deletion workers started in setup.
	deleteNIC         chan string
	deleteBlob        chan storage.Blob
	logger            logrus.FieldLogger
}
207
208 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
209         azcfg := azureInstanceSetConfig{}
210         err = json.Unmarshal(config, &azcfg)
211         if err != nil {
212                 return nil, err
213         }
214
215         ap := azureInstanceSet{logger: logger}
216         err = ap.setup(azcfg, string(dispatcherID))
217         if err != nil {
218                 return nil, err
219         }
220         return &ap, nil
221 }
222
223 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
224         az.azconfig = azcfg
225         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
226         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
227         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
228
229         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
230         if err != nil {
231                 return err
232         }
233
234         authorizer, err := auth.ClientCredentialsConfig{
235                 ClientID:     az.azconfig.ClientID,
236                 ClientSecret: az.azconfig.ClientSecret,
237                 TenantID:     az.azconfig.TenantID,
238                 Resource:     az.azureEnv.ResourceManagerEndpoint,
239                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
240         }.Authorizer()
241         if err != nil {
242                 return err
243         }
244
245         vmClient.Authorizer = authorizer
246         netClient.Authorizer = authorizer
247         storageAcctClient.Authorizer = authorizer
248
249         az.vmClient = &virtualMachinesClientImpl{vmClient}
250         az.netClient = &interfacesClientImpl{netClient}
251         az.storageAcctClient = storageAcctClient
252
253         az.dispatcherID = dispatcherID
254         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
255
256         az.ctx, az.stopFunc = context.WithCancel(context.Background())
257         go func() {
258                 az.stopWg.Add(1)
259                 defer az.stopWg.Done()
260
261                 tk := time.NewTicker(5 * time.Minute)
262                 for {
263                         select {
264                         case <-az.ctx.Done():
265                                 tk.Stop()
266                                 return
267                         case <-tk.C:
268                                 az.manageBlobs()
269                         }
270                 }
271         }()
272
273         az.deleteNIC = make(chan string)
274         az.deleteBlob = make(chan storage.Blob)
275
276         for i := 0; i < 4; i++ {
277                 go func() {
278                         for {
279                                 nicname, ok := <-az.deleteNIC
280                                 if !ok {
281                                         return
282                                 }
283                                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
284                                 if delerr != nil {
285                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
286                                 } else {
287                                         az.logger.Printf("Deleted NIC %v", nicname)
288                                 }
289                         }
290                 }()
291                 go func() {
292                         for {
293                                 blob, ok := <-az.deleteBlob
294                                 if !ok {
295                                         return
296                                 }
297                                 err := blob.Delete(nil)
298                                 if err != nil {
299                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
300                                 } else {
301                                         az.logger.Printf("Deleted blob %v", blob.Name)
302                                 }
303                         }
304                 }()
305         }
306
307         return nil
308 }
309
310 func (az *azureInstanceSet) Create(
311         instanceType arvados.InstanceType,
312         imageID cloud.ImageID,
313         newTags cloud.InstanceTags,
314         publicKey ssh.PublicKey) (cloud.Instance, error) {
315
316         az.stopWg.Add(1)
317         defer az.stopWg.Done()
318
319         if len(newTags["node-token"]) == 0 {
320                 return nil, fmt.Errorf("Must provide tag 'node-token'")
321         }
322
323         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
324         if err != nil {
325                 return nil, err
326         }
327
328         name = az.namePrefix + name
329
330         timestamp := time.Now().Format(time.RFC3339Nano)
331
332         tags := make(map[string]*string)
333         tags["created-at"] = &timestamp
334         for k, v := range newTags {
335                 newstr := v
336                 tags["dispatch-"+k] = &newstr
337         }
338
339         tags["dispatch-instance-type"] = &instanceType.Name
340
341         nicParameters := network.Interface{
342                 Location: &az.azconfig.Location,
343                 Tags:     tags,
344                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
345                         IPConfigurations: &[]network.InterfaceIPConfiguration{
346                                 network.InterfaceIPConfiguration{
347                                         Name: to.StringPtr("ip1"),
348                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
349                                                 Subnet: &network.Subnet{
350                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
351                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
352                                                                 az.azconfig.SubscriptionID,
353                                                                 az.azconfig.ResourceGroup,
354                                                                 az.azconfig.Network,
355                                                                 az.azconfig.Subnet)),
356                                                 },
357                                                 PrivateIPAllocationMethod: network.Dynamic,
358                                         },
359                                 },
360                         },
361                 },
362         }
363         nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
364         if err != nil {
365                 return nil, wrapAzureError(err)
366         }
367
368         instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
369                 az.azconfig.StorageAccount,
370                 az.azureEnv.StorageEndpointSuffix,
371                 az.azconfig.BlobContainer,
372                 name)
373
374         customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
375 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
376
377         vmParameters := compute.VirtualMachine{
378                 Location: &az.azconfig.Location,
379                 Tags:     tags,
380                 VirtualMachineProperties: &compute.VirtualMachineProperties{
381                         HardwareProfile: &compute.HardwareProfile{
382                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
383                         },
384                         StorageProfile: &compute.StorageProfile{
385                                 OsDisk: &compute.OSDisk{
386                                         OsType:       compute.Linux,
387                                         Name:         to.StringPtr(name + "-os"),
388                                         CreateOption: compute.FromImage,
389                                         Image: &compute.VirtualHardDisk{
390                                                 URI: to.StringPtr(string(imageID)),
391                                         },
392                                         Vhd: &compute.VirtualHardDisk{
393                                                 URI: &instanceVhd,
394                                         },
395                                 },
396                         },
397                         NetworkProfile: &compute.NetworkProfile{
398                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
399                                         compute.NetworkInterfaceReference{
400                                                 ID: nic.ID,
401                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
402                                                         Primary: to.BoolPtr(true),
403                                                 },
404                                         },
405                                 },
406                         },
407                         OsProfile: &compute.OSProfile{
408                                 ComputerName:  &name,
409                                 AdminUsername: to.StringPtr("crunch"),
410                                 LinuxConfiguration: &compute.LinuxConfiguration{
411                                         DisablePasswordAuthentication: to.BoolPtr(true),
412                                         SSH: &compute.SSHConfiguration{
413                                                 PublicKeys: &[]compute.SSHPublicKey{
414                                                         compute.SSHPublicKey{
415                                                                 Path:    to.StringPtr("/home/crunch/.ssh/authorized_keys"),
416                                                                 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
417                                                         },
418                                                 },
419                                         },
420                                 },
421                                 CustomData: &customData,
422                         },
423                 },
424         }
425
426         vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
427         if err != nil {
428                 return nil, wrapAzureError(err)
429         }
430
431         return &azureInstance{
432                 provider: az,
433                 nic:      nic,
434                 vm:       vm,
435         }, nil
436 }
437
438 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
439         az.stopWg.Add(1)
440         defer az.stopWg.Done()
441
442         interfaces, err := az.manageNics()
443         if err != nil {
444                 return nil, err
445         }
446
447         result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
448         if err != nil {
449                 return nil, wrapAzureError(err)
450         }
451
452         instances := make([]cloud.Instance, 0)
453
454         for ; result.NotDone(); err = result.Next() {
455                 if err != nil {
456                         return nil, wrapAzureError(err)
457                 }
458                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
459                         instances = append(instances, &azureInstance{
460                                 provider: az,
461                                 vm:       result.Value(),
462                                 nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
463                 }
464         }
465         return instances, nil
466 }
467
468 // ManageNics returns a list of Azure network interface resources.
469 // Also performs garbage collection of NICs which have "namePrefix", are
470 // not associated with a virtual machine and have a "create-at" time
471 // more than DeleteDanglingResourcesAfter (to prevent racing and
472 // deleting newly created NICs) in the past are deleted.
473 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
474         az.stopWg.Add(1)
475         defer az.stopWg.Done()
476
477         result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
478         if err != nil {
479                 return nil, wrapAzureError(err)
480         }
481
482         interfaces := make(map[string]network.Interface)
483
484         timestamp := time.Now()
485         for ; result.NotDone(); err = result.Next() {
486                 if err != nil {
487                         az.logger.WithError(err).Warnf("Error listing nics")
488                         return interfaces, nil
489                 }
490                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
491                         if result.Value().VirtualMachine != nil {
492                                 interfaces[*result.Value().ID] = result.Value()
493                         } else {
494                                 if result.Value().Tags["created-at"] != nil {
495                                         createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
496                                         if err == nil {
497                                                 if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
498                                                         az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
499                                                         az.deleteNIC <- *result.Value().Name
500                                                 }
501                                         }
502                                 }
503                         }
504                 }
505         }
506         return interfaces, nil
507 }
508
509 // ManageBlobs garbage collects blobs (VM disk images) in the
510 // configured storage account container.  It will delete blobs which
511 // have "namePrefix", are "available" (which means they are not
512 // leased to a VM) and haven't been modified for
513 // DeleteDanglingResourcesAfter seconds.
514 func (az *azureInstanceSet) manageBlobs() {
515         result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
516         if err != nil {
517                 az.logger.WithError(err).Warn("Couldn't get account keys")
518                 return
519         }
520
521         key1 := *(*result.Keys)[0].Value
522         client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
523         if err != nil {
524                 az.logger.WithError(err).Warn("Couldn't make client")
525                 return
526         }
527
528         blobsvc := client.GetBlobService()
529         blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
530
531         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
532         timestamp := time.Now()
533
534         for {
535                 response, err := blobcont.ListBlobs(page)
536                 if err != nil {
537                         az.logger.WithError(err).Warn("Error listing blobs")
538                         return
539                 }
540                 for _, b := range response.Blobs {
541                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
542                         if b.Properties.BlobType == storage.BlobTypePage &&
543                                 b.Properties.LeaseState == "available" &&
544                                 b.Properties.LeaseStatus == "unlocked" &&
545                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
546
547                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
548                                 az.deleteBlob <- b
549                         }
550                 }
551                 if response.NextMarker != "" {
552                         page.Marker = response.NextMarker
553                 } else {
554                         break
555                 }
556         }
557 }
558
559 func (az *azureInstanceSet) Stop() {
560         az.stopFunc()
561         az.stopWg.Wait()
562         close(az.deleteNIC)
563         close(az.deleteBlob)
564 }
565
// azureInstance is the cloud.Instance implementation for a VM created or
// listed by an azureInstanceSet.
type azureInstance struct {
	// provider is the instance set this VM belongs to.
	provider *azureInstanceSet
	// nic is the VM's network interface; Address reads its private IP.
	nic      network.Interface
	// vm is the last known VM state; SetTags replaces it after an update.
	vm       compute.VirtualMachine
}
571
572 func (ai *azureInstance) ID() cloud.InstanceID {
573         return cloud.InstanceID(*ai.vm.ID)
574 }
575
576 func (ai *azureInstance) String() string {
577         return *ai.vm.Name
578 }
579
580 func (ai *azureInstance) ProviderType() string {
581         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
582 }
583
584 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
585         ai.provider.stopWg.Add(1)
586         defer ai.provider.stopWg.Done()
587
588         tags := make(map[string]*string)
589
590         for k, v := range ai.vm.Tags {
591                 if !strings.HasPrefix(k, "dispatch-") {
592                         tags[k] = v
593                 }
594         }
595         for k, v := range newTags {
596                 newstr := v
597                 tags["dispatch-"+k] = &newstr
598         }
599
600         vmParameters := compute.VirtualMachine{
601                 Location: &ai.provider.azconfig.Location,
602                 Tags:     tags,
603         }
604         vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
605         if err != nil {
606                 return wrapAzureError(err)
607         }
608         ai.vm = vm
609
610         return nil
611 }
612
613 func (ai *azureInstance) Tags() cloud.InstanceTags {
614         tags := make(map[string]string)
615
616         for k, v := range ai.vm.Tags {
617                 if strings.HasPrefix(k, "dispatch-") {
618                         tags[k[9:]] = *v
619                 }
620         }
621
622         return tags
623 }
624
625 func (ai *azureInstance) Destroy() error {
626         ai.provider.stopWg.Add(1)
627         defer ai.provider.stopWg.Done()
628
629         _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
630         return wrapAzureError(err)
631 }
632
633 func (ai *azureInstance) Address() string {
634         return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
635 }
636
637 func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
638         ai.provider.stopWg.Add(1)
639         defer ai.provider.stopWg.Done()
640
641         remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
642
643         tags := ai.Tags()
644
645         tg := tags["ssh-pubkey-fingerprint"]
646         if tg != "" {
647                 if remoteFingerprint == tg {
648                         return nil
649                 }
650                 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
651         }
652
653         nodetokenTag := tags["node-token"]
654         if nodetokenTag == "" {
655                 return fmt.Errorf("Missing node token tag")
656         }
657
658         sess, err := client.NewSession()
659         if err != nil {
660                 return err
661         }
662
663         nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
664         if err != nil {
665                 return err
666         }
667
668         nodetoken := strings.TrimSpace(string(nodetokenbytes))
669
670         expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
671
672         if strings.TrimSpace(nodetoken) != expectedToken {
673                 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
674         }
675
676         sess, err = client.NewSession()
677         if err != nil {
678                 return err
679         }
680
681         keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
682         if err != nil {
683                 return err
684         }
685
686         sp := strings.Split(string(keyfingerprintbytes), " ")
687
688         if remoteFingerprint != sp[1] {
689                 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
690         }
691
692         tags["ssh-pubkey-fingerprint"] = sp[1]
693         delete(tags, "node-token")
694         ai.SetTags(tags)
695         return nil
696 }