16625: implement review feedback.
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "net/http"
14         "regexp"
15         "strconv"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cloud"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
23         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
24         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
25         "github.com/Azure/azure-sdk-for-go/storage"
26         "github.com/Azure/go-autorest/autorest"
27         "github.com/Azure/go-autorest/autorest/azure"
28         "github.com/Azure/go-autorest/autorest/azure/auth"
29         "github.com/Azure/go-autorest/autorest/to"
30         "github.com/jmcvetta/randutil"
31         "github.com/sirupsen/logrus"
32         "golang.org/x/crypto/ssh"
33 )
34
35 // Driver is the azure implementation of the cloud.Driver interface.
36 var Driver = cloud.DriverFunc(newAzureInstanceSet)
37
38 type azureInstanceSetConfig struct {
39         SubscriptionID               string
40         ClientID                     string
41         ClientSecret                 string
42         TenantID                     string
43         CloudEnvironment             string
44         ResourceGroup                string
45         ImageResourceGroup           string
46         Location                     string
47         Network                      string
48         NetworkResourceGroup         string
49         Subnet                       string
50         StorageAccount               string
51         BlobContainer                string
52         DeleteDanglingResourcesAfter arvados.Duration
53         AdminUsername                string
54 }
55
56 type containerWrapper interface {
57         GetBlobReference(name string) *storage.Blob
58         ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
59 }
60
61 type virtualMachinesClientWrapper interface {
62         createOrUpdate(ctx context.Context,
63                 resourceGroupName string,
64                 VMName string,
65                 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
66         delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
67         listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
68 }
69
70 type virtualMachinesClientImpl struct {
71         inner compute.VirtualMachinesClient
72 }
73
74 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
75         resourceGroupName string,
76         VMName string,
77         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
78
79         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
80         if err != nil {
81                 return compute.VirtualMachine{}, wrapAzureError(err)
82         }
83         future.WaitForCompletionRef(ctx, cl.inner.Client)
84         r, err := future.Result(cl.inner)
85         return r, wrapAzureError(err)
86 }
87
88 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
89         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
90         if err != nil {
91                 return nil, wrapAzureError(err)
92         }
93         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
94         return future.Response(), wrapAzureError(err)
95 }
96
97 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
98         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
99         return r, wrapAzureError(err)
100 }
101
102 type interfacesClientWrapper interface {
103         createOrUpdate(ctx context.Context,
104                 resourceGroupName string,
105                 networkInterfaceName string,
106                 parameters network.Interface) (result network.Interface, err error)
107         delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
108         listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
109 }
110
111 type interfacesClientImpl struct {
112         inner network.InterfacesClient
113 }
114
115 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
116         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
117         if err != nil {
118                 return nil, wrapAzureError(err)
119         }
120         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
121         return future.Response(), wrapAzureError(err)
122 }
123
124 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
125         resourceGroupName string,
126         networkInterfaceName string,
127         parameters network.Interface) (result network.Interface, err error) {
128
129         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
130         if err != nil {
131                 return network.Interface{}, wrapAzureError(err)
132         }
133         future.WaitForCompletionRef(ctx, cl.inner.Client)
134         r, err := future.Result(cl.inner)
135         return r, wrapAzureError(err)
136 }
137
138 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
139         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
140         return r, wrapAzureError(err)
141 }
142
143 type disksClientWrapper interface {
144         listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
145         delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
146 }
147
148 type disksClientImpl struct {
149         inner compute.DisksClient
150 }
151
152 func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
153         r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
154         return r, wrapAzureError(err)
155 }
156
157 func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
158         r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
159         return r, wrapAzureError(err)
160 }
161
// quotaRe matches, case-insensitively, the words wrapAzureError looks
// for in a service error's code or message to decide that the error
// is a quota error.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
163
164 type azureRateLimitError struct {
165         azure.RequestError
166         firstRetry time.Time
167 }
168
169 func (ar *azureRateLimitError) EarliestRetry() time.Time {
170         return ar.firstRetry
171 }
172
173 type azureQuotaError struct {
174         azure.RequestError
175 }
176
177 func (ar *azureQuotaError) IsQuotaError() bool {
178         return true
179 }
180
181 func wrapAzureError(err error) error {
182         de, ok := err.(autorest.DetailedError)
183         if !ok {
184                 return err
185         }
186         rq, ok := de.Original.(*azure.RequestError)
187         if !ok {
188                 return err
189         }
190         if rq.Response == nil {
191                 return err
192         }
193         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
194                 // API throttling
195                 ra := rq.Response.Header["Retry-After"][0]
196                 earliestRetry, parseErr := http.ParseTime(ra)
197                 if parseErr != nil {
198                         // Could not parse as a timestamp, must be number of seconds
199                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
200                         if parseErr == nil {
201                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
202                         } else {
203                                 // Couldn't make sense of retry-after,
204                                 // so set retry to 20 seconds
205                                 earliestRetry = time.Now().Add(20 * time.Second)
206                         }
207                 }
208                 return &azureRateLimitError{*rq, earliestRetry}
209         }
210         if rq.ServiceError == nil {
211                 return err
212         }
213         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
214                 return &azureQuotaError{*rq}
215         }
216         return err
217 }
218
219 type azureInstanceSet struct {
220         azconfig           azureInstanceSetConfig
221         vmClient           virtualMachinesClientWrapper
222         netClient          interfacesClientWrapper
223         disksClient        disksClientWrapper
224         imageResourceGroup string
225         blobcont           containerWrapper
226         azureEnv           azure.Environment
227         interfaces         map[string]network.Interface
228         dispatcherID       string
229         namePrefix         string
230         ctx                context.Context
231         stopFunc           context.CancelFunc
232         stopWg             sync.WaitGroup
233         deleteNIC          chan string
234         deleteBlob         chan storage.Blob
235         deleteDisk         chan compute.Disk
236         logger             logrus.FieldLogger
237 }
238
239 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
240         azcfg := azureInstanceSetConfig{}
241         err = json.Unmarshal(config, &azcfg)
242         if err != nil {
243                 return nil, err
244         }
245
246         az := azureInstanceSet{logger: logger}
247         az.ctx, az.stopFunc = context.WithCancel(context.Background())
248         err = az.setup(azcfg, string(dispatcherID))
249         if err != nil {
250                 az.stopFunc()
251                 return nil, err
252         }
253         return &az, nil
254 }
255
256 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
257         az.azconfig = azcfg
258         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
259         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
260         disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
261         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
262
263         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
264         if err != nil {
265                 return err
266         }
267
268         authorizer, err := auth.ClientCredentialsConfig{
269                 ClientID:     az.azconfig.ClientID,
270                 ClientSecret: az.azconfig.ClientSecret,
271                 TenantID:     az.azconfig.TenantID,
272                 Resource:     az.azureEnv.ResourceManagerEndpoint,
273                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
274         }.Authorizer()
275         if err != nil {
276                 return err
277         }
278
279         vmClient.Authorizer = authorizer
280         netClient.Authorizer = authorizer
281         disksClient.Authorizer = authorizer
282         storageAcctClient.Authorizer = authorizer
283
284         az.vmClient = &virtualMachinesClientImpl{vmClient}
285         az.netClient = &interfacesClientImpl{netClient}
286         az.disksClient = &disksClientImpl{disksClient}
287
288         az.imageResourceGroup = az.azconfig.ImageResourceGroup
289         if az.imageResourceGroup == "" {
290                 az.imageResourceGroup = az.azconfig.ResourceGroup
291         }
292
293         var client storage.Client
294         if az.azconfig.StorageAccount != "" && az.azconfig.BlobContainer != "" {
295                 result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
296                 if err != nil {
297                         az.logger.WithError(err).Warn("Couldn't get account keys")
298                         return err
299                 }
300
301                 key1 := *(*result.Keys)[0].Value
302                 client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
303                 if err != nil {
304                         az.logger.WithError(err).Warn("Couldn't make client")
305                         return err
306                 }
307
308                 blobsvc := client.GetBlobService()
309                 az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
310         } else if az.azconfig.StorageAccount != "" || az.azconfig.BlobContainer != "" {
311                 az.logger.Error("Invalid configuration: StorageAccount and BlobContainer must both be empty or both be set")
312         }
313
314         az.dispatcherID = dispatcherID
315         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
316
317         go func() {
318                 az.stopWg.Add(1)
319                 defer az.stopWg.Done()
320
321                 tk := time.NewTicker(5 * time.Minute)
322                 for {
323                         select {
324                         case <-az.ctx.Done():
325                                 tk.Stop()
326                                 return
327                         case <-tk.C:
328                                 if az.blobcont != nil {
329                                         az.manageBlobs()
330                                 }
331                                 az.manageDisks()
332                         }
333                 }
334         }()
335
336         az.deleteNIC = make(chan string)
337         az.deleteBlob = make(chan storage.Blob)
338         az.deleteDisk = make(chan compute.Disk)
339
340         for i := 0; i < 4; i++ {
341                 go func() {
342                         for nicname := range az.deleteNIC {
343                                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
344                                 if delerr != nil {
345                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
346                                 } else {
347                                         az.logger.Printf("Deleted NIC %v", nicname)
348                                 }
349                         }
350                 }()
351                 go func() {
352                         for blob := range az.deleteBlob {
353                                 err := blob.Delete(nil)
354                                 if err != nil {
355                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
356                                 } else {
357                                         az.logger.Printf("Deleted blob %v", blob.Name)
358                                 }
359                         }
360                 }()
361                 go func() {
362                         for disk := range az.deleteDisk {
363                                 _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
364                                 if err != nil {
365                                         az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
366                                 } else {
367                                         az.logger.Printf("Deleted disk %v", *disk.Name)
368                                 }
369                         }
370                 }()
371         }
372
373         return nil
374 }
375
376 func (az *azureInstanceSet) Create(
377         instanceType arvados.InstanceType,
378         imageID cloud.ImageID,
379         newTags cloud.InstanceTags,
380         initCommand cloud.InitCommand,
381         publicKey ssh.PublicKey) (cloud.Instance, error) {
382
383         az.stopWg.Add(1)
384         defer az.stopWg.Done()
385
386         if instanceType.AddedScratch > 0 {
387                 return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
388         }
389
390         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
391         if err != nil {
392                 return nil, err
393         }
394
395         name = az.namePrefix + name
396
397         tags := map[string]*string{}
398         for k, v := range newTags {
399                 tags[k] = to.StringPtr(v)
400         }
401         tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
402
403         networkResourceGroup := az.azconfig.NetworkResourceGroup
404         if networkResourceGroup == "" {
405                 networkResourceGroup = az.azconfig.ResourceGroup
406         }
407
408         nicParameters := network.Interface{
409                 Location: &az.azconfig.Location,
410                 Tags:     tags,
411                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
412                         IPConfigurations: &[]network.InterfaceIPConfiguration{
413                                 network.InterfaceIPConfiguration{
414                                         Name: to.StringPtr("ip1"),
415                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
416                                                 Subnet: &network.Subnet{
417                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
418                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
419                                                                 az.azconfig.SubscriptionID,
420                                                                 networkResourceGroup,
421                                                                 az.azconfig.Network,
422                                                                 az.azconfig.Subnet)),
423                                                 },
424                                                 PrivateIPAllocationMethod: network.Dynamic,
425                                         },
426                                 },
427                         },
428                 },
429         }
430         nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
431         if err != nil {
432                 return nil, wrapAzureError(err)
433         }
434
435         var blobname string
436         customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
437         var storageProfile *compute.StorageProfile
438
439         re := regexp.MustCompile(`^http(s?)://`)
440         if re.MatchString(string(imageID)) && az.blobcont != nil {
441                 blobname = fmt.Sprintf("%s-os.vhd", name)
442                 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
443                         az.azconfig.StorageAccount,
444                         az.azureEnv.StorageEndpointSuffix,
445                         az.azconfig.BlobContainer,
446                         blobname)
447                 az.logger.Warn("using deprecated unmanaged image, see https://doc.arvados.org/ to migrate to managed disks")
448                 storageProfile = &compute.StorageProfile{
449                         OsDisk: &compute.OSDisk{
450                                 OsType:       compute.Linux,
451                                 Name:         to.StringPtr(name + "-os"),
452                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
453                                 Image: &compute.VirtualHardDisk{
454                                         URI: to.StringPtr(string(imageID)),
455                                 },
456                                 Vhd: &compute.VirtualHardDisk{
457                                         URI: &instanceVhd,
458                                 },
459                         },
460                 }
461         } else if az.blobcont == nil {
462                 storageProfile = &compute.StorageProfile{
463                         ImageReference: &compute.ImageReference{
464                                 ID: to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID)),
465                         },
466                         OsDisk: &compute.OSDisk{
467                                 OsType:       compute.Linux,
468                                 Name:         to.StringPtr(name + "-os"),
469                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
470                         },
471                 }
472         } else {
473                 return nil, wrapAzureError(errors.New("Invalid configuration: can't configure unmanaged image URL without StorageAccount and BlobContainer"))
474         }
475
476         vmParameters := compute.VirtualMachine{
477                 Location: &az.azconfig.Location,
478                 Tags:     tags,
479                 VirtualMachineProperties: &compute.VirtualMachineProperties{
480                         HardwareProfile: &compute.HardwareProfile{
481                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
482                         },
483                         StorageProfile: storageProfile,
484                         NetworkProfile: &compute.NetworkProfile{
485                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
486                                         compute.NetworkInterfaceReference{
487                                                 ID: nic.ID,
488                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
489                                                         Primary: to.BoolPtr(true),
490                                                 },
491                                         },
492                                 },
493                         },
494                         OsProfile: &compute.OSProfile{
495                                 ComputerName:  &name,
496                                 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
497                                 LinuxConfiguration: &compute.LinuxConfiguration{
498                                         DisablePasswordAuthentication: to.BoolPtr(true),
499                                         SSH: &compute.SSHConfiguration{
500                                                 PublicKeys: &[]compute.SSHPublicKey{
501                                                         {
502                                                                 Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
503                                                                 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
504                                                         },
505                                                 },
506                                         },
507                                 },
508                                 CustomData: &customData,
509                         },
510                 },
511         }
512
513         vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
514         if err != nil {
515                 if az.blobcont != nil {
516                         _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
517                         if delerr != nil {
518                                 az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
519                         }
520                 }
521                 // Leave cleaning up of managed disks to the garbage collection in manageDisks()
522
523                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
524                 if delerr != nil {
525                         az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
526                 }
527
528                 return nil, wrapAzureError(err)
529         }
530
531         return &azureInstance{
532                 provider: az,
533                 nic:      nic,
534                 vm:       vm,
535         }, nil
536 }
537
538 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
539         az.stopWg.Add(1)
540         defer az.stopWg.Done()
541
542         interfaces, err := az.manageNics()
543         if err != nil {
544                 return nil, err
545         }
546
547         result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
548         if err != nil {
549                 return nil, wrapAzureError(err)
550         }
551
552         var instances []cloud.Instance
553         for ; result.NotDone(); err = result.Next() {
554                 if err != nil {
555                         return nil, wrapAzureError(err)
556                 }
557                 instances = append(instances, &azureInstance{
558                         provider: az,
559                         vm:       result.Value(),
560                         nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
561                 })
562         }
563         return instances, nil
564 }
565
566 // manageNics returns a list of Azure network interface resources.
567 // Also performs garbage collection of NICs which have "namePrefix",
568 // are not associated with a virtual machine and have a "created-at"
569 // time more than DeleteDanglingResourcesAfter (to prevent racing and
570 // deleting newly created NICs) in the past are deleted.
571 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
572         az.stopWg.Add(1)
573         defer az.stopWg.Done()
574
575         result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
576         if err != nil {
577                 return nil, wrapAzureError(err)
578         }
579
580         interfaces := make(map[string]network.Interface)
581
582         timestamp := time.Now()
583         for ; result.NotDone(); err = result.Next() {
584                 if err != nil {
585                         az.logger.WithError(err).Warnf("Error listing nics")
586                         return interfaces, nil
587                 }
588                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
589                         if result.Value().VirtualMachine != nil {
590                                 interfaces[*result.Value().ID] = result.Value()
591                         } else {
592                                 if result.Value().Tags["created-at"] != nil {
593                                         createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
594                                         if err == nil {
595                                                 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
596                                                         az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
597                                                         az.deleteNIC <- *result.Value().Name
598                                                 }
599                                         }
600                                 }
601                         }
602                 }
603         }
604         return interfaces, nil
605 }
606
607 // manageBlobs garbage collects blobs (VM disk images) in the
608 // configured storage account container.  It will delete blobs which
609 // have "namePrefix", are "available" (which means they are not
610 // leased to a VM) and haven't been modified for
611 // DeleteDanglingResourcesAfter seconds.
612 func (az *azureInstanceSet) manageBlobs() {
613
614         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
615         timestamp := time.Now()
616
617         for {
618                 response, err := az.blobcont.ListBlobs(page)
619                 if err != nil {
620                         az.logger.WithError(err).Warn("Error listing blobs")
621                         return
622                 }
623                 for _, b := range response.Blobs {
624                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
625                         if b.Properties.BlobType == storage.BlobTypePage &&
626                                 b.Properties.LeaseState == "available" &&
627                                 b.Properties.LeaseStatus == "unlocked" &&
628                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
629
630                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
631                                 az.deleteBlob <- b
632                         }
633                 }
634                 if response.NextMarker != "" {
635                         page.Marker = response.NextMarker
636                 } else {
637                         break
638                 }
639         }
640 }
641
642 // manageDisks garbage collects managed compute disks (VM disk images) in the
643 // configured resource group.  It will delete disks which have "namePrefix",
644 // are "unattached" (which means they are not leased to a VM) and were created
645 // more than DeleteDanglingResourcesAfter seconds ago.  (Azure provides no
646 // modification timestamp on managed disks, there is only a creation timestamp)
647 func (az *azureInstanceSet) manageDisks() {
648
649         re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
650         threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
651
652         response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
653         if err != nil {
654                 az.logger.WithError(err).Warn("Error listing disks")
655                 return
656         }
657
658         for ; response.NotDone(); err = response.Next() {
659                 for _, d := range response.Values() {
660                         if d.DiskProperties.DiskState == compute.Unattached &&
661                                 d.Name != nil && re.MatchString(*d.Name) &&
662                                 d.DiskProperties.TimeCreated.ToTime().Before(threshold) {
663
664                                 az.logger.Printf("Disk %v is unlocked and was created at %+v, will delete", *d.Name, d.DiskProperties.TimeCreated.ToTime())
665                                 az.deleteDisk <- d
666                         }
667                 }
668         }
669 }
670
671 func (az *azureInstanceSet) Stop() {
672         az.stopFunc()
673         az.stopWg.Wait()
674         close(az.deleteNIC)
675         close(az.deleteBlob)
676         close(az.deleteDisk)
677 }
678
679 type azureInstance struct {
680         provider *azureInstanceSet
681         nic      network.Interface
682         vm       compute.VirtualMachine
683 }
684
685 func (ai *azureInstance) ID() cloud.InstanceID {
686         return cloud.InstanceID(*ai.vm.ID)
687 }
688
689 func (ai *azureInstance) String() string {
690         return *ai.vm.Name
691 }
692
693 func (ai *azureInstance) ProviderType() string {
694         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
695 }
696
697 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
698         ai.provider.stopWg.Add(1)
699         defer ai.provider.stopWg.Done()
700
701         tags := map[string]*string{}
702         for k, v := range ai.vm.Tags {
703                 tags[k] = v
704         }
705         for k, v := range newTags {
706                 tags[k] = to.StringPtr(v)
707         }
708
709         vmParameters := compute.VirtualMachine{
710                 Location: &ai.provider.azconfig.Location,
711                 Tags:     tags,
712         }
713         vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
714         if err != nil {
715                 return wrapAzureError(err)
716         }
717         ai.vm = vm
718
719         return nil
720 }
721
722 func (ai *azureInstance) Tags() cloud.InstanceTags {
723         tags := cloud.InstanceTags{}
724         for k, v := range ai.vm.Tags {
725                 tags[k] = *v
726         }
727         return tags
728 }
729
730 func (ai *azureInstance) Destroy() error {
731         ai.provider.stopWg.Add(1)
732         defer ai.provider.stopWg.Done()
733
734         _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
735         return wrapAzureError(err)
736 }
737
738 func (ai *azureInstance) Address() string {
739         if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
740                 return ""
741         } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
742                 return ""
743         } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
744                 return ""
745         } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
746                 return ""
747         } else {
748                 return *addr
749         }
750 }
751
752 func (ai *azureInstance) RemoteUser() string {
753         return ai.provider.azconfig.AdminUsername
754 }
755
756 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
757         return cloud.ErrNotImplemented
758 }