16106: update Azure preemptible node code after real world testing. Add
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "net/http"
14         "regexp"
15         "strconv"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cloud"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
23         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
24         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
25         "github.com/Azure/azure-sdk-for-go/storage"
26         "github.com/Azure/go-autorest/autorest"
27         "github.com/Azure/go-autorest/autorest/azure"
28         "github.com/Azure/go-autorest/autorest/azure/auth"
29         "github.com/Azure/go-autorest/autorest/to"
30         "github.com/jmcvetta/randutil"
31         "github.com/sirupsen/logrus"
32         "golang.org/x/crypto/ssh"
33 )
34
// Driver is the azure implementation of the cloud.Driver interface.
// It is built by adapting newAzureInstanceSet with cloud.DriverFunc.
var Driver = cloud.DriverFunc(newAzureInstanceSet)
37
// azureInstanceSetConfig holds the driver configuration. It is
// populated by unmarshalling the JSON config passed to
// newAzureInstanceSet.
type azureInstanceSetConfig struct {
	// SubscriptionID is passed to the Azure SDK client constructors
	// and used when building subnet and image resource IDs.
	SubscriptionID string
	// ClientID, ClientSecret and TenantID are the client
	// credentials used to build the authorizer.
	ClientID     string
	ClientSecret string
	TenantID     string
	// CloudEnvironment is resolved with azure.EnvironmentFromName.
	CloudEnvironment string
	// ResourceGroup is the resource group in which VMs and NICs
	// are created and listed.
	ResourceGroup string
	// ImageResourceGroup is the resource group used in image
	// resource IDs; when empty, ResourceGroup is used.
	ImageResourceGroup string
	// Location is the location set on created NICs and VMs.
	Location string
	// Network and Subnet name the virtual network and subnet that
	// new NICs are attached to.
	Network string
	// NetworkResourceGroup is the resource group of Network; when
	// empty, ResourceGroup is used.
	NetworkResourceGroup string
	Subnet               string
	// StorageAccount and BlobContainer must both be set or both be
	// empty. They are required to boot from an unmanaged (http/https
	// URL) image.
	StorageAccount string
	BlobContainer  string
	// SharedImageGalleryName and SharedImageGalleryImageVersion
	// must both be set or both be empty. When set, the image ID
	// refers to an image in that gallery.
	SharedImageGalleryName         string
	SharedImageGalleryImageVersion string
	// DeleteDanglingResourcesAfter is the minimum age before an
	// unattached NIC or unleased blob is garbage collected.
	DeleteDanglingResourcesAfter arvados.Duration
	// AdminUsername is the admin account of created VMs; the SSH
	// public key is installed in its authorized_keys file.
	AdminUsername string
}
57
// containerWrapper is the subset of blob container operations used
// by this driver: obtaining a reference to a named blob and listing
// blobs.
type containerWrapper interface {
	GetBlobReference(name string) *storage.Blob
	ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
}
62
// virtualMachinesClientWrapper is the set of virtual machine
// operations used by this driver (create/update, delete, list).
type virtualMachinesClientWrapper interface {
	createOrUpdate(ctx context.Context,
		resourceGroupName string,
		VMName string,
		parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
	delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
	listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
71
// virtualMachinesClientImpl implements virtualMachinesClientWrapper
// on top of the Azure SDK's compute.VirtualMachinesClient.
type virtualMachinesClientImpl struct {
	inner compute.VirtualMachinesClient
}
75
76 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
77         resourceGroupName string,
78         VMName string,
79         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
80
81         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
82         if err != nil {
83                 return compute.VirtualMachine{}, wrapAzureError(err)
84         }
85         future.WaitForCompletionRef(ctx, cl.inner.Client)
86         r, err := future.Result(cl.inner)
87         return r, wrapAzureError(err)
88 }
89
90 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
91         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
92         if err != nil {
93                 return nil, wrapAzureError(err)
94         }
95         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
96         return future.Response(), wrapAzureError(err)
97 }
98
99 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
100         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
101         return r, wrapAzureError(err)
102 }
103
// interfacesClientWrapper is the set of network interface operations
// used by this driver (create/update, delete, list).
type interfacesClientWrapper interface {
	createOrUpdate(ctx context.Context,
		resourceGroupName string,
		networkInterfaceName string,
		parameters network.Interface) (result network.Interface, err error)
	delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
	listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
112
// interfacesClientImpl implements interfacesClientWrapper on top of
// the Azure SDK's network.InterfacesClient.
type interfacesClientImpl struct {
	inner network.InterfacesClient
}
116
117 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
118         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
119         if err != nil {
120                 return nil, wrapAzureError(err)
121         }
122         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
123         return future.Response(), wrapAzureError(err)
124 }
125
126 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
127         resourceGroupName string,
128         networkInterfaceName string,
129         parameters network.Interface) (result network.Interface, err error) {
130
131         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
132         if err != nil {
133                 return network.Interface{}, wrapAzureError(err)
134         }
135         future.WaitForCompletionRef(ctx, cl.inner.Client)
136         r, err := future.Result(cl.inner)
137         return r, wrapAzureError(err)
138 }
139
140 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
141         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
142         return r, wrapAzureError(err)
143 }
144
// disksClientWrapper is the set of managed disk operations used by
// this driver (list by resource group, delete).
type disksClientWrapper interface {
	listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
	delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
}
149
// disksClientImpl implements disksClientWrapper on top of the Azure
// SDK's compute.DisksClient.
type disksClientImpl struct {
	inner compute.DisksClient
}
153
154 func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
155         r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
156         return r, wrapAzureError(err)
157 }
158
159 func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
160         r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
161         return r, wrapAzureError(err)
162 }
163
// quotaRe matches (case-insensitively) the words that wrapAzureError
// looks for in a service error's code or message to classify it as a
// quota error.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
165
// azureRateLimitError is returned by wrapAzureError when the API
// response indicates throttling. firstRetry is the earliest time at
// which the request should be retried.
type azureRateLimitError struct {
	azure.RequestError
	firstRetry time.Time
}

