16625: implement some more review feedback.
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "net/http"
14         "regexp"
15         "strconv"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cloud"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
23         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
24         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
25         "github.com/Azure/azure-sdk-for-go/storage"
26         "github.com/Azure/go-autorest/autorest"
27         "github.com/Azure/go-autorest/autorest/azure"
28         "github.com/Azure/go-autorest/autorest/azure/auth"
29         "github.com/Azure/go-autorest/autorest/to"
30         "github.com/jmcvetta/randutil"
31         "github.com/sirupsen/logrus"
32         "golang.org/x/crypto/ssh"
33 )
34
35 // Driver is the azure implementation of the cloud.Driver interface.
36 var Driver = cloud.DriverFunc(newAzureInstanceSet)
37
38 type azureInstanceSetConfig struct {
39         SubscriptionID               string
40         ClientID                     string
41         ClientSecret                 string
42         TenantID                     string
43         CloudEnvironment             string
44         ResourceGroup                string
45         ImageResourceGroup           string
46         Location                     string
47         Network                      string
48         NetworkResourceGroup         string
49         Subnet                       string
50         StorageAccount               string
51         BlobContainer                string
52         DeleteDanglingResourcesAfter arvados.Duration
53         AdminUsername                string
54 }
55
56 type containerWrapper interface {
57         GetBlobReference(name string) *storage.Blob
58         ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
59 }
60
61 type virtualMachinesClientWrapper interface {
62         createOrUpdate(ctx context.Context,
63                 resourceGroupName string,
64                 VMName string,
65                 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
66         delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
67         listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
68 }
69
70 type virtualMachinesClientImpl struct {
71         inner compute.VirtualMachinesClient
72 }
73
74 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
75         resourceGroupName string,
76         VMName string,
77         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
78
79         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
80         if err != nil {
81                 return compute.VirtualMachine{}, wrapAzureError(err)
82         }
83         future.WaitForCompletionRef(ctx, cl.inner.Client)
84         r, err := future.Result(cl.inner)
85         return r, wrapAzureError(err)
86 }
87
88 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
89         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
90         if err != nil {
91                 return nil, wrapAzureError(err)
92         }
93         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
94         return future.Response(), wrapAzureError(err)
95 }
96
97 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
98         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
99         return r, wrapAzureError(err)
100 }
101
102 type interfacesClientWrapper interface {
103         createOrUpdate(ctx context.Context,
104                 resourceGroupName string,
105                 networkInterfaceName string,
106                 parameters network.Interface) (result network.Interface, err error)
107         delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
108         listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
109 }
110
111 type interfacesClientImpl struct {
112         inner network.InterfacesClient
113 }
114
115 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
116         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
117         if err != nil {
118                 return nil, wrapAzureError(err)
119         }
120         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
121         return future.Response(), wrapAzureError(err)
122 }
123
124 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
125         resourceGroupName string,
126         networkInterfaceName string,
127         parameters network.Interface) (result network.Interface, err error) {
128
129         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
130         if err != nil {
131                 return network.Interface{}, wrapAzureError(err)
132         }
133         future.WaitForCompletionRef(ctx, cl.inner.Client)
134         r, err := future.Result(cl.inner)
135         return r, wrapAzureError(err)
136 }
137
138 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
139         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
140         return r, wrapAzureError(err)
141 }
142
143 type disksClientWrapper interface {
144         listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
145         delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
146 }
147
148 type disksClientImpl struct {
149         inner compute.DisksClient
150 }
151
152 func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
153         r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
154         return r, wrapAzureError(err)
155 }
156
157 func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
158         r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
159         return r, wrapAzureError(err)
160 }
161
// quotaRe matches, case-insensitively, any text containing "exceed",
// "quota" or "limit". wrapAzureError applies it to service error
// codes and messages to recognize quota errors.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
163
164 type azureRateLimitError struct {
165         azure.RequestError
166         firstRetry time.Time
167 }
168
169 func (ar *azureRateLimitError) EarliestRetry() time.Time {
170         return ar.firstRetry
171 }
172
173 type azureQuotaError struct {
174         azure.RequestError
175 }
176
177 func (ar *azureQuotaError) IsQuotaError() bool {
178         return true
179 }
180
181 func wrapAzureError(err error) error {
182         de, ok := err.(autorest.DetailedError)
183         if !ok {
184                 return err
185         }
186         rq, ok := de.Original.(*azure.RequestError)
187         if !ok {
188                 return err
189         }
190         if rq.Response == nil {
191                 return err
192         }
193         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
194                 // API throttling
195                 ra := rq.Response.Header["Retry-After"][0]
196                 earliestRetry, parseErr := http.ParseTime(ra)
197                 if parseErr != nil {
198                         // Could not parse as a timestamp, must be number of seconds
199                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
200                         if parseErr == nil {
201                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
202                         } else {
203                                 // Couldn't make sense of retry-after,
204                                 // so set retry to 20 seconds
205                                 earliestRetry = time.Now().Add(20 * time.Second)
206                         }
207                 }
208                 return &azureRateLimitError{*rq, earliestRetry}
209         }
210         if rq.ServiceError == nil {
211                 return err
212         }
213         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
214                 return &azureQuotaError{*rq}
215         }
216         return err
217 }
218
219 type azureInstanceSet struct {
220         azconfig           azureInstanceSetConfig
221         vmClient           virtualMachinesClientWrapper
222         netClient          interfacesClientWrapper
223         disksClient        disksClientWrapper
224         imageResourceGroup string
225         blobcont           containerWrapper
226         azureEnv           azure.Environment
227         interfaces         map[string]network.Interface
228         dispatcherID       string
229         namePrefix         string
230         ctx                context.Context
231         stopFunc           context.CancelFunc
232         stopWg             sync.WaitGroup
233         deleteNIC          chan string
234         deleteBlob         chan storage.Blob
235         deleteDisk         chan compute.Disk
236         logger             logrus.FieldLogger
237 }
238
239 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
240         azcfg := azureInstanceSetConfig{}
241         err = json.Unmarshal(config, &azcfg)
242         if err != nil {
243                 return nil, err
244         }
245
246         az := azureInstanceSet{logger: logger}
247         az.ctx, az.stopFunc = context.WithCancel(context.Background())
248         err = az.setup(azcfg, string(dispatcherID))
249         if err != nil {
250                 az.stopFunc()
251                 return nil, err
252         }
253         return &az, nil
254 }
255
256 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
257         az.azconfig = azcfg
258         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
259         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
260         disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
261         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
262
263         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
264         if err != nil {
265                 return err
266         }
267
268         authorizer, err := auth.ClientCredentialsConfig{
269                 ClientID:     az.azconfig.ClientID,
270                 ClientSecret: az.azconfig.ClientSecret,
271                 TenantID:     az.azconfig.TenantID,
272                 Resource:     az.azureEnv.ResourceManagerEndpoint,
273                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
274         }.Authorizer()
275         if err != nil {
276                 return err
277         }
278
279         vmClient.Authorizer = authorizer
280         netClient.Authorizer = authorizer
281         disksClient.Authorizer = authorizer
282         storageAcctClient.Authorizer = authorizer
283
284         az.vmClient = &virtualMachinesClientImpl{vmClient}
285         az.netClient = &interfacesClientImpl{netClient}
286         az.disksClient = &disksClientImpl{disksClient}
287
288         az.imageResourceGroup = az.azconfig.ImageResourceGroup
289         if az.imageResourceGroup == "" {
290                 az.imageResourceGroup = az.azconfig.ResourceGroup
291         }
292
293         var client storage.Client
294         if az.azconfig.StorageAccount != "" && az.azconfig.BlobContainer != "" {
295                 result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
296                 if err != nil {
297                         az.logger.WithError(err).Warn("Couldn't get account keys")
298                         return err
299                 }
300
301                 key1 := *(*result.Keys)[0].Value
302                 client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
303                 if err != nil {
304                         az.logger.WithError(err).Warn("Couldn't make client")
305                         return err
306                 }
307
308                 blobsvc := client.GetBlobService()
309                 az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
310         } else if az.azconfig.StorageAccount != "" || az.azconfig.BlobContainer != "" {
311                 az.logger.Error("Invalid configuration: StorageAccount and BlobContainer must both be empty or both be set")
312         }
313
314         az.dispatcherID = dispatcherID
315         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
316
317         go func() {
318                 az.stopWg.Add(1)
319                 defer az.stopWg.Done()
320
321                 tk := time.NewTicker(5 * time.Minute)
322                 for {
323                         select {
324                         case <-az.ctx.Done():
325                                 tk.Stop()
326                                 return
327                         case <-tk.C:
328                                 if az.blobcont != nil {
329                                         az.manageBlobs()
330                                 }
331                                 az.manageDisks()
332                         }
333                 }
334         }()
335
336         az.deleteNIC = make(chan string)
337         az.deleteBlob = make(chan storage.Blob)
338         az.deleteDisk = make(chan compute.Disk)
339
340         for i := 0; i < 4; i++ {
341                 go func() {
342                         for nicname := range az.deleteNIC {
343                                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
344                                 if delerr != nil {
345                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
346                                 } else {
347                                         az.logger.Printf("Deleted NIC %v", nicname)
348                                 }
349                         }
350                 }()
351                 go func() {
352                         for blob := range az.deleteBlob {
353                                 err := blob.Delete(nil)
354                                 if err != nil {
355                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
356                                 } else {
357                                         az.logger.Printf("Deleted blob %v", blob.Name)
358                                 }
359                         }
360                 }()
361                 go func() {
362                         for disk := range az.deleteDisk {
363                                 _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
364                                 if err != nil {
365                                         az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
366                                 } else {
367                                         az.logger.Printf("Deleted disk %v", *disk.Name)
368                                 }
369                         }
370                 }()
371         }
372
373         return nil
374 }
375
376 func (az *azureInstanceSet) Create(
377         instanceType arvados.InstanceType,
378         imageID cloud.ImageID,
379         newTags cloud.InstanceTags,
380         initCommand cloud.InitCommand,
381         publicKey ssh.PublicKey) (cloud.Instance, error) {
382
383         az.stopWg.Add(1)
384         defer az.stopWg.Done()
385
386         if instanceType.AddedScratch > 0 {
387                 return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
388         }
389
390         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
391         if err != nil {
392                 return nil, err
393         }
394
395         name = az.namePrefix + name
396
397         tags := map[string]*string{}
398         for k, v := range newTags {
399                 tags[k] = to.StringPtr(v)
400         }
401         tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
402
403         networkResourceGroup := az.azconfig.NetworkResourceGroup
404         if networkResourceGroup == "" {
405                 networkResourceGroup = az.azconfig.ResourceGroup
406         }
407
408         nicParameters := network.Interface{
409                 Location: &az.azconfig.Location,
410                 Tags:     tags,
411                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
412                         IPConfigurations: &[]network.InterfaceIPConfiguration{
413                                 network.InterfaceIPConfiguration{
414                                         Name: to.StringPtr("ip1"),
415                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
416                                                 Subnet: &network.Subnet{
417                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
418                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
419                                                                 az.azconfig.SubscriptionID,
420                                                                 networkResourceGroup,
421                                                                 az.azconfig.Network,
422                                                                 az.azconfig.Subnet)),
423                                                 },
424                                                 PrivateIPAllocationMethod: network.Dynamic,
425                                         },
426                                 },
427                         },
428                 },
429         }
430         nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
431         if err != nil {
432                 return nil, wrapAzureError(err)
433         }
434
435         var blobname string
436         customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
437         var storageProfile *compute.StorageProfile
438
439         re := regexp.MustCompile(`^http(s?)://`)
440         if re.MatchString(string(imageID)) {
441                 if az.blobcont == nil {
442                         return nil, wrapAzureError(errors.New("Invalid configuration: can't configure unmanaged image URL without StorageAccount and BlobContainer"))
443                 }
444                 blobname = fmt.Sprintf("%s-os.vhd", name)
445                 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
446                         az.azconfig.StorageAccount,
447                         az.azureEnv.StorageEndpointSuffix,
448                         az.azconfig.BlobContainer,
449                         blobname)
450                 az.logger.Warn("using deprecated unmanaged image, see https://doc.arvados.org/ to migrate to managed disks")
451                 storageProfile = &compute.StorageProfile{
452                         OsDisk: &compute.OSDisk{
453                                 OsType:       compute.Linux,
454                                 Name:         to.StringPtr(name + "-os"),
455                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
456                                 Image: &compute.VirtualHardDisk{
457                                         URI: to.StringPtr(string(imageID)),
458                                 },
459                                 Vhd: &compute.VirtualHardDisk{
460                                         URI: &instanceVhd,
461                                 },
462                         },
463                 }
464         } else {
465                 storageProfile = &compute.StorageProfile{
466                         ImageReference: &compute.ImageReference{
467                                 ID: to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID)),
468                         },
469                         OsDisk: &compute.OSDisk{
470                                 OsType:       compute.Linux,
471                                 Name:         to.StringPtr(name + "-os"),
472                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
473                         },
474                 }
475         }
476
477         vmParameters := compute.VirtualMachine{
478                 Location: &az.azconfig.Location,
479                 Tags:     tags,
480                 VirtualMachineProperties: &compute.VirtualMachineProperties{
481                         HardwareProfile: &compute.HardwareProfile{
482                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
483                         },
484                         StorageProfile: storageProfile,
485                         NetworkProfile: &compute.NetworkProfile{
486                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
487                                         compute.NetworkInterfaceReference{
488                                                 ID: nic.ID,
489                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
490                                                         Primary: to.BoolPtr(true),
491                                                 },
492                                         },
493                                 },
494                         },
495                         OsProfile: &compute.OSProfile{
496                                 ComputerName:  &name,
497                                 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
498                                 LinuxConfiguration: &compute.LinuxConfiguration{
499                                         DisablePasswordAuthentication: to.BoolPtr(true),
500                                         SSH: &compute.SSHConfiguration{
501                                                 PublicKeys: &[]compute.SSHPublicKey{
502                                                         {
503                                                                 Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
504                                                                 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
505                                                         },
506                                                 },
507                                         },
508                                 },
509                                 CustomData: &customData,
510                         },
511                 },
512         }
513
514         vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
515         if err != nil {
516                 // Do some cleanup. Otherwise, an unbounded number of new unused nics and
517                 // blobs can pile up during times when VMs can't be created and the
518                 // dispatcher keeps retrying, because the garbage collection in manageBlobs
519                 // and manageNics is only triggered periodically. This is most important
520                 // for nics, because those are subject to a quota.
521                 if blobname != "" {
522                         _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
523                         if delerr != nil {
524                                 az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
525                         }
526                 }
527                 // Leave cleaning up of managed disks to the garbage collection in manageDisks()
528
529                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
530                 if delerr != nil {
531                         az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
532                 }
533
534                 return nil, wrapAzureError(err)
535         }
536
537         return &azureInstance{
538                 provider: az,
539                 nic:      nic,
540                 vm:       vm,
541         }, nil
542 }
543
544 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
545         az.stopWg.Add(1)
546         defer az.stopWg.Done()
547
548         interfaces, err := az.manageNics()
549         if err != nil {
550                 return nil, err
551         }
552
553         result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
554         if err != nil {
555                 return nil, wrapAzureError(err)
556         }
557
558         var instances []cloud.Instance
559         for ; result.NotDone(); err = result.Next() {
560                 if err != nil {
561                         return nil, wrapAzureError(err)
562                 }
563                 instances = append(instances, &azureInstance{
564                         provider: az,
565                         vm:       result.Value(),
566                         nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
567                 })
568         }
569         return instances, nil
570 }
571
572 // manageNics returns a list of Azure network interface resources.
573 // Also performs garbage collection of NICs which have "namePrefix",
574 // are not associated with a virtual machine and have a "created-at"
575 // time more than DeleteDanglingResourcesAfter (to prevent racing and
576 // deleting newly created NICs) in the past are deleted.
577 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
578         az.stopWg.Add(1)
579         defer az.stopWg.Done()
580
581         result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
582         if err != nil {
583                 return nil, wrapAzureError(err)
584         }
585
586         interfaces := make(map[string]network.Interface)
587
588         timestamp := time.Now()
589         for ; result.NotDone(); err = result.Next() {
590                 if err != nil {
591                         az.logger.WithError(err).Warnf("Error listing nics")
592                         return interfaces, nil
593                 }
594                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
595                         if result.Value().VirtualMachine != nil {
596                                 interfaces[*result.Value().ID] = result.Value()
597                         } else {
598                                 if result.Value().Tags["created-at"] != nil {
599                                         createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
600                                         if err == nil {
601                                                 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
602                                                         az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
603                                                         az.deleteNIC <- *result.Value().Name
604                                                 }
605                                         }
606                                 }
607                         }
608                 }
609         }
610         return interfaces, nil
611 }
612
613 // manageBlobs garbage collects blobs (VM disk images) in the
614 // configured storage account container.  It will delete blobs which
615 // have "namePrefix", are "available" (which means they are not
616 // leased to a VM) and haven't been modified for
617 // DeleteDanglingResourcesAfter seconds.
618 func (az *azureInstanceSet) manageBlobs() {
619
620         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
621         timestamp := time.Now()
622
623         for {
624                 response, err := az.blobcont.ListBlobs(page)
625                 if err != nil {
626                         az.logger.WithError(err).Warn("Error listing blobs")
627                         return
628                 }
629                 for _, b := range response.Blobs {
630                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
631                         if b.Properties.BlobType == storage.BlobTypePage &&
632                                 b.Properties.LeaseState == "available" &&
633                                 b.Properties.LeaseStatus == "unlocked" &&
634                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
635
636                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
637                                 az.deleteBlob <- b
638                         }
639                 }
640                 if response.NextMarker != "" {
641                         page.Marker = response.NextMarker
642                 } else {
643                         break
644                 }
645         }
646 }
647
648 // manageDisks garbage collects managed compute disks (VM disk images) in the
649 // configured resource group.  It will delete disks which have "namePrefix",
650 // are "unattached" (which means they are not leased to a VM) and were created
651 // more than DeleteDanglingResourcesAfter seconds ago.  (Azure provides no
652 // modification timestamp on managed disks, there is only a creation timestamp)
653 func (az *azureInstanceSet) manageDisks() {
654
655         re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
656         threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
657
658         response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
659         if err != nil {
660                 az.logger.WithError(err).Warn("Error listing disks")
661                 return
662         }
663
664         for ; response.NotDone(); err = response.Next() {
665                 for _, d := range response.Values() {
666                         if d.DiskProperties.DiskState == compute.Unattached &&
667                                 d.Name != nil && re.MatchString(*d.Name) &&
668                                 d.DiskProperties.TimeCreated.ToTime().Before(threshold) {
669
670                                 az.logger.Printf("Disk %v is unlocked and was created at %+v, will delete", *d.Name, d.DiskProperties.TimeCreated.ToTime())
671                                 az.deleteDisk <- d
672                         }
673                 }
674         }
675 }
676
677 func (az *azureInstanceSet) Stop() {
678         az.stopFunc()
679         az.stopWg.Wait()
680         close(az.deleteNIC)
681         close(az.deleteBlob)
682         close(az.deleteDisk)
683 }
684
685 type azureInstance struct {
686         provider *azureInstanceSet
687         nic      network.Interface
688         vm       compute.VirtualMachine
689 }
690
691 func (ai *azureInstance) ID() cloud.InstanceID {
692         return cloud.InstanceID(*ai.vm.ID)
693 }
694
695 func (ai *azureInstance) String() string {
696         return *ai.vm.Name
697 }
698
699 func (ai *azureInstance) ProviderType() string {
700         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
701 }
702
703 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
704         ai.provider.stopWg.Add(1)
705         defer ai.provider.stopWg.Done()
706
707         tags := map[string]*string{}
708         for k, v := range ai.vm.Tags {
709                 tags[k] = v
710         }
711         for k, v := range newTags {
712                 tags[k] = to.StringPtr(v)
713         }
714
715         vmParameters := compute.VirtualMachine{
716                 Location: &ai.provider.azconfig.Location,
717                 Tags:     tags,
718         }
719         vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
720         if err != nil {
721                 return wrapAzureError(err)
722         }
723         ai.vm = vm
724
725         return nil
726 }
727
728 func (ai *azureInstance) Tags() cloud.InstanceTags {
729         tags := cloud.InstanceTags{}
730         for k, v := range ai.vm.Tags {
731                 tags[k] = *v
732         }
733         return tags
734 }
735
736 func (ai *azureInstance) Destroy() error {
737         ai.provider.stopWg.Add(1)
738         defer ai.provider.stopWg.Done()
739
740         _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
741         return wrapAzureError(err)
742 }
743
744 func (ai *azureInstance) Address() string {
745         if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
746                 return ""
747         } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
748                 return ""
749         } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
750                 return ""
751         } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
752                 return ""
753         } else {
754                 return *addr
755         }
756 }
757
758 func (ai *azureInstance) RemoteUser() string {
759         return ai.provider.azconfig.AdminUsername
760 }
761
762 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
763         return cloud.ErrNotImplemented
764 }