14745: Makes DeleteDanglingResourcesAfter an arvados.Duration
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "fmt"
11         "net/http"
12         "regexp"
13         "strconv"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/lib/cloud"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
21         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
22         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
23         "github.com/Azure/azure-sdk-for-go/storage"
24         "github.com/Azure/go-autorest/autorest"
25         "github.com/Azure/go-autorest/autorest/azure"
26         "github.com/Azure/go-autorest/autorest/azure/auth"
27         "github.com/Azure/go-autorest/autorest/to"
28         "github.com/jmcvetta/randutil"
29         "github.com/mitchellh/mapstructure"
30         "github.com/sirupsen/logrus"
31         "golang.org/x/crypto/ssh"
32 )
33
// AzureInstanceSetConfig is the driver-specific configuration of an
// AzureInstanceSet. NewAzureInstanceSet decodes it from the generic
// driver config map with mapstructure.
type AzureInstanceSetConfig struct {
	// Service principal credentials. setup uses them to build one
	// client-credentials authorizer shared by the VM, NIC and
	// storage-account clients.
	SubscriptionID string
	ClientID       string
	ClientSecret   string
	TenantID       string
	// Name of the Azure cloud, resolved by setup with
	// azure.EnvironmentFromName.
	CloudEnvironment string
	// Resource group in which NICs and VMs are created and listed, and
	// the location they are created in.
	ResourceGroup string
	Location      string
	// Virtual network and subnet that Create attaches new NICs to.
	Network string
	Subnet  string
	// Storage account and container that hold the VMs' OS disk VHDs.
	// ManageBlobs scans this container for leftovers.
	StorageAccount string
	BlobContainer  string
	// Minimum age before an unattached NIC (by its "created-at" tag) or
	// an unleased blob (by its last-modified time) carrying this
	// dispatcher's name prefix is garbage-collected.
	DeleteDanglingResourcesAfter arvados.Duration
}
48
// VirtualMachinesClientWrapper is the subset of Azure VM operations
// that AzureInstanceSet uses. VirtualMachinesClientImpl implements it
// on top of compute.VirtualMachinesClient. NOTE(review): presumably the
// indirection also lets tests substitute a stub; confirm.
type VirtualMachinesClientWrapper interface {
	// CreateOrUpdate creates (or updates) the named VM and returns the
	// resulting resource.
	CreateOrUpdate(ctx context.Context,
		resourceGroupName string,
		VMName string,
		parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
	// Delete removes the named VM.
	Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
	// ListComplete returns an iterator over the VMs in a resource group.
	ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
57
// VirtualMachinesClientImpl implements VirtualMachinesClientWrapper
// with a real compute.VirtualMachinesClient. It waits for long-running
// operations to complete and passes errors through WrapAzureError.
type VirtualMachinesClientImpl struct {
	inner compute.VirtualMachinesClient
}
61
62 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
63         resourceGroupName string,
64         VMName string,
65         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
66
67         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
68         if err != nil {
69                 return compute.VirtualMachine{}, WrapAzureError(err)
70         }
71         future.WaitForCompletionRef(ctx, cl.inner.Client)
72         r, err := future.Result(cl.inner)
73         return r, WrapAzureError(err)
74 }
75
76 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
77         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
78         if err != nil {
79                 return nil, WrapAzureError(err)
80         }
81         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
82         return future.Response(), WrapAzureError(err)
83 }
84
85 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
86         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
87         return r, WrapAzureError(err)
88 }
89
// InterfacesClientWrapper is the subset of Azure network-interface
// operations that AzureInstanceSet uses. InterfacesClientImpl
// implements it on top of network.InterfacesClient. NOTE(review):
// presumably the indirection also lets tests substitute a stub; confirm.
type InterfacesClientWrapper interface {
	// CreateOrUpdate creates (or updates) the named NIC and returns the
	// resulting resource.
	CreateOrUpdate(ctx context.Context,
		resourceGroupName string,
		networkInterfaceName string,
		parameters network.Interface) (result network.Interface, err error)
	// Delete removes the named NIC.
	Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
	// ListComplete returns an iterator over the NICs in a resource group.
	ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
98
// InterfacesClientImpl implements InterfacesClientWrapper with a real
// network.InterfacesClient. It waits for long-running operations to
// complete and passes errors through WrapAzureError.
type InterfacesClientImpl struct {
	inner network.InterfacesClient
}
102
103 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
104         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
105         if err != nil {
106                 return nil, WrapAzureError(err)
107         }
108         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
109         return future.Response(), WrapAzureError(err)
110 }
111
112 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
113         resourceGroupName string,
114         networkInterfaceName string,
115         parameters network.Interface) (result network.Interface, err error) {
116
117         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
118         if err != nil {
119                 return network.Interface{}, WrapAzureError(err)
120         }
121         future.WaitForCompletionRef(ctx, cl.inner.Client)
122         r, err := future.Result(cl.inner)
123         return r, WrapAzureError(err)
124 }
125
126 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
127         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
128         return r, WrapAzureError(err)
129 }
130
// quotaRe matches, case-insensitively, Azure service error codes or
// messages mentioning "exceed", "quota" or "limit". WrapAzureError uses
// it to classify a failure as an AzureQuotaError.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
132
// AzureRateLimitError reports that Azure throttled a request. It is
// produced by WrapAzureError and carries the original request error.
type AzureRateLimitError struct {
	azure.RequestError
	earliestRetry time.Time
}

