14807: Merge branch 'master'
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "encoding/json"
11         "fmt"
12         "net/http"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/lib/cloud"
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
22         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
23         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
24         "github.com/Azure/azure-sdk-for-go/storage"
25         "github.com/Azure/go-autorest/autorest"
26         "github.com/Azure/go-autorest/autorest/azure"
27         "github.com/Azure/go-autorest/autorest/azure/auth"
28         "github.com/Azure/go-autorest/autorest/to"
29         "github.com/jmcvetta/randutil"
30         "github.com/sirupsen/logrus"
31         "golang.org/x/crypto/ssh"
32 )
33
34 // Driver is the azure implementation of the cloud.Driver interface.
35 var Driver = cloud.DriverFunc(newAzureInstanceSet)
36
37 type azureInstanceSetConfig struct {
38         SubscriptionID               string
39         ClientID                     string
40         ClientSecret                 string
41         TenantID                     string
42         CloudEnvironment             string
43         ResourceGroup                string
44         Location                     string
45         Network                      string
46         Subnet                       string
47         StorageAccount               string
48         BlobContainer                string
49         DeleteDanglingResourcesAfter arvados.Duration
50         AdminUsername                string
51 }
52
// tagKeyInstanceSecret is the tag key "InstanceSecret".
// NOTE(review): it is not referenced in the visible part of this file;
// confirm where it is consumed.
const tagKeyInstanceSecret = "InstanceSecret"
54
55 type virtualMachinesClientWrapper interface {
56         createOrUpdate(ctx context.Context,
57                 resourceGroupName string,
58                 VMName string,
59                 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
60         delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
61         listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
62 }
63
64 type virtualMachinesClientImpl struct {
65         inner compute.VirtualMachinesClient
66 }
67
68 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
69         resourceGroupName string,
70         VMName string,
71         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
72
73         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
74         if err != nil {
75                 return compute.VirtualMachine{}, wrapAzureError(err)
76         }
77         future.WaitForCompletionRef(ctx, cl.inner.Client)
78         r, err := future.Result(cl.inner)
79         return r, wrapAzureError(err)
80 }
81
82 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
83         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
84         if err != nil {
85                 return nil, wrapAzureError(err)
86         }
87         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
88         return future.Response(), wrapAzureError(err)
89 }
90
91 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
92         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
93         return r, wrapAzureError(err)
94 }
95
96 type interfacesClientWrapper interface {
97         createOrUpdate(ctx context.Context,
98                 resourceGroupName string,
99                 networkInterfaceName string,
100                 parameters network.Interface) (result network.Interface, err error)
101         delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
102         listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
103 }
104
105 type interfacesClientImpl struct {
106         inner network.InterfacesClient
107 }
108
109 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
110         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
111         if err != nil {
112                 return nil, wrapAzureError(err)
113         }
114         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
115         return future.Response(), wrapAzureError(err)
116 }
117
118 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
119         resourceGroupName string,
120         networkInterfaceName string,
121         parameters network.Interface) (result network.Interface, err error) {
122
123         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
124         if err != nil {
125                 return network.Interface{}, wrapAzureError(err)
126         }
127         future.WaitForCompletionRef(ctx, cl.inner.Client)
128         r, err := future.Result(cl.inner)
129         return r, wrapAzureError(err)
130 }
131
132 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
133         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
134         return r, wrapAzureError(err)
135 }
136
// quotaRe matches, case-insensitively, the words "exceed", "quota" or
// "limit". wrapAzureError applies it to a service error's code and
// message to recognize quota errors.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
138
139 type azureRateLimitError struct {
140         azure.RequestError
141         firstRetry time.Time
142 }
143
144 func (ar *azureRateLimitError) EarliestRetry() time.Time {
145         return ar.firstRetry
146 }
147
148 type azureQuotaError struct {
149         azure.RequestError
150 }
151
152 func (ar *azureQuotaError) IsQuotaError() bool {
153         return true
154 }
155
156 func wrapAzureError(err error) error {
157         de, ok := err.(autorest.DetailedError)
158         if !ok {
159                 return err
160         }
161         rq, ok := de.Original.(*azure.RequestError)
162         if !ok {
163                 return err
164         }
165         if rq.Response == nil {
166                 return err
167         }
168         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
169                 // API throttling
170                 ra := rq.Response.Header["Retry-After"][0]
171                 earliestRetry, parseErr := http.ParseTime(ra)
172                 if parseErr != nil {
173                         // Could not parse as a timestamp, must be number of seconds
174                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
175                         if parseErr == nil {
176                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
177                         } else {
178                                 // Couldn't make sense of retry-after,
179                                 // so set retry to 20 seconds
180                                 earliestRetry = time.Now().Add(20 * time.Second)
181                         }
182                 }
183                 return &azureRateLimitError{*rq, earliestRetry}
184         }
185         if rq.ServiceError == nil {
186                 return err
187         }
188         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
189                 return &azureQuotaError{*rq}
190         }
191         return err
192 }
193
194 type azureInstanceSet struct {
195         azconfig          azureInstanceSetConfig
196         vmClient          virtualMachinesClientWrapper
197         netClient         interfacesClientWrapper
198         storageAcctClient storageacct.AccountsClient
199         azureEnv          azure.Environment
200         interfaces        map[string]network.Interface
201         dispatcherID      string
202         namePrefix        string
203         ctx               context.Context
204         stopFunc          context.CancelFunc
205         stopWg            sync.WaitGroup
206         deleteNIC         chan string
207         deleteBlob        chan storage.Blob
208         logger            logrus.FieldLogger
209 }
210
211 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
212         azcfg := azureInstanceSetConfig{}
213         err = json.Unmarshal(config, &azcfg)
214         if err != nil {
215                 return nil, err
216         }
217
218         ap := azureInstanceSet{logger: logger}
219         err = ap.setup(azcfg, string(dispatcherID))
220         if err != nil {
221                 return nil, err
222         }
223         return &ap, nil
224 }
225
226 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
227         az.azconfig = azcfg
228         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
229         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
230         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
231
232         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
233         if err != nil {
234                 return err
235         }
236
237         authorizer, err := auth.ClientCredentialsConfig{
238                 ClientID:     az.azconfig.ClientID,
239                 ClientSecret: az.azconfig.ClientSecret,
240                 TenantID:     az.azconfig.TenantID,
241                 Resource:     az.azureEnv.ResourceManagerEndpoint,
242                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
243         }.Authorizer()
244         if err != nil {
245                 return err
246         }
247
248         vmClient.Authorizer = authorizer
249         netClient.Authorizer = authorizer
250         storageAcctClient.Authorizer = authorizer
251
252         az.vmClient = &virtualMachinesClientImpl{vmClient}
253         az.netClient = &interfacesClientImpl{netClient}
254         az.storageAcctClient = storageAcctClient
255
256         az.dispatcherID = dispatcherID
257         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
258
259         az.ctx, az.stopFunc = context.WithCancel(context.Background())
260         go func() {
261                 az.stopWg.Add(1)
262                 defer az.stopWg.Done()
263
264                 tk := time.NewTicker(5 * time.Minute)
265                 for {
266                         select {
267                         case <-az.ctx.Done():
268                                 tk.Stop()
269                                 return
270                         case <-tk.C:
271                                 az.manageBlobs()
272                         }
273                 }
274         }()
275
276         az.deleteNIC = make(chan string)
277         az.deleteBlob = make(chan storage.Blob)
278
279         for i := 0; i < 4; i++ {
280                 go func() {
281                         for {
282                                 nicname, ok := <-az.deleteNIC
283                                 if !ok {
284                                         return
285                                 }
286                                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
287                                 if delerr != nil {
288                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
289                                 } else {
290                                         az.logger.Printf("Deleted NIC %v", nicname)
291                                 }
292                         }
293                 }()
294                 go func() {
295                         for {
296                                 blob, ok := <-az.deleteBlob
297                                 if !ok {
298                                         return
299                                 }
300                                 err := blob.Delete(nil)
301                                 if err != nil {
302                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
303                                 } else {
304                                         az.logger.Printf("Deleted blob %v", blob.Name)
305                                 }
306                         }
307                 }()
308         }
309
310         return nil
311 }
312
313 func (az *azureInstanceSet) Create(
314         instanceType arvados.InstanceType,
315         imageID cloud.ImageID,
316         newTags cloud.InstanceTags,
317         initCommand cloud.InitCommand,
318         publicKey ssh.PublicKey) (cloud.Instance, error) {
319
320         az.stopWg.Add(1)
321         defer az.stopWg.Done()
322
323         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
324         if err != nil {
325                 return nil, err
326         }
327
328         name = az.namePrefix + name
329
330         timestamp := time.Now().Format(time.RFC3339Nano)
331
332         tags := make(map[string]*string)
333         tags["created-at"] = &timestamp
334         for k, v := range newTags {
335                 newstr := v
336                 tags["dispatch-"+k] = &newstr
337         }
338
339         nicParameters := network.Interface{
340                 Location: &az.azconfig.Location,
341                 Tags:     tags,
342                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
343                         IPConfigurations: &[]network.InterfaceIPConfiguration{
344                                 network.InterfaceIPConfiguration{
345                                         Name: to.StringPtr("ip1"),
346                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
347                                                 Subnet: &network.Subnet{
348                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
349                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
350                                                                 az.azconfig.SubscriptionID,
351                                                                 az.azconfig.ResourceGroup,
352                                                                 az.azconfig.Network,
353                                                                 az.azconfig.Subnet)),
354                                                 },
355                                                 PrivateIPAllocationMethod: network.Dynamic,
356                                         },
357                                 },
358                         },
359                 },
360         }
361         nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
362         if err != nil {
363                 return nil, wrapAzureError(err)
364         }
365
366         instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
367                 az.azconfig.StorageAccount,
368                 az.azureEnv.StorageEndpointSuffix,
369                 az.azconfig.BlobContainer,
370                 name)
371
372         customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
373
374         vmParameters := compute.VirtualMachine{
375                 Location: &az.azconfig.Location,
376                 Tags:     tags,
377                 VirtualMachineProperties: &compute.VirtualMachineProperties{
378                         HardwareProfile: &compute.HardwareProfile{
379                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
380                         },
381                         StorageProfile: &compute.StorageProfile{
382                                 OsDisk: &compute.OSDisk{
383                                         OsType:       compute.Linux,
384                                         Name:         to.StringPtr(name + "-os"),
385                                         CreateOption: compute.FromImage,
386                                         Image: &compute.VirtualHardDisk{
387                                                 URI: to.StringPtr(string(imageID)),
388                                         },
389                                         Vhd: &compute.VirtualHardDisk{
390                                                 URI: &instanceVhd,
391                                         },
392                                 },
393                         },
394                         NetworkProfile: &compute.NetworkProfile{
395                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
396                                         compute.NetworkInterfaceReference{
397                                                 ID: nic.ID,
398                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
399                                                         Primary: to.BoolPtr(true),
400                                                 },
401                                         },
402                                 },
403                         },
404                         OsProfile: &compute.OSProfile{
405                                 ComputerName:  &name,
406                                 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
407                                 LinuxConfiguration: &compute.LinuxConfiguration{
408                                         DisablePasswordAuthentication: to.BoolPtr(true),
409                                         SSH: &compute.SSHConfiguration{
410                                                 PublicKeys: &[]compute.SSHPublicKey{
411                                                         {
412                                                                 Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
413                                                                 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
414                                                         },
415                                                 },
416                                         },
417                                 },
418                                 CustomData: &customData,
419                         },
420                 },
421         }
422
423         vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
424         if err != nil {
425                 return nil, wrapAzureError(err)
426         }
427
428         return &azureInstance{
429                 provider: az,
430                 nic:      nic,
431                 vm:       vm,
432         }, nil
433 }
434
435 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
436         az.stopWg.Add(1)
437         defer az.stopWg.Done()
438
439         interfaces, err := az.manageNics()
440         if err != nil {
441                 return nil, err
442         }
443
444         result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
445         if err != nil {
446                 return nil, wrapAzureError(err)
447         }
448
449         instances := make([]cloud.Instance, 0)
450
451         for ; result.NotDone(); err = result.Next() {
452                 if err != nil {
453                         return nil, wrapAzureError(err)
454                 }
455                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
456                         instances = append(instances, &azureInstance{
457                                 provider: az,
458                                 vm:       result.Value(),
459                                 nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
460                 }
461         }
462         return instances, nil
463 }
464
465 // ManageNics returns a list of Azure network interface resources.
466 // Also performs garbage collection of NICs which have "namePrefix", are
467 // not associated with a virtual machine and have a "create-at" time
468 // more than DeleteDanglingResourcesAfter (to prevent racing and
469 // deleting newly created NICs) in the past are deleted.
470 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
471         az.stopWg.Add(1)
472         defer az.stopWg.Done()
473
474         result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
475         if err != nil {
476                 return nil, wrapAzureError(err)
477         }
478
479         interfaces := make(map[string]network.Interface)
480
481         timestamp := time.Now()
482         for ; result.NotDone(); err = result.Next() {
483                 if err != nil {
484                         az.logger.WithError(err).Warnf("Error listing nics")
485                         return interfaces, nil
486                 }
487                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
488                         if result.Value().VirtualMachine != nil {
489                                 interfaces[*result.Value().ID] = result.Value()
490                         } else {
491                                 if result.Value().Tags["created-at"] != nil {
492                                         createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
493                                         if err == nil {
494                                                 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
495                                                         az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
496                                                         az.deleteNIC <- *result.Value().Name
497                                                 }
498                                         }
499                                 }
500                         }
501                 }
502         }
503         return interfaces, nil
504 }
505
506 // ManageBlobs garbage collects blobs (VM disk images) in the
507 // configured storage account container.  It will delete blobs which
508 // have "namePrefix", are "available" (which means they are not
509 // leased to a VM) and haven't been modified for
510 // DeleteDanglingResourcesAfter seconds.
511 func (az *azureInstanceSet) manageBlobs() {
512         result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
513         if err != nil {
514                 az.logger.WithError(err).Warn("Couldn't get account keys")
515                 return
516         }
517
518         key1 := *(*result.Keys)[0].Value
519         client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
520         if err != nil {
521                 az.logger.WithError(err).Warn("Couldn't make client")
522                 return
523         }
524
525         blobsvc := client.GetBlobService()
526         blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
527
528         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
529         timestamp := time.Now()
530
531         for {
532                 response, err := blobcont.ListBlobs(page)
533                 if err != nil {
534                         az.logger.WithError(err).Warn("Error listing blobs")
535                         return
536                 }
537                 for _, b := range response.Blobs {
538                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
539                         if b.Properties.BlobType == storage.BlobTypePage &&
540                                 b.Properties.LeaseState == "available" &&
541                                 b.Properties.LeaseStatus == "unlocked" &&
542                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
543
544                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
545                                 az.deleteBlob <- b
546                         }
547                 }
548                 if response.NextMarker != "" {
549                         page.Marker = response.NextMarker
550                 } else {
551                         break
552                 }
553         }
554 }
555
556 func (az *azureInstanceSet) Stop() {
557         az.stopFunc()
558         az.stopWg.Wait()
559         close(az.deleteNIC)
560         close(az.deleteBlob)
561 }
562
563 type azureInstance struct {
564         provider *azureInstanceSet
565         nic      network.Interface
566         vm       compute.VirtualMachine
567 }
568
569 func (ai *azureInstance) ID() cloud.InstanceID {
570         return cloud.InstanceID(*ai.vm.ID)
571 }
572
573 func (ai *azureInstance) String() string {
574         return *ai.vm.Name
575 }
576
577 func (ai *azureInstance) ProviderType() string {
578         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
579 }
580
581 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
582         ai.provider.stopWg.Add(1)
583         defer ai.provider.stopWg.Done()
584
585         tags := make(map[string]*string)
586
587         for k, v := range ai.vm.Tags {
588                 if !strings.HasPrefix(k, "dispatch-") {
589                         tags[k] = v
590                 }
591         }
592         for k, v := range newTags {
593                 newstr := v
594                 tags["dispatch-"+k] = &newstr
595         }
596
597         vmParameters := compute.VirtualMachine{
598                 Location: &ai.provider.azconfig.Location,
599                 Tags:     tags,
600         }
601         vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
602         if err != nil {
603                 return wrapAzureError(err)
604         }
605         ai.vm = vm
606
607         return nil
608 }
609
610 func (ai *azureInstance) Tags() cloud.InstanceTags {
611         tags := make(map[string]string)
612
613         for k, v := range ai.vm.Tags {
614                 if strings.HasPrefix(k, "dispatch-") {
615                         tags[k[9:]] = *v
616                 }
617         }
618
619         return tags
620 }
621
622 func (ai *azureInstance) Destroy() error {
623         ai.provider.stopWg.Add(1)
624         defer ai.provider.stopWg.Done()
625
626         _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
627         return wrapAzureError(err)
628 }
629
630 func (ai *azureInstance) Address() string {
631         return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
632 }
633
634 func (ai *azureInstance) RemoteUser() string {
635         return ai.provider.azconfig.AdminUsername
636 }
637
638 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
639         return cloud.ErrNotImplemented
640 }