14325: Add worker state diagram.
[arvados.git] / lib / cloud / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package cloud
6
7 import (
8         "context"
9         "encoding/base64"
10         "fmt"
11         "net/http"
12         "regexp"
13         "strconv"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
20         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
21         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
22         "github.com/Azure/azure-sdk-for-go/storage"
23         "github.com/Azure/go-autorest/autorest"
24         "github.com/Azure/go-autorest/autorest/azure"
25         "github.com/Azure/go-autorest/autorest/azure/auth"
26         "github.com/Azure/go-autorest/autorest/to"
27         "github.com/jmcvetta/randutil"
28         "github.com/mitchellh/mapstructure"
29         "github.com/sirupsen/logrus"
30         "golang.org/x/crypto/ssh"
31 )
32
// AzureInstanceSetConfig holds the driver configuration for
// AzureInstanceSet. NewAzureInstanceSet decodes it from the generic
// config map using the mapstructure keys given in the field tags.
type AzureInstanceSetConfig struct {
	// Azure subscription that owns every resource managed by this driver.
	SubscriptionID string `mapstructure:"subscription_id"`
	// Service-principal credentials and AAD tenant used to build the
	// ARM API authorizer in setup.
	ClientID     string `mapstructure:"key"`
	ClientSecret string `mapstructure:"secret"`
	TenantID     string `mapstructure:"tenant_id"`
	// Name passed to azure.EnvironmentFromName to select the Azure cloud
	// (endpoints for ARM, AAD and storage).
	CloudEnv string `mapstructure:"cloud_environment"`
	// Resource group in which VMs and NICs are created and listed.
	ResourceGroup string `mapstructure:"resource_group"`
	// Location given to every NIC and VM this driver creates.
	Location string `mapstructure:"region"`
	// Virtual network and subnet that new NICs are attached to.
	Network string `mapstructure:"network"`
	Subnet  string `mapstructure:"subnet"`
	// Storage account and container that hold VM OS-disk VHD blobs; also
	// scanned by ManageBlobs for dangling blobs.
	StorageAccount string `mapstructure:"storage_account"`
	BlobContainer  string `mapstructure:"blob_container"`
	// NOTE(review): not referenced in this file; Create takes the image
	// from its imageId argument instead.
	Image string `mapstructure:"image"`
	// Age in seconds after which unattached NICs and unleased blobs that
	// carry this dispatcher's name prefix are garbage-collected.
	DeleteDanglingResourcesAfter float64 `mapstructure:"delete_dangling_resources_after"`
}
48
49 type VirtualMachinesClientWrapper interface {
50         CreateOrUpdate(ctx context.Context,
51                 resourceGroupName string,
52                 VMName string,
53                 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
54         Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
55         ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
56 }
57
// VirtualMachinesClientImpl implements VirtualMachinesClientWrapper on top
// of the Azure SDK's compute.VirtualMachinesClient. Its methods wait for
// long-running operations to complete and pass errors through
// WrapAzureError.
type VirtualMachinesClientImpl struct {
	// inner is the SDK client that performs the actual API calls.
	inner compute.VirtualMachinesClient
}
61
62 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
63         resourceGroupName string,
64         VMName string,
65         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
66
67         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
68         if err != nil {
69                 return compute.VirtualMachine{}, WrapAzureError(err)
70         }
71         future.WaitForCompletionRef(ctx, cl.inner.Client)
72         r, err := future.Result(cl.inner)
73         return r, WrapAzureError(err)
74 }
75
76 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
77         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
78         if err != nil {
79                 return nil, WrapAzureError(err)
80         }
81         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
82         return future.Response(), WrapAzureError(err)
83 }
84
85 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
86         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
87         return r, WrapAzureError(err)
88 }
89
90 type InterfacesClientWrapper interface {
91         CreateOrUpdate(ctx context.Context,
92                 resourceGroupName string,
93                 networkInterfaceName string,
94                 parameters network.Interface) (result network.Interface, err error)
95         Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
96         ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
97 }
98
// InterfacesClientImpl implements InterfacesClientWrapper on top of the
// Azure SDK's network.InterfacesClient. Its methods wait for long-running
// operations to complete and pass errors through WrapAzureError.
type InterfacesClientImpl struct {
	// inner is the SDK client that performs the actual API calls.
	inner network.InterfacesClient
}
102
103 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
104         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
105         if err != nil {
106                 return nil, WrapAzureError(err)
107         }
108         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
109         return future.Response(), WrapAzureError(err)
110 }
111
112 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
113         resourceGroupName string,
114         networkInterfaceName string,
115         parameters network.Interface) (result network.Interface, err error) {
116
117         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
118         if err != nil {
119                 return network.Interface{}, WrapAzureError(err)
120         }
121         future.WaitForCompletionRef(ctx, cl.inner.Client)
122         r, err := future.Result(cl.inner)
123         return r, WrapAzureError(err)
124 }
125
126 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
127         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
128         return r, WrapAzureError(err)
129 }
130
// quotaRe matches, case-insensitively, the words "exceed", "quota" or
// "limit". WrapAzureError applies it to Azure service error codes and
// messages to recognize quota/limit failures.
var quotaRe = regexp.MustCompile(`(?i)exceed|quota|limit`)
132
// AzureRateLimitError is returned by WrapAzureError when Azure throttled a
// request (HTTP 429 or a Retry-After header in the response). It embeds
// the original request error and records when a retry is allowed.
type AzureRateLimitError struct {
	azure.RequestError
	// earliestRetry is derived from the Retry-After header; when the header
	// cannot be parsed, WrapAzureError uses 20 seconds from now.
	earliestRetry time.Time
}
137
138 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
139         return ar.earliestRetry
140 }
141
// AzureQuotaError is returned by WrapAzureError when an Azure service error
// code or message matches quotaRe ("exceed", "quota" or "limit"). It embeds
// the original request error.
type AzureQuotaError struct {
	azure.RequestError
}
145
146 func (ar *AzureQuotaError) IsQuotaError() bool {
147         return true
148 }
149
150 func WrapAzureError(err error) error {
151         de, ok := err.(autorest.DetailedError)
152         if !ok {
153                 return err
154         }
155         rq, ok := de.Original.(*azure.RequestError)
156         if !ok {
157                 return err
158         }
159         if rq.Response == nil {
160                 return err
161         }
162         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
163                 // API throttling
164                 ra := rq.Response.Header["Retry-After"][0]
165                 earliestRetry, parseErr := http.ParseTime(ra)
166                 if parseErr != nil {
167                         // Could not parse as a timestamp, must be number of seconds
168                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
169                         if parseErr == nil {
170                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
171                         } else {
172                                 // Couldn't make sense of retry-after,
173                                 // so set retry to 20 seconds
174                                 earliestRetry = time.Now().Add(20 * time.Second)
175                         }
176                 }
177                 return &AzureRateLimitError{*rq, earliestRetry}
178         }
179         if rq.ServiceError == nil {
180                 return err
181         }
182         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
183                 return &AzureQuotaError{*rq}
184         }
185         return err
186 }
187
// AzureInstanceSet is an InstanceSet that manages Azure virtual machines
// (and their NICs and OS-disk blobs) for a single dispatcher. Construct it
// with NewAzureInstanceSet; call Stop to shut down its background work.
type AzureInstanceSet struct {
	// Decoded driver configuration.
	azconfig AzureInstanceSetConfig
	// API clients, authorized in setup.
	vmClient          VirtualMachinesClientWrapper
	netClient         InterfacesClientWrapper
	storageAcctClient storageacct.AccountsClient
	// Azure cloud environment selected by azconfig.CloudEnv.
	azureEnv azure.Environment
	// NOTE(review): not referenced in this file; ManageNics builds and
	// returns a fresh map instead.
	interfaces map[string]network.Interface
	// dispatcherID is embedded in namePrefix ("compute-<dispatcherID>-"),
	// which every resource created by this set starts with.
	dispatcherID string
	namePrefix   string
	// ctx is used for API calls and is cancelled by Stop via stopFunc.
	ctx      context.Context
	stopFunc context.CancelFunc
	// stopWg counts in-flight operations (and the blob GC loop) so that
	// Stop can wait for them before closing the deletion queues.
	stopWg sync.WaitGroup
	// Deletion queues consumed by worker goroutines started in setup;
	// closed by Stop.
	deleteNIC  chan string
	deleteBlob chan storage.Blob
	logger     logrus.FieldLogger
}
204
205 func NewAzureInstanceSet(config map[string]interface{}, dispatcherID InstanceSetID, logger logrus.FieldLogger) (prv InstanceSet, err error) {
206         azcfg := AzureInstanceSetConfig{}
207         if err = mapstructure.Decode(config, &azcfg); err != nil {
208                 return nil, err
209         }
210         ap := AzureInstanceSet{logger: logger}
211         err = ap.setup(azcfg, string(dispatcherID))
212         if err != nil {
213                 return nil, err
214         }
215         return &ap, nil
216 }
217
218 func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
219         az.azconfig = azcfg
220         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
221         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
222         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
223
224         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
225         if err != nil {
226                 return err
227         }
228
229         authorizer, err := auth.ClientCredentialsConfig{
230                 ClientID:     az.azconfig.ClientID,
231                 ClientSecret: az.azconfig.ClientSecret,
232                 TenantID:     az.azconfig.TenantID,
233                 Resource:     az.azureEnv.ResourceManagerEndpoint,
234                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
235         }.Authorizer()
236         if err != nil {
237                 return err
238         }
239
240         vmClient.Authorizer = authorizer
241         netClient.Authorizer = authorizer
242         storageAcctClient.Authorizer = authorizer
243
244         az.vmClient = &VirtualMachinesClientImpl{vmClient}
245         az.netClient = &InterfacesClientImpl{netClient}
246         az.storageAcctClient = storageAcctClient
247
248         az.dispatcherID = dispatcherID
249         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
250
251         az.ctx, az.stopFunc = context.WithCancel(context.Background())
252         go func() {
253                 az.stopWg.Add(1)
254                 defer az.stopWg.Done()
255
256                 tk := time.NewTicker(5 * time.Minute)
257                 for {
258                         select {
259                         case <-az.ctx.Done():
260                                 tk.Stop()
261                                 return
262                         case <-tk.C:
263                                 az.ManageBlobs()
264                         }
265                 }
266         }()
267
268         az.deleteNIC = make(chan string)
269         az.deleteBlob = make(chan storage.Blob)
270
271         for i := 0; i < 4; i += 1 {
272                 go func() {
273                         for {
274                                 nicname, ok := <-az.deleteNIC
275                                 if !ok {
276                                         return
277                                 }
278                                 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
279                                 if delerr != nil {
280                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
281                                 } else {
282                                         az.logger.Printf("Deleted NIC %v", nicname)
283                                 }
284                         }
285                 }()
286                 go func() {
287                         for {
288                                 blob, ok := <-az.deleteBlob
289                                 if !ok {
290                                         return
291                                 }
292                                 err := blob.Delete(nil)
293                                 if err != nil {
294                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
295                                 } else {
296                                         az.logger.Printf("Deleted blob %v", blob.Name)
297                                 }
298                         }
299                 }()
300         }
301
302         return nil
303 }
304
305 func (az *AzureInstanceSet) Create(
306         instanceType arvados.InstanceType,
307         imageId ImageID,
308         newTags InstanceTags,
309         publicKey ssh.PublicKey) (Instance, error) {
310
311         az.stopWg.Add(1)
312         defer az.stopWg.Done()
313
314         if len(newTags["node-token"]) == 0 {
315                 return nil, fmt.Errorf("Must provide tag 'node-token'")
316         }
317
318         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
319         if err != nil {
320                 return nil, err
321         }
322
323         name = az.namePrefix + name
324
325         timestamp := time.Now().Format(time.RFC3339Nano)
326
327         tags := make(map[string]*string)
328         tags["created-at"] = &timestamp
329         for k, v := range newTags {
330                 newstr := v
331                 tags["dispatch-"+k] = &newstr
332         }
333
334         tags["dispatch-instance-type"] = &instanceType.Name
335
336         nicParameters := network.Interface{
337                 Location: &az.azconfig.Location,
338                 Tags:     tags,
339                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
340                         IPConfigurations: &[]network.InterfaceIPConfiguration{
341                                 network.InterfaceIPConfiguration{
342                                         Name: to.StringPtr("ip1"),
343                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
344                                                 Subnet: &network.Subnet{
345                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
346                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
347                                                                 az.azconfig.SubscriptionID,
348                                                                 az.azconfig.ResourceGroup,
349                                                                 az.azconfig.Network,
350                                                                 az.azconfig.Subnet)),
351                                                 },
352                                                 PrivateIPAllocationMethod: network.Dynamic,
353                                         },
354                                 },
355                         },
356                 },
357         }
358         nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
359         if err != nil {
360                 return nil, WrapAzureError(err)
361         }
362
363         instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
364                 az.azconfig.StorageAccount,
365                 az.azureEnv.StorageEndpointSuffix,
366                 az.azconfig.BlobContainer,
367                 name)
368
369         customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
370 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
371
372         vmParameters := compute.VirtualMachine{
373                 Location: &az.azconfig.Location,
374                 Tags:     tags,
375                 VirtualMachineProperties: &compute.VirtualMachineProperties{
376                         HardwareProfile: &compute.HardwareProfile{
377                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
378                         },
379                         StorageProfile: &compute.StorageProfile{
380                                 OsDisk: &compute.OSDisk{
381                                         OsType:       compute.Linux,
382                                         Name:         to.StringPtr(name + "-os"),
383                                         CreateOption: compute.FromImage,
384                                         Image: &compute.VirtualHardDisk{
385                                                 URI: to.StringPtr(string(imageId)),
386                                         },
387                                         Vhd: &compute.VirtualHardDisk{
388                                                 URI: &instance_vhd,
389                                         },
390                                 },
391                         },
392                         NetworkProfile: &compute.NetworkProfile{
393                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
394                                         compute.NetworkInterfaceReference{
395                                                 ID: nic.ID,
396                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
397                                                         Primary: to.BoolPtr(true),
398                                                 },
399                                         },
400                                 },
401                         },
402                         OsProfile: &compute.OSProfile{
403                                 ComputerName:  &name,
404                                 AdminUsername: to.StringPtr("crunch"),
405                                 LinuxConfiguration: &compute.LinuxConfiguration{
406                                         DisablePasswordAuthentication: to.BoolPtr(true),
407                                         SSH: &compute.SSHConfiguration{
408                                                 PublicKeys: &[]compute.SSHPublicKey{
409                                                         compute.SSHPublicKey{
410                                                                 Path:    to.StringPtr("/home/crunch/.ssh/authorized_keys"),
411                                                                 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
412                                                         },
413                                                 },
414                                         },
415                                 },
416                                 CustomData: &customData,
417                         },
418                 },
419         }
420
421         vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
422         if err != nil {
423                 return nil, WrapAzureError(err)
424         }
425
426         return &AzureInstance{
427                 provider: az,
428                 nic:      nic,
429                 vm:       vm,
430         }, nil
431 }
432
433 func (az *AzureInstanceSet) Instances(InstanceTags) ([]Instance, error) {
434         az.stopWg.Add(1)
435         defer az.stopWg.Done()
436
437         interfaces, err := az.ManageNics()
438         if err != nil {
439                 return nil, err
440         }
441
442         result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
443         if err != nil {
444                 return nil, WrapAzureError(err)
445         }
446
447         instances := make([]Instance, 0)
448
449         for ; result.NotDone(); err = result.Next() {
450                 if err != nil {
451                         return nil, WrapAzureError(err)
452                 }
453                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
454                         instances = append(instances, &AzureInstance{
455                                 provider: az,
456                                 vm:       result.Value(),
457                                 nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
458                 }
459         }
460         return instances, nil
461 }
462
463 // ManageNics returns a list of Azure network interface resources.
464 // Also performs garbage collection of NICs which have "namePrefix", are
465 // not associated with a virtual machine and have a "create-at" time
466 // more than DeleteDanglingResourcesAfter (to prevent racing and
467 // deleting newly created NICs) in the past are deleted.
468 func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
469         az.stopWg.Add(1)
470         defer az.stopWg.Done()
471
472         result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
473         if err != nil {
474                 return nil, WrapAzureError(err)
475         }
476
477         interfaces := make(map[string]network.Interface)
478
479         timestamp := time.Now()
480         for ; result.NotDone(); err = result.Next() {
481                 if err != nil {
482                         az.logger.WithError(err).Warnf("Error listing nics")
483                         return interfaces, nil
484                 }
485                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
486                         if result.Value().VirtualMachine != nil {
487                                 interfaces[*result.Value().ID] = result.Value()
488                         } else {
489                                 if result.Value().Tags["created-at"] != nil {
490                                         created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
491                                         if err == nil {
492                                                 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
493                                                         az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
494                                                         az.deleteNIC <- *result.Value().Name
495                                                 }
496                                         }
497                                 }
498                         }
499                 }
500         }
501         return interfaces, nil
502 }
503
504 // ManageBlobs garbage collects blobs (VM disk images) in the
505 // configured storage account container.  It will delete blobs which
506 // have "namePrefix", are "available" (which means they are not
507 // leased to a VM) and haven't been modified for
508 // DeleteDanglingResourcesAfter seconds.
509 func (az *AzureInstanceSet) ManageBlobs() {
510         result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
511         if err != nil {
512                 az.logger.WithError(err).Warn("Couldn't get account keys")
513                 return
514         }
515
516         key1 := *(*result.Keys)[0].Value
517         client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
518         if err != nil {
519                 az.logger.WithError(err).Warn("Couldn't make client")
520                 return
521         }
522
523         blobsvc := client.GetBlobService()
524         blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
525
526         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
527         timestamp := time.Now()
528
529         for {
530                 response, err := blobcont.ListBlobs(page)
531                 if err != nil {
532                         az.logger.WithError(err).Warn("Error listing blobs")
533                         return
534                 }
535                 for _, b := range response.Blobs {
536                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
537                         if b.Properties.BlobType == storage.BlobTypePage &&
538                                 b.Properties.LeaseState == "available" &&
539                                 b.Properties.LeaseStatus == "unlocked" &&
540                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
541
542                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
543                                 az.deleteBlob <- b
544                         }
545                 }
546                 if response.NextMarker != "" {
547                         page.Marker = response.NextMarker
548                 } else {
549                         break
550                 }
551         }
552 }
553
554 func (az *AzureInstanceSet) Stop() {
555         az.stopFunc()
556         az.stopWg.Wait()
557         close(az.deleteNIC)
558         close(az.deleteBlob)
559 }
560
// AzureInstance is an Instance backed by an Azure virtual machine and the
// network interface that Create or Instances paired it with.
type AzureInstance struct {
	// provider is the instance set whose clients, configuration and
	// context are used for this instance's API calls.
	provider *AzureInstanceSet
	// nic is the VM's network interface; Address reads it. Instances
	// leaves it as the zero value when the NIC was not found.
	nic network.Interface
	// vm is the VM resource as last fetched; SetTags replaces it.
	vm compute.VirtualMachine
}
566
567 func (ai *AzureInstance) ID() InstanceID {
568         return InstanceID(*ai.vm.ID)
569 }
570
571 func (ai *AzureInstance) String() string {
572         return *ai.vm.Name
573 }
574
575 func (ai *AzureInstance) ProviderType() string {
576         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
577 }
578
579 func (ai *AzureInstance) SetTags(newTags InstanceTags) error {
580         ai.provider.stopWg.Add(1)
581         defer ai.provider.stopWg.Done()
582
583         tags := make(map[string]*string)
584
585         for k, v := range ai.vm.Tags {
586                 if !strings.HasPrefix(k, "dispatch-") {
587                         tags[k] = v
588                 }
589         }
590         for k, v := range newTags {
591                 newstr := v
592                 tags["dispatch-"+k] = &newstr
593         }
594
595         vmParameters := compute.VirtualMachine{
596                 Location: &ai.provider.azconfig.Location,
597                 Tags:     tags,
598         }
599         vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
600         if err != nil {
601                 return WrapAzureError(err)
602         }
603         ai.vm = vm
604
605         return nil
606 }
607
608 func (ai *AzureInstance) Tags() InstanceTags {
609         tags := make(map[string]string)
610
611         for k, v := range ai.vm.Tags {
612                 if strings.HasPrefix(k, "dispatch-") {
613                         tags[k[9:]] = *v
614                 }
615         }
616
617         return tags
618 }
619
620 func (ai *AzureInstance) Destroy() error {
621         ai.provider.stopWg.Add(1)
622         defer ai.provider.stopWg.Done()
623
624         _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
625         return WrapAzureError(err)
626 }
627
628 func (ai *AzureInstance) Address() string {
629         return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
630 }
631
632 func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
633         ai.provider.stopWg.Add(1)
634         defer ai.provider.stopWg.Done()
635
636         remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
637
638         tags := ai.Tags()
639
640         tg := tags["ssh-pubkey-fingerprint"]
641         if tg != "" {
642                 if remoteFingerprint == tg {
643                         return nil
644                 } else {
645                         return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
646                 }
647         }
648
649         nodetokenTag := tags["node-token"]
650         if nodetokenTag == "" {
651                 return fmt.Errorf("Missing node token tag")
652         }
653
654         sess, err := client.NewSession()
655         if err != nil {
656                 return err
657         }
658
659         nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
660         if err != nil {
661                 return err
662         }
663
664         nodetoken := strings.TrimSpace(string(nodetokenbytes))
665
666         expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
667
668         if strings.TrimSpace(nodetoken) != expectedToken {
669                 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
670         }
671
672         sess, err = client.NewSession()
673         if err != nil {
674                 return err
675         }
676
677         keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
678         if err != nil {
679                 return err
680         }
681
682         sp := strings.Split(string(keyfingerprintbytes), " ")
683
684         if remoteFingerprint != sp[1] {
685                 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
686         }
687
688         tags["ssh-pubkey-fingerprint"] = sp[1]
689         delete(tags, "node-token")
690         ai.SetTags(tags)
691         return nil
692 }