21e8d28d8317cd76b0b3ebe6bbce9ad50aab64b5
[arvados.git] / lib / dispatchcloud / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "context"
9         "fmt"
10         "log"
11         "net/http"
12         "regexp"
13         "strconv"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
20         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
21         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
22         "github.com/Azure/azure-sdk-for-go/storage"
23         "github.com/Azure/go-autorest/autorest"
24         "github.com/Azure/go-autorest/autorest/azure"
25         "github.com/Azure/go-autorest/autorest/azure/auth"
26         "github.com/Azure/go-autorest/autorest/to"
27         "github.com/jmcvetta/randutil"
28 )
29
// AzureProviderConfig holds the Azure-specific settings used by
// AzureProvider. It is decoded from JSON using the keys in the struct tags.
type AzureProviderConfig struct {
	// Subscription that owns every resource managed by the provider.
	SubscriptionID string `json:"subscription_id"`

	// Service-principal credentials used to build the OAuth authorizer.
	ClientID     string `json:"key"`
	ClientSecret string `json:"secret"`
	TenantID     string `json:"tenant_id"`

	// Name of the Azure cloud, resolved with azure.EnvironmentFromName.
	CloudEnv string `json:"cloud_environment"`

	// Resource group and region in which NICs and VMs are created.
	ResourceGroup string `json:"resource_group"`
	Location      string `json:"region"`

	// Virtual network and subnet that new NICs are attached to.
	Network string `json:"network"`
	Subnet  string `json:"subnet"`

	// Storage account and container that hold per-VM OS disk VHDs.
	StorageAccount string `json:"storage_account"`
	BlobContainer  string `json:"blob_container"`

	// NOTE(review): Image is not read anywhere in this file (Create takes
	// the image as an argument); presumably consumed by the caller.
	Image string `json:"image"`

	// SSH public key installed for the "arvados" admin user on new VMs.
	AuthorizedKey string `json:"authorized_key"`

	// Age threshold, in seconds, after which unattached NICs and unleased
	// blobs created by this dispatcher are deleted.
	DeleteDanglingResourcesAfter float64 `json:"delete_dangling_resources_after"`
}
46
47 type VirtualMachinesClientWrapper interface {
48         CreateOrUpdate(ctx context.Context,
49                 resourceGroupName string,
50                 VMName string,
51                 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
52         Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
53         ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
54 }
55
56 type VirtualMachinesClientImpl struct {
57         inner compute.VirtualMachinesClient
58 }
59
60 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
61         resourceGroupName string,
62         VMName string,
63         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
64
65         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
66         if err != nil {
67                 return compute.VirtualMachine{}, WrapAzureError(err)
68         }
69         future.WaitForCompletionRef(ctx, cl.inner.Client)
70         r, err := future.Result(cl.inner)
71         return r, WrapAzureError(err)
72 }
73
74 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
75         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
76         if err != nil {
77                 return nil, WrapAzureError(err)
78         }
79         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
80         return future.Response(), WrapAzureError(err)
81 }
82
83 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
84         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
85         return r, WrapAzureError(err)
86 }
87
88 type InterfacesClientWrapper interface {
89         CreateOrUpdate(ctx context.Context,
90                 resourceGroupName string,
91                 networkInterfaceName string,
92                 parameters network.Interface) (result network.Interface, err error)
93         Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
94         ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
95 }
96
97 type InterfacesClientImpl struct {
98         inner network.InterfacesClient
99 }
100
101 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
102         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
103         if err != nil {
104                 return nil, WrapAzureError(err)
105         }
106         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
107         return future.Response(), WrapAzureError(err)
108 }
109
110 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
111         resourceGroupName string,
112         networkInterfaceName string,
113         parameters network.Interface) (result network.Interface, err error) {
114
115         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
116         if err != nil {
117                 return network.Interface{}, WrapAzureError(err)
118         }
119         future.WaitForCompletionRef(ctx, cl.inner.Client)
120         r, err := future.Result(cl.inner)
121         return r, WrapAzureError(err)
122 }
123
124 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
125         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
126         return r, WrapAzureError(err)
127 }
128
var (
	// quotaRe matches, case-insensitively, Azure service error codes or
	// messages that mention "exceed", "quota" or "limit"; WrapAzureError
	// uses it to classify quota errors.
	quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
)
130
131 type AzureRateLimitError struct {
132         azure.RequestError
133         earliestRetry time.Time
134 }
135
136 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
137         return ar.earliestRetry
138 }
139
140 type AzureQuotaError struct {
141         azure.RequestError
142 }
143
144 func (ar *AzureQuotaError) IsQuotaError() bool {
145         return true
146 }
147
148 func WrapAzureError(err error) error {
149         de, ok := err.(autorest.DetailedError)
150         if !ok {
151                 return err
152         }
153         rq, ok := de.Original.(*azure.RequestError)
154         if !ok {
155                 return err
156         }
157         if rq.Response == nil {
158                 return err
159         }
160         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
161                 // API throttling
162                 ra := rq.Response.Header["Retry-After"][0]
163                 earliestRetry, parseErr := http.ParseTime(ra)
164                 if parseErr != nil {
165                         // Could not parse as a timestamp, must be number of seconds
166                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
167                         if parseErr != nil {
168                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
169                         }
170                 }
171                 if parseErr != nil {
172                         // Couldn't make sense of retry-after,
173                         // so set retry to 20 seconds
174                         earliestRetry = time.Now().Add(20 * time.Second)
175                 }
176                 return &AzureRateLimitError{*rq, earliestRetry}
177         }
178         if rq.ServiceError == nil {
179                 return err
180         }
181         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
182                 return &AzureQuotaError{*rq}
183         }
184         return err
185 }
186
187 type AzureProvider struct {
188         azconfig          AzureProviderConfig
189         arvconfig         arvados.Cluster
190         vmClient          VirtualMachinesClientWrapper
191         netClient         InterfacesClientWrapper
192         storageAcctClient storageacct.AccountsClient
193         azureEnv          azure.Environment
194         interfaces        map[string]network.Interface
195         dispatcherID      string
196         namePrefix        string
197 }
198
199 func NewAzureProvider(azcfg AzureProviderConfig, arvcfg arvados.Cluster, dispatcherID string) (prv Provider, err error) {
200         ap := AzureProvider{}
201         err = ap.setup(azcfg, arvcfg, dispatcherID)
202         if err != nil {
203                 return nil, err
204         }
205         return &ap, nil
206 }
207
208 func (az *AzureProvider) setup(azcfg AzureProviderConfig, arvcfg arvados.Cluster, dispatcherID string) (err error) {
209         az.azconfig = azcfg
210         az.arvconfig = arvcfg
211         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
212         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
213         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
214
215         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
216         if err != nil {
217                 return err
218         }
219
220         authorizer, err := auth.ClientCredentialsConfig{
221                 ClientID:     az.azconfig.ClientID,
222                 ClientSecret: az.azconfig.ClientSecret,
223                 TenantID:     az.azconfig.TenantID,
224                 Resource:     az.azureEnv.ResourceManagerEndpoint,
225                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
226         }.Authorizer()
227         if err != nil {
228                 return err
229         }
230
231         vmClient.Authorizer = authorizer
232         netClient.Authorizer = authorizer
233         storageAcctClient.Authorizer = authorizer
234
235         az.vmClient = &VirtualMachinesClientImpl{vmClient}
236         az.netClient = &InterfacesClientImpl{netClient}
237         az.storageAcctClient = storageAcctClient
238
239         az.dispatcherID = dispatcherID
240         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
241
242         return nil
243 }
244
245 func (az *AzureProvider) Create(ctx context.Context,
246         instanceType arvados.InstanceType,
247         imageId ImageID,
248         newTags InstanceTags) (Instance, error) {
249
250         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
251         if err != nil {
252                 return nil, err
253         }
254
255         name = az.namePrefix + name
256         log.Printf("name is %v", name)
257
258         timestamp := time.Now().Format(time.RFC3339Nano)
259
260         tags := make(map[string]*string)
261         tags["created-at"] = &timestamp
262         for k, v := range newTags {
263                 tags["dispatch-"+k] = &v
264         }
265
266         nicParameters := network.Interface{
267                 Location: &az.azconfig.Location,
268                 Tags:     tags,
269                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
270                         IPConfigurations: &[]network.InterfaceIPConfiguration{
271                                 network.InterfaceIPConfiguration{
272                                         Name: to.StringPtr("ip1"),
273                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
274                                                 Subnet: &network.Subnet{
275                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
276                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
277                                                                 az.azconfig.SubscriptionID,
278                                                                 az.azconfig.ResourceGroup,
279                                                                 az.azconfig.Network,
280                                                                 az.azconfig.Subnet)),
281                                                 },
282                                                 PrivateIPAllocationMethod: network.Dynamic,
283                                         },
284                                 },
285                         },
286                 },
287         }
288         nic, err := az.netClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
289         if err != nil {
290                 return nil, WrapAzureError(err)
291         }
292
293         log.Printf("Created NIC %v", *nic.ID)
294
295         instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
296                 az.azconfig.StorageAccount,
297                 az.azureEnv.StorageEndpointSuffix,
298                 az.azconfig.BlobContainer,
299                 name)
300
301         log.Printf("URI instance vhd %v", instance_vhd)
302
303         tags["arvados-instance-type"] = &instanceType.Name
304
305         vmParameters := compute.VirtualMachine{
306                 Location: &az.azconfig.Location,
307                 Tags:     tags,
308                 VirtualMachineProperties: &compute.VirtualMachineProperties{
309                         HardwareProfile: &compute.HardwareProfile{
310                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
311                         },
312                         StorageProfile: &compute.StorageProfile{
313                                 OsDisk: &compute.OSDisk{
314                                         OsType:       compute.Linux,
315                                         Name:         to.StringPtr(name + "-os"),
316                                         CreateOption: compute.FromImage,
317                                         Image: &compute.VirtualHardDisk{
318                                                 URI: to.StringPtr(string(imageId)),
319                                         },
320                                         Vhd: &compute.VirtualHardDisk{
321                                                 URI: &instance_vhd,
322                                         },
323                                 },
324                         },
325                         NetworkProfile: &compute.NetworkProfile{
326                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
327                                         compute.NetworkInterfaceReference{
328                                                 ID: nic.ID,
329                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
330                                                         Primary: to.BoolPtr(true),
331                                                 },
332                                         },
333                                 },
334                         },
335                         OsProfile: &compute.OSProfile{
336                                 ComputerName:  &name,
337                                 AdminUsername: to.StringPtr("arvados"),
338                                 LinuxConfiguration: &compute.LinuxConfiguration{
339                                         DisablePasswordAuthentication: to.BoolPtr(true),
340                                         SSH: &compute.SSHConfiguration{
341                                                 PublicKeys: &[]compute.SSHPublicKey{
342                                                         compute.SSHPublicKey{
343                                                                 Path:    to.StringPtr("/home/arvados/.ssh/authorized_keys"),
344                                                                 KeyData: to.StringPtr(az.azconfig.AuthorizedKey),
345                                                         },
346                                                 },
347                                         },
348                                 },
349                                 //CustomData: to.StringPtr(""),
350                         },
351                 },
352         }
353
354         vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name, vmParameters)
355         if err != nil {
356                 return nil, WrapAzureError(err)
357         }
358
359         return &AzureInstance{
360                 instanceType: instanceType,
361                 provider:     az,
362                 nic:          nic,
363                 vm:           vm,
364         }, nil
365 }
366
367 func (az *AzureProvider) Instances(ctx context.Context) ([]Instance, error) {
368         interfaces, err := az.ManageNics(ctx)
369         if err != nil {
370                 return nil, err
371         }
372
373         result, err := az.vmClient.ListComplete(ctx, az.azconfig.ResourceGroup)
374         if err != nil {
375                 return nil, WrapAzureError(err)
376         }
377
378         instances := make([]Instance, 0)
379
380         for ; result.NotDone(); err = result.Next() {
381                 if err != nil {
382                         return nil, WrapAzureError(err)
383                 }
384                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) &&
385                         result.Value().Tags["arvados-instance-type"] != nil {
386                         instances = append(instances, &AzureInstance{
387                                 provider:     az,
388                                 vm:           result.Value(),
389                                 nic:          interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
390                                 instanceType: az.arvconfig.InstanceTypes[(*result.Value().Tags["arvados-instance-type"])]})
391                 }
392         }
393         return instances, nil
394 }
395
396 func (az *AzureProvider) ManageNics(ctx context.Context) (map[string]network.Interface, error) {
397         result, err := az.netClient.ListComplete(ctx, az.azconfig.ResourceGroup)
398         if err != nil {
399                 return nil, WrapAzureError(err)
400         }
401
402         interfaces := make(map[string]network.Interface)
403
404         timestamp := time.Now()
405         wg := sync.WaitGroup{}
406         deletechannel := make(chan string, 20)
407         defer func() {
408                 wg.Wait()
409                 close(deletechannel)
410         }()
411         for i := 0; i < 4; i += 1 {
412                 go func() {
413                         for {
414                                 nicname, ok := <-deletechannel
415                                 if !ok {
416                                         return
417                                 }
418                                 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
419                                 if delerr != nil {
420                                         log.Printf("Error deleting %v: %v", nicname, delerr)
421                                 } else {
422                                         log.Printf("Deleted %v", nicname)
423                                 }
424                                 wg.Done()
425                         }
426                 }()
427         }
428
429         for ; result.NotDone(); err = result.Next() {
430                 if err != nil {
431                         log.Printf("Error listing nics: %v", err)
432                         return interfaces, nil
433                 }
434                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
435                         if result.Value().VirtualMachine != nil {
436                                 interfaces[*result.Value().ID] = result.Value()
437                         } else {
438                                 if result.Value().Tags["created-at"] != nil {
439                                         created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
440                                         if err == nil {
441                                                 //log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
442                                                 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
443                                                         log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
444                                                         wg.Add(1)
445                                                         deletechannel <- *result.Value().Name
446                                                 }
447                                         }
448                                 }
449                         }
450                 }
451         }
452         return interfaces, nil
453 }
454
455 func (az *AzureProvider) ManageBlobs(ctx context.Context) {
456         result, err := az.storageAcctClient.ListKeys(ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
457         if err != nil {
458                 log.Printf("Couldn't get account keys %v", err)
459                 return
460         }
461
462         key1 := *(*result.Keys)[0].Value
463         client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
464         if err != nil {
465                 log.Printf("Couldn't make client %v", err)
466                 return
467         }
468
469         blobsvc := client.GetBlobService()
470         blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
471
472         timestamp := time.Now()
473         wg := sync.WaitGroup{}
474         deletechannel := make(chan storage.Blob, 20)
475         defer func() {
476                 wg.Wait()
477                 close(deletechannel)
478         }()
479         for i := 0; i < 4; i += 1 {
480                 go func() {
481                         for {
482                                 blob, ok := <-deletechannel
483                                 if !ok {
484                                         return
485                                 }
486                                 err := blob.Delete(nil)
487                                 if err != nil {
488                                         log.Printf("error deleting %v: %v", blob.Name, err)
489                                 } else {
490                                         log.Printf("Deleted blob %v", blob.Name)
491                                 }
492                                 wg.Done()
493                         }
494                 }()
495         }
496
497         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
498
499         for {
500                 response, err := blobcont.ListBlobs(page)
501                 if err != nil {
502                         log.Printf("Error listing blobs %v", err)
503                         return
504                 }
505                 for _, b := range response.Blobs {
506                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
507                         if b.Properties.BlobType == storage.BlobTypePage &&
508                                 b.Properties.LeaseState == "available" &&
509                                 b.Properties.LeaseStatus == "unlocked" &&
510                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
511
512                                 log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
513                                 wg.Add(1)
514                                 deletechannel <- b
515                         }
516                 }
517                 if response.NextMarker != "" {
518                         page.Marker = response.NextMarker
519                 } else {
520                         break
521                 }
522         }
523 }
524
525 type AzureInstance struct {
526         instanceType arvados.InstanceType
527         provider     *AzureProvider
528         nic          network.Interface
529         vm           compute.VirtualMachine
530 }
531
532 func (ai *AzureInstance) String() string {
533         return *ai.vm.Name
534 }
535
536 func (ai *AzureInstance) InstanceType() arvados.InstanceType {
537         return ai.instanceType
538 }
539
540 func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) error {
541         tags := make(map[string]*string)
542
543         for k, v := range ai.vm.Tags {
544                 if !strings.HasPrefix(k, "dispatch-") {
545                         tags[k] = v
546                 }
547         }
548         for k, v := range newTags {
549                 tags["dispatch-"+k] = &v
550         }
551
552         vmParameters := compute.VirtualMachine{
553                 Location: &ai.provider.azconfig.Location,
554                 Tags:     tags,
555         }
556         vm, err := ai.provider.vmClient.CreateOrUpdate(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
557         if err != nil {
558                 return WrapAzureError(err)
559         }
560         ai.vm = vm
561
562         return nil
563 }
564
565 func (ai *AzureInstance) GetTags(ctx context.Context) (InstanceTags, error) {
566         tags := make(map[string]string)
567
568         for k, v := range ai.vm.Tags {
569                 if strings.HasPrefix(k, "dispatch-") {
570                         tags[k[9:]] = *v
571                 }
572         }
573
574         return tags, nil
575 }
576
577 func (ai *AzureInstance) Destroy(ctx context.Context) error {
578         _, err := ai.provider.vmClient.Delete(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
579         return WrapAzureError(err)
580 }
581
582 func (ai *AzureInstance) Address() string {
583         return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
584 }