// EarliestRetry returns the earliest time at which the throttled
// request should be retried.
func (ar *azureRateLimitError) EarliestRetry() time.Time {
	return ar.firstRetry
}
174
// azureQuotaError is returned by wrapAzureError when the service
// error code or message matches quotaRe.
type azureQuotaError struct {
	azure.RequestError
}

// IsQuotaError always returns true, marking this error as a quota
// error.
func (ar *azureQuotaError) IsQuotaError() bool {
	return true
}
182
183 func wrapAzureError(err error) error {
184         de, ok := err.(autorest.DetailedError)
185         if !ok {
186                 return err
187         }
188         rq, ok := de.Original.(*azure.RequestError)
189         if !ok {
190                 return err
191         }
192         if rq.Response == nil {
193                 return err
194         }
195         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
196                 // API throttling
197                 ra := rq.Response.Header["Retry-After"][0]
198                 earliestRetry, parseErr := http.ParseTime(ra)
199                 if parseErr != nil {
200                         // Could not parse as a timestamp, must be number of seconds
201                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
202                         if parseErr == nil {
203                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
204                         } else {
205                                 // Couldn't make sense of retry-after,
206                                 // so set retry to 20 seconds
207                                 earliestRetry = time.Now().Add(20 * time.Second)
208                         }
209                 }
210                 return &azureRateLimitError{*rq, earliestRetry}
211         }
212         if rq.ServiceError == nil {
213                 return err
214         }
215         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
216                 return &azureQuotaError{*rq}
217         }
218         return err
219 }
220
// azureInstanceSet is the Azure implementation of cloud.InstanceSet
// returned by newAzureInstanceSet.
type azureInstanceSet struct {
	azconfig    azureInstanceSetConfig       // configuration, stored by setup
	vmClient    virtualMachinesClientWrapper // virtual machine operations
	netClient   interfacesClientWrapper      // network interface operations
	disksClient disksClientWrapper           // managed disk operations
	// imageResourceGroup is azconfig.ImageResourceGroup, or
	// azconfig.ResourceGroup if that is empty.
	imageResourceGroup string
	// blobcont is nil unless both StorageAccount and BlobContainer
	// are configured.
	blobcont   containerWrapper
	azureEnv   azure.Environment            // resolved from azconfig.CloudEnvironment
	interfaces map[string]network.Interface // NOTE(review): not referenced in the code visible here
	dispatcherID string
	namePrefix   string // "compute-<dispatcherID>-", prefix of all resource names created here
	// ctx is cancelled by stopFunc; it is passed to most Azure API
	// calls and stops the periodic cleanup goroutine.
	ctx      context.Context
	stopFunc context.CancelFunc
	// stopWg counts in-progress operations and the periodic cleanup
	// goroutine.
	stopWg sync.WaitGroup
	// deleteNIC, deleteBlob and deleteDisk feed the deletion worker
	// goroutines started by setup.
	deleteNIC  chan string
	deleteBlob chan storage.Blob
	deleteDisk chan compute.Disk
	logger     logrus.FieldLogger
}
240
241 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
242         azcfg := azureInstanceSetConfig{}
243         err = json.Unmarshal(config, &azcfg)
244         if err != nil {
245                 return nil, err
246         }
247
248         az := azureInstanceSet{logger: logger}
249         az.ctx, az.stopFunc = context.WithCancel(context.Background())
250         err = az.setup(azcfg, string(dispatcherID))
251         if err != nil {
252                 az.stopFunc()
253                 return nil, err
254         }
255         return &az, nil
256 }
257
258 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
259         az.azconfig = azcfg
260         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
261         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
262         disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
263         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
264
265         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
266         if err != nil {
267                 return err
268         }
269
270         authorizer, err := auth.ClientCredentialsConfig{
271                 ClientID:     az.azconfig.ClientID,
272                 ClientSecret: az.azconfig.ClientSecret,
273                 TenantID:     az.azconfig.TenantID,
274                 Resource:     az.azureEnv.ResourceManagerEndpoint,
275                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
276         }.Authorizer()
277         if err != nil {
278                 return err
279         }
280
281         vmClient.Authorizer = authorizer
282         netClient.Authorizer = authorizer
283         disksClient.Authorizer = authorizer
284         storageAcctClient.Authorizer = authorizer
285
286         az.vmClient = &virtualMachinesClientImpl{vmClient}
287         az.netClient = &interfacesClientImpl{netClient}
288         az.disksClient = &disksClientImpl{disksClient}
289
290         az.imageResourceGroup = az.azconfig.ImageResourceGroup
291         if az.imageResourceGroup == "" {
292                 az.imageResourceGroup = az.azconfig.ResourceGroup
293         }
294
295         var client storage.Client
296         if az.azconfig.StorageAccount != "" && az.azconfig.BlobContainer != "" {
297                 result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
298                 if err != nil {
299                         az.logger.WithError(err).Warn("Couldn't get account keys")
300                         return err
301                 }
302
303                 key1 := *(*result.Keys)[0].Value
304                 client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
305                 if err != nil {
306                         az.logger.WithError(err).Warn("Couldn't make client")
307                         return err
308                 }
309
310                 blobsvc := client.GetBlobService()
311                 az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
312         } else if az.azconfig.StorageAccount != "" || az.azconfig.BlobContainer != "" {
313                 az.logger.Error("Invalid configuration: StorageAccount and BlobContainer must both be empty or both be set")
314         }
315
316         az.dispatcherID = dispatcherID
317         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
318
319         go func() {
320                 az.stopWg.Add(1)
321                 defer az.stopWg.Done()
322
323                 tk := time.NewTicker(5 * time.Minute)
324                 for {
325                         select {
326                         case <-az.ctx.Done():
327                                 tk.Stop()
328                                 return
329                         case <-tk.C:
330                                 if az.blobcont != nil {
331                                         az.manageBlobs()
332                                 }
333                                 az.manageDisks()
334                         }
335                 }
336         }()
337
338         az.deleteNIC = make(chan string)
339         az.deleteBlob = make(chan storage.Blob)
340         az.deleteDisk = make(chan compute.Disk)
341
342         for i := 0; i < 4; i++ {
343                 go func() {
344                         for nicname := range az.deleteNIC {
345                                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
346                                 if delerr != nil {
347                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
348                                 } else {
349                                         az.logger.Printf("Deleted NIC %v", nicname)
350                                 }
351                         }
352                 }()
353                 go func() {
354                         for blob := range az.deleteBlob {
355                                 err := blob.Delete(nil)
356                                 if err != nil {
357                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
358                                 } else {
359                                         az.logger.Printf("Deleted blob %v", blob.Name)
360                                 }
361                         }
362                 }()
363                 go func() {
364                         for disk := range az.deleteDisk {
365                                 _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
366                                 if err != nil {
367                                         az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
368                                 } else {
369                                         az.logger.Printf("Deleted disk %v", *disk.Name)
370                                 }
371                         }
372                 }()
373         }
374
375         return nil
376 }
377
378 func (az *azureInstanceSet) cleanupNic(nic network.Interface) {
379         _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
380         if delerr != nil {
381                 az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
382         }
383 }
384
385 func (az *azureInstanceSet) Create(
386         instanceType arvados.InstanceType,
387         imageID cloud.ImageID,
388         newTags cloud.InstanceTags,
389         initCommand cloud.InitCommand,
390         publicKey ssh.PublicKey) (cloud.Instance, error) {
391
392         az.stopWg.Add(1)
393         defer az.stopWg.Done()
394
395         if instanceType.AddedScratch > 0 {
396                 return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
397         }
398
399         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
400         if err != nil {
401                 return nil, err
402         }
403
404         name = az.namePrefix + name
405
406         tags := map[string]*string{}
407         for k, v := range newTags {
408                 tags[k] = to.StringPtr(v)
409         }
410         tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
411
412         networkResourceGroup := az.azconfig.NetworkResourceGroup
413         if networkResourceGroup == "" {
414                 networkResourceGroup = az.azconfig.ResourceGroup
415         }
416
417         nicParameters := network.Interface{
418                 Location: &az.azconfig.Location,
419                 Tags:     tags,
420                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
421                         IPConfigurations: &[]network.InterfaceIPConfiguration{
422                                 {
423                                         Name: to.StringPtr("ip1"),
424                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
425                                                 Subnet: &network.Subnet{
426                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
427                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
428                                                                 az.azconfig.SubscriptionID,
429                                                                 networkResourceGroup,
430                                                                 az.azconfig.Network,
431                                                                 az.azconfig.Subnet)),
432                                                 },
433                                                 PrivateIPAllocationMethod: network.Dynamic,
434                                         },
435                                 },
436                         },
437                 },
438         }
439         nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
440         if err != nil {
441                 return nil, wrapAzureError(err)
442         }
443
444         var blobname string
445         customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
446         var storageProfile *compute.StorageProfile
447
448         re := regexp.MustCompile(`^http(s?)://`)
449         if re.MatchString(string(imageID)) {
450                 if az.blobcont == nil {
451                         az.cleanupNic(nic)
452                         return nil, wrapAzureError(errors.New("Invalid configuration: can't configure unmanaged image URL without StorageAccount and BlobContainer"))
453                 }
454                 blobname = fmt.Sprintf("%s-os.vhd", name)
455                 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
456                         az.azconfig.StorageAccount,
457                         az.azureEnv.StorageEndpointSuffix,
458                         az.azconfig.BlobContainer,
459                         blobname)
460                 az.logger.Warn("using deprecated unmanaged image, see https://doc.arvados.org/ to migrate to managed disks")
461                 storageProfile = &compute.StorageProfile{
462                         OsDisk: &compute.OSDisk{
463                                 OsType:       compute.Linux,
464                                 Name:         to.StringPtr(name + "-os"),
465                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
466                                 Image: &compute.VirtualHardDisk{
467                                         URI: to.StringPtr(string(imageID)),
468                                 },
469                                 Vhd: &compute.VirtualHardDisk{
470                                         URI: &instanceVhd,
471                                 },
472                         },
473                 }
474         } else {
475                 id := to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID))
476                 if az.azconfig.SharedImageGalleryName != "" && az.azconfig.SharedImageGalleryImageVersion != "" {
477                         id = to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/galleries/" + az.azconfig.SharedImageGalleryName + "/images/" + string(imageID) + "/versions/" + az.azconfig.SharedImageGalleryImageVersion)
478                 } else if az.azconfig.SharedImageGalleryName != "" || az.azconfig.SharedImageGalleryImageVersion != "" {
479                         az.cleanupNic(nic)
480                         return nil, wrapAzureError(errors.New("Invalid configuration: SharedImageGalleryName and SharedImageGalleryImageVersion must both be set or both be empty"))
481                 }
482                 storageProfile = &compute.StorageProfile{
483                         ImageReference: &compute.ImageReference{
484                                 ID: id,
485                         },
486                         OsDisk: &compute.OSDisk{
487                                 OsType:       compute.Linux,
488                                 Name:         to.StringPtr(name + "-os"),
489                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
490                         },
491                 }
492         }
493
494         vmParameters := compute.VirtualMachine{
495                 Location: &az.azconfig.Location,
496                 Tags:     tags,
497                 VirtualMachineProperties: &compute.VirtualMachineProperties{
498                         HardwareProfile: &compute.HardwareProfile{
499                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
500                         },
501                         StorageProfile: storageProfile,
502                         NetworkProfile: &compute.NetworkProfile{
503                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
504                                         {
505                                                 ID: nic.ID,
506                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
507                                                         Primary: to.BoolPtr(true),
508                                                 },
509                                         },
510                                 },
511                         },
512                         OsProfile: &compute.OSProfile{
513                                 ComputerName:  &name,
514                                 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
515                                 LinuxConfiguration: &compute.LinuxConfiguration{
516                                         DisablePasswordAuthentication: to.BoolPtr(true),
517                                         SSH: &compute.SSHConfiguration{
518                                                 PublicKeys: &[]compute.SSHPublicKey{
519                                                         {
520                                                                 Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
521                                                                 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
522                                                         },
523                                                 },
524                                         },
525                                 },
526                                 CustomData: &customData,
527                         },
528                 },
529         }
530
531         var maxPrice float64
532         if instanceType.Preemptible {
533                 // Setting maxPrice to -1 is the equivalent of paying spot price, up to the
534                 // normal price. This means the node will not be pre-empted for price
535                 // reasons. It may still be pre-empted for capacity reasons though. And
536                 // Azure offers *no* SLA on spot instances.
537                 maxPrice = -1
538                 vmParameters.VirtualMachineProperties.Priority = compute.Spot
539                 vmParameters.VirtualMachineProperties.EvictionPolicy = compute.Delete
540                 vmParameters.VirtualMachineProperties.BillingProfile = &compute.BillingProfile{MaxPrice: &maxPrice}
541         }
542
543         vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
544         if err != nil {
545                 // Do some cleanup. Otherwise, an unbounded number of new unused nics and
546                 // blobs can pile up during times when VMs can't be created and the
547                 // dispatcher keeps retrying, because the garbage collection in manageBlobs
548                 // and manageNics is only triggered periodically. This is most important
549                 // for nics, because those are subject to a quota.
550                 az.cleanupNic(nic)
551
552                 if blobname != "" {
553                         _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
554                         if delerr != nil {
555                                 az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
556                         }
557                 }
558
559                 // Leave cleaning up of managed disks to the garbage collection in manageDisks()
560
561                 return nil, wrapAzureError(err)
562         }
563
564         return &azureInstance{
565                 provider: az,
566                 nic:      nic,
567                 vm:       vm,
568         }, nil
569 }
570
571 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
572         az.stopWg.Add(1)
573         defer az.stopWg.Done()
574
575         interfaces, err := az.manageNics()
576         if err != nil {
577                 return nil, err
578         }
579
580         result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
581         if err != nil {
582                 return nil, wrapAzureError(err)
583         }
584
585         var instances []cloud.Instance
586         for ; result.NotDone(); err = result.Next() {
587                 if err != nil {
588                         return nil, wrapAzureError(err)
589                 }
590                 instances = append(instances, &azureInstance{
591                         provider: az,
592                         vm:       result.Value(),
593                         nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
594                 })
595         }
596         return instances, nil
597 }
598
599 // manageNics returns a list of Azure network interface resources.
600 // Also performs garbage collection of NICs which have "namePrefix",
601 // are not associated with a virtual machine and have a "created-at"
602 // time more than DeleteDanglingResourcesAfter (to prevent racing and
603 // deleting newly created NICs) in the past are deleted.
604 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
605         az.stopWg.Add(1)
606         defer az.stopWg.Done()
607
608         result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
609         if err != nil {
610                 return nil, wrapAzureError(err)
611         }
612
613         interfaces := make(map[string]network.Interface)
614
615         timestamp := time.Now()
616         for ; result.NotDone(); err = result.Next() {
617                 if err != nil {
618                         az.logger.WithError(err).Warnf("Error listing nics")
619                         return interfaces, nil
620                 }
621                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
622                         if result.Value().VirtualMachine != nil {
623                                 interfaces[*result.Value().ID] = result.Value()
624                         } else {
625                                 if result.Value().Tags["created-at"] != nil {
626                                         createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
627                                         if err == nil {
628                                                 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
629                                                         az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
630                                                         az.deleteNIC <- *result.Value().Name
631                                                 }
632                                         }
633                                 }
634                         }
635                 }
636         }
637         return interfaces, nil
638 }
639
640 // manageBlobs garbage collects blobs (VM disk images) in the
641 // configured storage account container.  It will delete blobs which
642 // have "namePrefix", are "available" (which means they are not
643 // leased to a VM) and haven't been modified for
644 // DeleteDanglingResourcesAfter seconds.
645 func (az *azureInstanceSet) manageBlobs() {
646
647         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
648         timestamp := time.Now()
649
650         for {
651                 response, err := az.blobcont.ListBlobs(page)
652                 if err != nil {
653                         az.logger.WithError(err).Warn("Error listing blobs")
654                         return
655                 }
656                 for _, b := range response.Blobs {
657                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
658                         if b.Properties.BlobType == storage.BlobTypePage &&
659                                 b.Properties.LeaseState == "available" &&
660                                 b.Properties.LeaseStatus == "unlocked" &&
661                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
662
663                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
664                                 az.deleteBlob <- b
665                         }
666                 }
667                 if response.NextMarker != "" {
668                         page.Marker = response.NextMarker
669                 } else {
670                         break
671                 }
672         }
673 }
674
675 // manageDisks garbage collects managed compute disks (VM disk images) in the
676 // configured resource group.  It will delete disks which have "namePrefix",
677 // are "unattached" (which means they are not leased to a VM) and were created
678 // more than DeleteDanglingResourcesAfter seconds ago.  (Azure provides no
679 // modification timestamp on managed disks, there is only a creation timestamp)
680 func (az *azureInstanceSet) manageDisks() {
681
682         re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
683         threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
684
685         response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
686         if err != nil {
687                 az.logger.WithError(err).Warn("Error listing disks")
688                 return
689         }
690
691         for ; response.NotDone(); err = response.Next() {
692                 if err != nil {
693                         az.logger.WithError(err).Warn("Error getting next page of disks")
694                         return
695                 }
696                 for _, d := range response.Values() {
697                         if d.DiskProperties.DiskState == compute.Unattached &&
698                                 d.Name != nil && re.MatchString(*d.Name) &&
699                                 d.DiskProperties.TimeCreated.ToTime().Before(threshold) {
700
701                                 az.logger.Printf("Disk %v is unlocked and was created at %+v, will delete", *d.Name, d.DiskProperties.TimeCreated.ToTime())
702                                 az.deleteDisk <- d
703                         }
704                 }
705         }
706 }
707
// Stop shuts the instance set down.  It calls stopFunc, waits on
// stopWg (which SetTags and Destroy calls on this set's instances
// register with while they are in flight), and then closes the
// deleteNIC, deleteBlob and deleteDisk channels.
func (az *azureInstanceSet) Stop() {
	az.stopFunc()
	// The channels are closed only after the wait returns.
	// NOTE(review): presumably every goroutine that sends on these
	// channels is tracked by stopWg, since a send on a closed
	// channel panics -- confirm against the code that starts them.
	az.stopWg.Wait()
	close(az.deleteNIC)
	close(az.deleteBlob)
	close(az.deleteDisk)
}
715
// azureInstance ties one Azure virtual machine to its network
// interface and to the instance set it belongs to.
// NOTE(review): presumably this is the driver's cloud.Instance
// implementation -- confirm against the cloud package.
type azureInstance struct {
	// provider is the owning instance set; its clients, context
	// and configuration are used for all Azure API calls.
	provider *azureInstanceSet
	// nic is the network interface from which Address reads the
	// private IP address.
	nic network.Interface
	// vm is the most recently fetched/updated VM description
	// (SetTags replaces it with the API response).
	vm compute.VirtualMachine
}
721
722 func (ai *azureInstance) ID() cloud.InstanceID {
723         return cloud.InstanceID(*ai.vm.ID)
724 }
725
726 func (ai *azureInstance) String() string {
727         return *ai.vm.Name
728 }
729
730 func (ai *azureInstance) ProviderType() string {
731         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
732 }
733
734 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
735         ai.provider.stopWg.Add(1)
736         defer ai.provider.stopWg.Done()
737
738         tags := map[string]*string{}
739         for k, v := range ai.vm.Tags {
740                 tags[k] = v
741         }
742         for k, v := range newTags {
743                 tags[k] = to.StringPtr(v)
744         }
745
746         vmParameters := compute.VirtualMachine{
747                 Location: &ai.provider.azconfig.Location,
748                 Tags:     tags,
749         }
750         vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
751         if err != nil {
752                 return wrapAzureError(err)
753         }
754         ai.vm = vm
755
756         return nil
757 }
758
759 func (ai *azureInstance) Tags() cloud.InstanceTags {
760         tags := cloud.InstanceTags{}
761         for k, v := range ai.vm.Tags {
762                 tags[k] = *v
763         }
764         return tags
765 }
766
767 func (ai *azureInstance) Destroy() error {
768         ai.provider.stopWg.Add(1)
769         defer ai.provider.stopWg.Done()
770
771         _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
772         return wrapAzureError(err)
773 }
774
775 func (ai *azureInstance) Address() string {
776         if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
777                 return ""
778         } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
779                 return ""
780         } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
781                 return ""
782         } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
783                 return ""
784         } else {
785                 return *addr
786         }
787 }
788
789 func (ai *azureInstance) RemoteUser() string {
790         return ai.provider.azconfig.AdminUsername
791 }
792
793 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
794         return cloud.ErrNotImplemented
795 }