13964: Don't panic running tests on stubs
[arvados.git] / lib / dispatchcloud / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "context"
9         "fmt"
10         "log"
11         "net/http"
12         "regexp"
13         "strconv"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
19         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
20         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
21         "github.com/Azure/azure-sdk-for-go/storage"
22         "github.com/Azure/go-autorest/autorest"
23         "github.com/Azure/go-autorest/autorest/azure"
24         "github.com/Azure/go-autorest/autorest/azure/auth"
25         "github.com/Azure/go-autorest/autorest/to"
26         "github.com/jmcvetta/randutil"
27 )
28
// AzureProviderConfig holds the Azure-specific settings used by AzureProvider.
// Each field is populated from the JSON key named in its struct tag.
type AzureProviderConfig struct {
	// Subscription that owns every resource created or listed here; also
	// embedded in the subnet resource ID built by Create.
	SubscriptionID string `json:"subscription_id"`
	// Service-principal credentials passed to auth.ClientCredentialsConfig
	// in setup to obtain an authorizer.
	ClientID     string `json:"key"`
	ClientSecret string `json:"secret"`
	TenantID     string `json:"tenant_id"`
	// Cloud environment name, resolved with azure.EnvironmentFromName.
	CloudEnv string `json:"cloud_environment"`
	// Resource group holding the VMs, NICs and storage account.
	ResourceGroup string `json:"resource_group"`
	// Region used as the Location of new NICs and VMs.
	Location string `json:"region"`
	// Virtual network and subnet that new NICs are attached to.
	Network string `json:"network"`
	Subnet  string `json:"subnet"`
	// Storage account and container where new VMs' OS-disk VHDs are written
	// and where ManageBlobs looks for dangling ones.
	StorageAccount string `json:"storage_account"`
	BlobContainer  string `json:"blob_container"`
	// NOTE(review): not referenced in this file; Create takes the image from
	// its imageId argument instead. Confirm whether callers use it.
	Image string `json:"image"`
	// SSH public key installed for the "arvados" admin user on new VMs.
	AuthorizedKey string `json:"authorized_key"`
	// Age threshold, in seconds, after which unattached NICs and unleased
	// VHD blobs are deleted by ManageNics / ManageBlobs.
	DeleteDanglingResourcesAfter float64 `json:"delete_dangling_resources_after"`
}
45
// VirtualMachinesClientWrapper is the set of Azure virtual-machine operations
// AzureProvider depends on. Keeping them behind an interface lets another
// implementation (presumably a test stub; confirm against the tests) be used
// in place of VirtualMachinesClientImpl.
type VirtualMachinesClientWrapper interface {
	// CreateOrUpdate creates (or updates) the named VM and returns it.
	CreateOrUpdate(ctx context.Context,
		resourceGroupName string,
		VMName string,
		parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
	// Delete removes the named VM.
	Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
	// ListComplete returns an iterator over the VMs in the resource group.
	ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
54
// VirtualMachinesClientImpl implements VirtualMachinesClientWrapper on top of
// the Azure SDK's compute.VirtualMachinesClient. Its methods wait for
// long-running operations to complete and pass errors through WrapAzureError.
type VirtualMachinesClientImpl struct {
	inner compute.VirtualMachinesClient
}
58
59 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
60         resourceGroupName string,
61         VMName string,
62         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
63
64         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
65         if err != nil {
66                 return compute.VirtualMachine{}, WrapAzureError(err)
67         }
68         future.WaitForCompletionRef(ctx, cl.inner.Client)
69         r, err := future.Result(cl.inner)
70         return r, WrapAzureError(err)
71 }
72
73 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
74         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
75         if err != nil {
76                 return nil, WrapAzureError(err)
77         }
78         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
79         return future.Response(), WrapAzureError(err)
80 }
81
82 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
83         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
84         return r, WrapAzureError(err)
85 }
86
// InterfacesClientWrapper is the set of Azure network-interface operations
// AzureProvider depends on. Keeping them behind an interface lets another
// implementation (presumably a test stub; confirm against the tests) be used
// in place of InterfacesClientImpl.
type InterfacesClientWrapper interface {
	// CreateOrUpdate creates (or updates) the named NIC and returns it.
	CreateOrUpdate(ctx context.Context,
		resourceGroupName string,
		networkInterfaceName string,
		parameters network.Interface) (result network.Interface, err error)
	// Delete removes the named NIC.
	Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
	// ListComplete returns an iterator over the NICs in the resource group.
	ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
95
// InterfacesClientImpl implements InterfacesClientWrapper on top of the Azure
// SDK's network.InterfacesClient. Its methods wait for long-running
// operations to complete and pass errors through WrapAzureError.
type InterfacesClientImpl struct {
	inner network.InterfacesClient
}
99
100 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
101         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
102         if err != nil {
103                 return nil, WrapAzureError(err)
104         }
105         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
106         return future.Response(), WrapAzureError(err)
107 }
108
109 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
110         resourceGroupName string,
111         networkInterfaceName string,
112         parameters network.Interface) (result network.Interface, err error) {
113
114         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
115         if err != nil {
116                 return network.Interface{}, WrapAzureError(err)
117         }
118         future.WaitForCompletionRef(ctx, cl.inner.Client)
119         r, err := future.Result(cl.inner)
120         return r, WrapAzureError(err)
121 }
122
123 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
124         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
125         return r, WrapAzureError(err)
126 }
127
// quotaRe matches, case-insensitively, any text containing "exceed",
// "quota" or "limit". WrapAzureError applies it to a service error's code
// and message to decide whether to return an AzureQuotaError.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
129
// AzureRateLimitError is returned by WrapAzureError for throttled requests
// (HTTP 429 or a response carrying a Retry-After header). It embeds the
// original azure.RequestError.
type AzureRateLimitError struct {
	azure.RequestError
	// Earliest time a retry should be attempted, computed by WrapAzureError
	// from the Retry-After header or a fallback delay.
	earliestRetry time.Time
}
134
135 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
136         return ar.earliestRetry
137 }
138
// AzureQuotaError is returned by WrapAzureError when a service error's code
// or message matches quotaRe, which it treats as a quota/limit failure.
// It embeds the original azure.RequestError.
type AzureQuotaError struct {
	azure.RequestError
}
142
143 func (ar *AzureQuotaError) IsQuotaError() bool {
144         return true
145 }
146
147 func WrapAzureError(err error) error {
148         de, ok := err.(autorest.DetailedError)
149         if !ok {
150                 return err
151         }
152         rq, ok := de.Original.(*azure.RequestError)
153         if !ok {
154                 return err
155         }
156         if rq.Response == nil {
157                 return err
158         }
159         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
160                 // API throttling
161                 ra := rq.Response.Header["Retry-After"][0]
162                 earliestRetry, parseErr := http.ParseTime(ra)
163                 if parseErr != nil {
164                         // Could not parse as a timestamp, must be number of seconds
165                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
166                         if parseErr != nil {
167                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
168                         }
169                 }
170                 if parseErr != nil {
171                         // Couldn't make sense of retry-after,
172                         // so set retry to 20 seconds
173                         earliestRetry = time.Now().Add(20 * time.Second)
174                 }
175                 return &AzureRateLimitError{*rq, earliestRetry}
176         }
177         if rq.ServiceError == nil {
178                 return err
179         }
180         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
181                 return &AzureQuotaError{*rq}
182         }
183         return err
184 }
185
// AzureProvider creates, lists and cleans up Arvados compute VMs (and their
// NICs and OS-disk blobs) in a single Azure resource group. Construct it
// with NewAzureProvider, which calls setup to build the SDK clients.
type AzureProvider struct {
	azconfig          AzureProviderConfig
	arvconfig         arvados.Cluster
	vmClient          VirtualMachinesClientWrapper
	netClient         InterfacesClientWrapper
	storageAcctClient storageacct.AccountsClient
	azureEnv          azure.Environment
	// NOTE(review): not read or written anywhere in this file; confirm
	// whether it is still needed.
	interfaces        map[string]network.Interface
}
195
196 func NewAzureProvider(azcfg AzureProviderConfig, arvcfg arvados.Cluster) (prv Provider, err error) {
197         ap := AzureProvider{}
198         err = ap.setup(azcfg, arvcfg)
199         if err != nil {
200                 return nil, err
201         }
202         return &ap, nil
203 }
204
205 func (az *AzureProvider) setup(azcfg AzureProviderConfig, arvcfg arvados.Cluster) (err error) {
206         az.azconfig = azcfg
207         az.arvconfig = arvcfg
208         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
209         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
210         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
211
212         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
213         if err != nil {
214                 return err
215         }
216
217         authorizer, err := auth.ClientCredentialsConfig{
218                 ClientID:     az.azconfig.ClientID,
219                 ClientSecret: az.azconfig.ClientSecret,
220                 TenantID:     az.azconfig.TenantID,
221                 Resource:     az.azureEnv.ResourceManagerEndpoint,
222                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
223         }.Authorizer()
224         if err != nil {
225                 return err
226         }
227
228         vmClient.Authorizer = authorizer
229         netClient.Authorizer = authorizer
230         storageAcctClient.Authorizer = authorizer
231
232         az.vmClient = &VirtualMachinesClientImpl{vmClient}
233         az.netClient = &InterfacesClientImpl{netClient}
234         az.storageAcctClient = storageAcctClient
235
236         return nil
237 }
238
239 func (az *AzureProvider) Create(ctx context.Context,
240         instanceType arvados.InstanceType,
241         imageId ImageID,
242         instanceTag []InstanceTag) (Instance, error) {
243
244         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
245         if err != nil {
246                 return nil, err
247         }
248
249         name = "compute-" + name
250         log.Printf("name is %v", name)
251
252         timestamp := time.Now().Format(time.RFC3339Nano)
253
254         nicParameters := network.Interface{
255                 Location: &az.azconfig.Location,
256                 Tags: map[string]*string{
257                         "arvados-class":   to.StringPtr("crunch-dynamic-compute"),
258                         "arvados-cluster": &az.arvconfig.ClusterID,
259                         "created-at":      &timestamp,
260                 },
261                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
262                         IPConfigurations: &[]network.InterfaceIPConfiguration{
263                                 network.InterfaceIPConfiguration{
264                                         Name: to.StringPtr("ip1"),
265                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
266                                                 Subnet: &network.Subnet{
267                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
268                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
269                                                                 az.azconfig.SubscriptionID,
270                                                                 az.azconfig.ResourceGroup,
271                                                                 az.azconfig.Network,
272                                                                 az.azconfig.Subnet)),
273                                                 },
274                                                 PrivateIPAllocationMethod: network.Dynamic,
275                                         },
276                                 },
277                         },
278                 },
279         }
280         nic, err := az.netClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
281         if err != nil {
282                 return nil, WrapAzureError(err)
283         }
284
285         log.Printf("Created NIC %v", *nic.ID)
286
287         instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
288                 az.azconfig.StorageAccount,
289                 az.azureEnv.StorageEndpointSuffix,
290                 az.azconfig.BlobContainer,
291                 name)
292
293         log.Printf("URI instance vhd %v", instance_vhd)
294
295         vmParameters := compute.VirtualMachine{
296                 Location: &az.azconfig.Location,
297                 Tags: map[string]*string{
298                         "arvados-class":         to.StringPtr("crunch-dynamic-compute"),
299                         "arvados-instance-type": &instanceType.Name,
300                         "arvados-cluster":       &az.arvconfig.ClusterID,
301                         "created-at":            &timestamp,
302                 },
303                 VirtualMachineProperties: &compute.VirtualMachineProperties{
304                         HardwareProfile: &compute.HardwareProfile{
305                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
306                         },
307                         StorageProfile: &compute.StorageProfile{
308                                 OsDisk: &compute.OSDisk{
309                                         OsType:       compute.Linux,
310                                         Name:         to.StringPtr(fmt.Sprintf("%v-os", name)),
311                                         CreateOption: compute.FromImage,
312                                         Image: &compute.VirtualHardDisk{
313                                                 URI: to.StringPtr(string(imageId)),
314                                         },
315                                         Vhd: &compute.VirtualHardDisk{
316                                                 URI: &instance_vhd,
317                                         },
318                                 },
319                         },
320                         NetworkProfile: &compute.NetworkProfile{
321                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
322                                         compute.NetworkInterfaceReference{
323                                                 ID: nic.ID,
324                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
325                                                         Primary: to.BoolPtr(true),
326                                                 },
327                                         },
328                                 },
329                         },
330                         OsProfile: &compute.OSProfile{
331                                 ComputerName:  &name,
332                                 AdminUsername: to.StringPtr("arvados"),
333                                 LinuxConfiguration: &compute.LinuxConfiguration{
334                                         DisablePasswordAuthentication: to.BoolPtr(true),
335                                         SSH: &compute.SSHConfiguration{
336                                                 PublicKeys: &[]compute.SSHPublicKey{
337                                                         compute.SSHPublicKey{
338                                                                 Path:    to.StringPtr("/home/arvados/.ssh/authorized_keys"),
339                                                                 KeyData: to.StringPtr(az.azconfig.AuthorizedKey),
340                                                         },
341                                                 },
342                                         },
343                                 },
344                                 //CustomData: to.StringPtr(""),
345                         },
346                 },
347         }
348
349         vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name, vmParameters)
350         if err != nil {
351                 return nil, WrapAzureError(err)
352         }
353
354         return &AzureInstance{
355                 instanceType: instanceType,
356                 provider:     az,
357                 nic:          nic,
358                 vm:           vm,
359         }, nil
360 }
361
362 func (az *AzureProvider) Instances(ctx context.Context) ([]Instance, error) {
363         interfaces, err := az.ManageNics(ctx)
364         if err != nil {
365                 return nil, err
366         }
367
368         result, err := az.vmClient.ListComplete(ctx, az.azconfig.ResourceGroup)
369         if err != nil {
370                 return nil, WrapAzureError(err)
371         }
372
373         instances := make([]Instance, 0)
374
375         for ; result.NotDone(); err = result.Next() {
376                 if err != nil {
377                         return nil, WrapAzureError(err)
378                 }
379                 if result.Value().Tags["arvados-class"] != nil &&
380                         result.Value().Tags["arvados-instance-type"] != nil &&
381                         (*result.Value().Tags["arvados-class"]) == "crunch-dynamic-compute" {
382                         instances = append(instances, &AzureInstance{
383                                 provider:     az,
384                                 vm:           result.Value(),
385                                 nic:          interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
386                                 instanceType: az.arvconfig.InstanceTypes[(*result.Value().Tags["arvados-instance-type"])]})
387                 }
388         }
389         return instances, nil
390 }
391
392 func (az *AzureProvider) ManageNics(ctx context.Context) (map[string]network.Interface, error) {
393         result, err := az.netClient.ListComplete(ctx, az.azconfig.ResourceGroup)
394         if err != nil {
395                 return nil, WrapAzureError(err)
396         }
397
398         interfaces := make(map[string]network.Interface)
399
400         timestamp := time.Now()
401         wg := sync.WaitGroup{}
402         deletechannel := make(chan string, 20)
403         defer func() {
404                 wg.Wait()
405                 close(deletechannel)
406         }()
407         for i := 0; i < 4; i += 1 {
408                 go func() {
409                         for {
410                                 nicname, ok := <-deletechannel
411                                 if !ok {
412                                         return
413                                 }
414                                 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
415                                 if delerr != nil {
416                                         log.Printf("Error deleting %v: %v", nicname, delerr)
417                                 } else {
418                                         log.Printf("Deleted %v", nicname)
419                                 }
420                                 wg.Done()
421                         }
422                 }()
423         }
424
425         for ; result.NotDone(); err = result.Next() {
426                 if err != nil {
427                         log.Printf("Error listing nics: %v", err)
428                         return interfaces, WrapAzureError(nil)
429                 }
430                 if result.Value().Tags["arvados-class"] != nil &&
431                         (*result.Value().Tags["arvados-class"]) == "crunch-dynamic-compute" {
432
433                         if result.Value().VirtualMachine != nil {
434                                 interfaces[*result.Value().ID] = result.Value()
435                         } else {
436
437                                 if result.Value().Tags["created-at"] != nil {
438                                         created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
439                                         if err == nil {
440                                                 //log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
441                                                 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
442                                                         log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
443                                                         wg.Add(1)
444                                                         deletechannel <- *result.Value().Name
445                                                 }
446                                         }
447                                 }
448                         }
449                 }
450         }
451         return interfaces, nil
452 }
453
454 func (az *AzureProvider) ManageBlobs(ctx context.Context) {
455         result, err := az.storageAcctClient.ListKeys(ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
456         if err != nil {
457                 log.Printf("Couldn't get account keys %v", err)
458                 return
459         }
460
461         key1 := *(*result.Keys)[0].Value
462         client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
463         if err != nil {
464                 log.Printf("Couldn't make client %v", err)
465                 return
466         }
467
468         blobsvc := client.GetBlobService()
469         blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
470
471         timestamp := time.Now()
472         wg := sync.WaitGroup{}
473         deletechannel := make(chan storage.Blob, 20)
474         defer func() {
475                 wg.Wait()
476                 close(deletechannel)
477         }()
478         for i := 0; i < 4; i += 1 {
479                 go func() {
480                         for {
481                                 blob, ok := <-deletechannel
482                                 if !ok {
483                                         return
484                                 }
485                                 err := blob.Delete(nil)
486                                 if err != nil {
487                                         log.Printf("error deleting %v: %v", blob.Name, err)
488                                 } else {
489                                         log.Printf("Deleted blob %v", blob.Name)
490                                 }
491                                 wg.Done()
492                         }
493                 }()
494         }
495
496         page := storage.ListBlobsParameters{Prefix: "compute-"}
497
498         for {
499                 response, err := blobcont.ListBlobs(page)
500                 if err != nil {
501                         log.Printf("Error listing blobs %v", err)
502                         return
503                 }
504                 for _, b := range response.Blobs {
505                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
506                         if b.Properties.BlobType == storage.BlobTypePage &&
507                                 b.Properties.LeaseState == "available" &&
508                                 b.Properties.LeaseStatus == "unlocked" &&
509                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
510
511                                 log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
512                                 wg.Add(1)
513                                 deletechannel <- b
514                         }
515                 }
516                 if response.NextMarker != "" {
517                         page.Marker = response.NextMarker
518                 } else {
519                         break
520                 }
521         }
522 }
523
// AzureInstance is a compute VM managed by AzureProvider, together with its
// network interface and the Arvados instance type it was created as.
type AzureInstance struct {
	instanceType arvados.InstanceType
	provider     *AzureProvider
	// NIC used for Address. When built by Instances it comes from a map
	// lookup and may be the zero value if no matching NIC was found.
	nic          network.Interface
	vm           compute.VirtualMachine
}
530
531 func (ai *AzureInstance) String() string {
532         return *ai.vm.Name
533 }
534
535 func (ai *AzureInstance) InstanceType() arvados.InstanceType {
536         return ai.instanceType
537 }
538
539 func (ai *AzureInstance) SetTags([]InstanceTag) error {
540         return nil
541 }
542
543 func (ai *AzureInstance) GetTags() ([]InstanceTag, error) {
544         return nil, nil
545 }
546
547 func (ai *AzureInstance) Destroy(ctx context.Context) error {
548         _, err := ai.provider.vmClient.Delete(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
549         return WrapAzureError(err)
550 }
551
552 func (ai *AzureInstance) Address() string {
553         return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
554 }