// EarliestRetry returns the time after which the request may be
// retried. WrapAzureError derives it from the Retry-After response
// header, or from a 20-second default when that header is unusable.
func (ar *AzureRateLimitError) EarliestRetry() time.Time {
	return ar.earliestRetry
}
141
// AzureQuotaError reports that an Azure request failed because a quota
// or resource limit was hit (see quotaRe). It is produced by
// WrapAzureError and carries the original request error.
type AzureQuotaError struct {
	azure.RequestError
}

// IsQuotaError always returns true; it marks the error as a quota
// error. NOTE(review): presumably this satisfies a marker interface in
// the cloud package; confirm.
func (ar *AzureQuotaError) IsQuotaError() bool {
	return true
}
149
150 func WrapAzureError(err error) error {
151         de, ok := err.(autorest.DetailedError)
152         if !ok {
153                 return err
154         }
155         rq, ok := de.Original.(*azure.RequestError)
156         if !ok {
157                 return err
158         }
159         if rq.Response == nil {
160                 return err
161         }
162         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
163                 // API throttling
164                 ra := rq.Response.Header["Retry-After"][0]
165                 earliestRetry, parseErr := http.ParseTime(ra)
166                 if parseErr != nil {
167                         // Could not parse as a timestamp, must be number of seconds
168                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
169                         if parseErr == nil {
170                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
171                         } else {
172                                 // Couldn't make sense of retry-after,
173                                 // so set retry to 20 seconds
174                                 earliestRetry = time.Now().Add(20 * time.Second)
175                         }
176                 }
177                 return &AzureRateLimitError{*rq, earliestRetry}
178         }
179         if rq.ServiceError == nil {
180                 return err
181         }
182         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
183                 return &AzureQuotaError{*rq}
184         }
185         return err
186 }
187
// AzureInstanceSet is a cloud.InstanceSet that manages Azure VMs (and
// their NICs and OS disk blobs) in a single resource group. Resources it
// owns are identified by the name prefix "compute-<dispatcherID>-".
// Create it with NewAzureInstanceSet and shut it down with Stop.
type AzureInstanceSet struct {
	azconfig          AzureInstanceSetConfig
	vmClient          VirtualMachinesClientWrapper
	netClient         InterfacesClientWrapper
	storageAcctClient storageacct.AccountsClient
	azureEnv          azure.Environment
	// NOTE(review): not read or written by any code visible here.
	interfaces   map[string]network.Interface
	dispatcherID string
	// "compute-<dispatcherID>-": prefix of every VM, NIC and blob name
	// this dispatcher creates and is allowed to garbage-collect.
	namePrefix string
	// Cancelled by Stop; used as the context for Azure API calls.
	ctx      context.Context
	stopFunc context.CancelFunc
	// Counts in-flight API calls and the blob GC goroutine; Stop waits
	// on it before closing the deletion queues below.
	stopWg sync.WaitGroup
	// Queues of NIC names and blobs for the deletion workers started in
	// setup. Closed by Stop.
	deleteNIC  chan string
	deleteBlob chan storage.Blob
	logger     logrus.FieldLogger
}
204
205 func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
206         azcfg := AzureInstanceSetConfig{}
207
208         decoderConfig := mapstructure.DecoderConfig{
209                 DecodeHook: arvados.DurationMapStructureDecodeHook(),
210                 Result:     &azcfg}
211
212         decoder, err := mapstructure.NewDecoder(&decoderConfig)
213         if err != nil {
214                 return nil, err
215         }
216         if err = decoder.Decode(config); err != nil {
217                 return nil, err
218         }
219
220         ap := AzureInstanceSet{logger: logger}
221         err = ap.setup(azcfg, string(dispatcherID))
222         if err != nil {
223                 return nil, err
224         }
225         return &ap, nil
226 }
227
228 func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
229         az.azconfig = azcfg
230         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
231         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
232         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
233
234         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
235         if err != nil {
236                 return err
237         }
238
239         authorizer, err := auth.ClientCredentialsConfig{
240                 ClientID:     az.azconfig.ClientID,
241                 ClientSecret: az.azconfig.ClientSecret,
242                 TenantID:     az.azconfig.TenantID,
243                 Resource:     az.azureEnv.ResourceManagerEndpoint,
244                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
245         }.Authorizer()
246         if err != nil {
247                 return err
248         }
249
250         vmClient.Authorizer = authorizer
251         netClient.Authorizer = authorizer
252         storageAcctClient.Authorizer = authorizer
253
254         az.vmClient = &VirtualMachinesClientImpl{vmClient}
255         az.netClient = &InterfacesClientImpl{netClient}
256         az.storageAcctClient = storageAcctClient
257
258         az.dispatcherID = dispatcherID
259         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
260
261         az.ctx, az.stopFunc = context.WithCancel(context.Background())
262         go func() {
263                 az.stopWg.Add(1)
264                 defer az.stopWg.Done()
265
266                 tk := time.NewTicker(5 * time.Minute)
267                 for {
268                         select {
269                         case <-az.ctx.Done():
270                                 tk.Stop()
271                                 return
272                         case <-tk.C:
273                                 az.ManageBlobs()
274                         }
275                 }
276         }()
277
278         az.deleteNIC = make(chan string)
279         az.deleteBlob = make(chan storage.Blob)
280
281         for i := 0; i < 4; i++ {
282                 go func() {
283                         for {
284                                 nicname, ok := <-az.deleteNIC
285                                 if !ok {
286                                         return
287                                 }
288                                 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
289                                 if delerr != nil {
290                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
291                                 } else {
292                                         az.logger.Printf("Deleted NIC %v", nicname)
293                                 }
294                         }
295                 }()
296                 go func() {
297                         for {
298                                 blob, ok := <-az.deleteBlob
299                                 if !ok {
300                                         return
301                                 }
302                                 err := blob.Delete(nil)
303                                 if err != nil {
304                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
305                                 } else {
306                                         az.logger.Printf("Deleted blob %v", blob.Name)
307                                 }
308                         }
309                 }()
310         }
311
312         return nil
313 }
314
// Create provisions a new VM of instanceType from the image at imageID
// and returns it as a cloud.Instance.
//
// The VM is named namePrefix followed by 15 random lowercase
// alphanumerics. A NIC named "<name>-nic" is created first, on the
// configured network and subnet. The VM's OS disk is stored as
// "<name>-os.vhd" in the configured blob container. The NIC and the VM
// both get the tags "created-at" (RFC3339Nano), "dispatch-<k>" for every
// k in newTags, and "dispatch-instance-type".
//
// newTags must contain a non-empty "node-token". The VM's custom data
// is a shell script that writes "<name>-<node-token>" to
// /home/crunch/node-token, which VerifyHostKey later compares. (It is
// presumably executed at boot by the image's provisioning agent;
// confirm.) publicKey is installed in /home/crunch/.ssh/authorized_keys
// for the admin user "crunch", and password authentication is disabled.
//
// If VM creation fails, the NIC created here is not removed. ManageNics
// deletes it once it is unattached and older than
// DeleteDanglingResourcesAfter.
func (az *AzureInstanceSet) Create(
	instanceType arvados.InstanceType,
	imageID cloud.ImageID,
	newTags cloud.InstanceTags,
	publicKey ssh.PublicKey) (cloud.Instance, error) {

	az.stopWg.Add(1)
	defer az.stopWg.Done()

	if len(newTags["node-token"]) == 0 {
		return nil, fmt.Errorf("Must provide tag 'node-token'")
	}

	name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
	if err != nil {
		return nil, err
	}

	name = az.namePrefix + name

	// ManageNics reads this tag to decide when an unattached NIC is old
	// enough to garbage-collect.
	timestamp := time.Now().Format(time.RFC3339Nano)

	tags := make(map[string]*string)
	tags["created-at"] = &timestamp
	for k, v := range newTags {
		// Copy v so each map entry points at its own string rather than
		// at the shared loop variable.
		newstr := v
		tags["dispatch-"+k] = &newstr
	}

	tags["dispatch-instance-type"] = &instanceType.Name

	nicParameters := network.Interface{
		Location: &az.azconfig.Location,
		Tags:     tags,
		InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
			IPConfigurations: &[]network.InterfaceIPConfiguration{
				network.InterfaceIPConfiguration{
					Name: to.StringPtr("ip1"),
					InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
						Subnet: &network.Subnet{
							ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
								"/Microsoft.Network/virtualnetworks/%s/subnets/%s",
								az.azconfig.SubscriptionID,
								az.azconfig.ResourceGroup,
								az.azconfig.Network,
								az.azconfig.Subnet)),
						},
						PrivateIPAllocationMethod: network.Dynamic,
					},
				},
			},
		},
	}
	nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
	if err != nil {
		return nil, WrapAzureError(err)
	}

	// The blob name starts with namePrefix, so ManageBlobs can collect it
	// once no VM holds a lease on it.
	instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
		az.azconfig.StorageAccount,
		az.azureEnv.StorageEndpointSuffix,
		az.azconfig.BlobContainer,
		name)

	// NOTE(review): the node token is interpolated into a single-quoted
	// shell string. A value containing ' would break the script.
	// Presumably tokens are generated by the dispatcher and are safe;
	// confirm.
	customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))

	vmParameters := compute.VirtualMachine{
		Location: &az.azconfig.Location,
		Tags:     tags,
		VirtualMachineProperties: &compute.VirtualMachineProperties{
			HardwareProfile: &compute.HardwareProfile{
				VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
			},
			StorageProfile: &compute.StorageProfile{
				OsDisk: &compute.OSDisk{
					OsType:       compute.Linux,
					Name:         to.StringPtr(name + "-os"),
					CreateOption: compute.FromImage,
					// Source image VHD, and destination VHD for this
					// VM's OS disk.
					Image: &compute.VirtualHardDisk{
						URI: to.StringPtr(string(imageID)),
					},
					Vhd: &compute.VirtualHardDisk{
						URI: &instanceVhd,
					},
				},
			},
			NetworkProfile: &compute.NetworkProfile{
				NetworkInterfaces: &[]compute.NetworkInterfaceReference{
					compute.NetworkInterfaceReference{
						ID: nic.ID,
						NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
							Primary: to.BoolPtr(true),
						},
					},
				},
			},
			OsProfile: &compute.OSProfile{
				ComputerName:  &name,
				AdminUsername: to.StringPtr("crunch"),
				LinuxConfiguration: &compute.LinuxConfiguration{
					DisablePasswordAuthentication: to.BoolPtr(true),
					SSH: &compute.SSHConfiguration{
						PublicKeys: &[]compute.SSHPublicKey{
							compute.SSHPublicKey{
								Path:    to.StringPtr("/home/crunch/.ssh/authorized_keys"),
								KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
							},
						},
					},
				},
				CustomData: &customData,
			},
		},
	}

	vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
	if err != nil {
		return nil, WrapAzureError(err)
	}

	return &AzureInstance{
		provider: az,
		nic:      nic,
		vm:       vm,
	}, nil
}
442
443 func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
444         az.stopWg.Add(1)
445         defer az.stopWg.Done()
446
447         interfaces, err := az.ManageNics()
448         if err != nil {
449                 return nil, err
450         }
451
452         result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
453         if err != nil {
454                 return nil, WrapAzureError(err)
455         }
456
457         instances := make([]cloud.Instance, 0)
458
459         for ; result.NotDone(); err = result.Next() {
460                 if err != nil {
461                         return nil, WrapAzureError(err)
462                 }
463                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
464                         instances = append(instances, &AzureInstance{
465                                 provider: az,
466                                 vm:       result.Value(),
467                                 nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
468                 }
469         }
470         return instances, nil
471 }
472
473 // ManageNics returns a list of Azure network interface resources.
474 // Also performs garbage collection of NICs which have "namePrefix", are
475 // not associated with a virtual machine and have a "create-at" time
476 // more than DeleteDanglingResourcesAfter (to prevent racing and
477 // deleting newly created NICs) in the past are deleted.
478 func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
479         az.stopWg.Add(1)
480         defer az.stopWg.Done()
481
482         result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
483         if err != nil {
484                 return nil, WrapAzureError(err)
485         }
486
487         interfaces := make(map[string]network.Interface)
488
489         timestamp := time.Now()
490         for ; result.NotDone(); err = result.Next() {
491                 if err != nil {
492                         az.logger.WithError(err).Warnf("Error listing nics")
493                         return interfaces, nil
494                 }
495                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
496                         if result.Value().VirtualMachine != nil {
497                                 interfaces[*result.Value().ID] = result.Value()
498                         } else {
499                                 if result.Value().Tags["created-at"] != nil {
500                                         createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
501                                         if err == nil {
502                                                 if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
503                                                         az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
504                                                         az.deleteNIC <- *result.Value().Name
505                                                 }
506                                         }
507                                 }
508                         }
509                 }
510         }
511         return interfaces, nil
512 }
513
514 // ManageBlobs garbage collects blobs (VM disk images) in the
515 // configured storage account container.  It will delete blobs which
516 // have "namePrefix", are "available" (which means they are not
517 // leased to a VM) and haven't been modified for
518 // DeleteDanglingResourcesAfter seconds.
519 func (az *AzureInstanceSet) ManageBlobs() {
520         result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
521         if err != nil {
522                 az.logger.WithError(err).Warn("Couldn't get account keys")
523                 return
524         }
525
526         key1 := *(*result.Keys)[0].Value
527         client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
528         if err != nil {
529                 az.logger.WithError(err).Warn("Couldn't make client")
530                 return
531         }
532
533         blobsvc := client.GetBlobService()
534         blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
535
536         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
537         timestamp := time.Now()
538
539         for {
540                 response, err := blobcont.ListBlobs(page)
541                 if err != nil {
542                         az.logger.WithError(err).Warn("Error listing blobs")
543                         return
544                 }
545                 for _, b := range response.Blobs {
546                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
547                         if b.Properties.BlobType == storage.BlobTypePage &&
548                                 b.Properties.LeaseState == "available" &&
549                                 b.Properties.LeaseStatus == "unlocked" &&
550                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
551
552                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
553                                 az.deleteBlob <- b
554                         }
555                 }
556                 if response.NextMarker != "" {
557                         page.Marker = response.NextMarker
558                 } else {
559                         break
560                 }
561         }
562 }
563
564 func (az *AzureInstanceSet) Stop() {
565         az.stopFunc()
566         az.stopWg.Wait()
567         close(az.deleteNIC)
568         close(az.deleteBlob)
569 }
570
// AzureInstance is a cloud.Instance backed by an Azure VM and its
// network interface.
type AzureInstance struct {
	provider *AzureInstanceSet
	// Zero value when Instances found no attached NIC matching the VM.
	nic network.Interface
	vm  compute.VirtualMachine
}
576
577 func (ai *AzureInstance) ID() cloud.InstanceID {
578         return cloud.InstanceID(*ai.vm.ID)
579 }
580
581 func (ai *AzureInstance) String() string {
582         return *ai.vm.Name
583 }
584
585 func (ai *AzureInstance) ProviderType() string {
586         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
587 }
588
589 func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
590         ai.provider.stopWg.Add(1)
591         defer ai.provider.stopWg.Done()
592
593         tags := make(map[string]*string)
594
595         for k, v := range ai.vm.Tags {
596                 if !strings.HasPrefix(k, "dispatch-") {
597                         tags[k] = v
598                 }
599         }
600         for k, v := range newTags {
601                 newstr := v
602                 tags["dispatch-"+k] = &newstr
603         }
604
605         vmParameters := compute.VirtualMachine{
606                 Location: &ai.provider.azconfig.Location,
607                 Tags:     tags,
608         }
609         vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
610         if err != nil {
611                 return WrapAzureError(err)
612         }
613         ai.vm = vm
614
615         return nil
616 }
617
618 func (ai *AzureInstance) Tags() cloud.InstanceTags {
619         tags := make(map[string]string)
620
621         for k, v := range ai.vm.Tags {
622                 if strings.HasPrefix(k, "dispatch-") {
623                         tags[k[9:]] = *v
624                 }
625         }
626
627         return tags
628 }
629
630 func (ai *AzureInstance) Destroy() error {
631         ai.provider.stopWg.Add(1)
632         defer ai.provider.stopWg.Done()
633
634         _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
635         return WrapAzureError(err)
636 }
637
638 func (ai *AzureInstance) Address() string {
639         return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
640 }
641
642 func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
643         ai.provider.stopWg.Add(1)
644         defer ai.provider.stopWg.Done()
645
646         remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
647
648         tags := ai.Tags()
649
650         tg := tags["ssh-pubkey-fingerprint"]
651         if tg != "" {
652                 if remoteFingerprint == tg {
653                         return nil
654                 } else {
655                         return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
656                 }
657         }
658
659         nodetokenTag := tags["node-token"]
660         if nodetokenTag == "" {
661                 return fmt.Errorf("Missing node token tag")
662         }
663
664         sess, err := client.NewSession()
665         if err != nil {
666                 return err
667         }
668
669         nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
670         if err != nil {
671                 return err
672         }
673
674         nodetoken := strings.TrimSpace(string(nodetokenbytes))
675
676         expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
677
678         if strings.TrimSpace(nodetoken) != expectedToken {
679                 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
680         }
681
682         sess, err = client.NewSession()
683         if err != nil {
684                 return err
685         }
686
687         keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
688         if err != nil {
689                 return err
690         }
691
692         sp := strings.Split(string(keyfingerprintbytes), " ")
693
694         if remoteFingerprint != sp[1] {
695                 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
696         }
697
698         tags["ssh-pubkey-fingerprint"] = sp[1]
699         delete(tags, "node-token")
700         ai.SetTags(tags)
701         return nil
702 }