Merge branch '21357-favorites-names'
[arvados.git] / lib / cloud / azure / azure.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package azure
6
7 import (
8         "context"
9         "encoding/base64"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "net/http"
14         "regexp"
15         "strconv"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cloud"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
23         "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
24         storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
25         "github.com/Azure/azure-sdk-for-go/storage"
26         "github.com/Azure/go-autorest/autorest"
27         "github.com/Azure/go-autorest/autorest/azure"
28         "github.com/Azure/go-autorest/autorest/azure/auth"
29         "github.com/Azure/go-autorest/autorest/to"
30         "github.com/jmcvetta/randutil"
31         "github.com/prometheus/client_golang/prometheus"
32         "github.com/sirupsen/logrus"
33         "golang.org/x/crypto/ssh"
34 )
35
// Driver is the azure implementation of the cloud.Driver interface.
// It builds instance sets with newAzureInstanceSet.
var Driver = cloud.DriverFunc(newAzureInstanceSet)
38
// azureInstanceSetConfig is the driver configuration, decoded with
// json.Unmarshal from the config passed to newAzureInstanceSet.
type azureInstanceSetConfig struct {
	// Service principal credentials and the subscription to operate in.
	SubscriptionID string
	ClientID       string
	ClientSecret   string
	TenantID       string
	// Cloud name passed to azure.EnvironmentFromName.
	CloudEnvironment string
	// Resource group for VMs, NICs and the storage account.
	ResourceGroup string
	// Resource group for images and managed disks; ResourceGroup is
	// used when this is empty.
	ImageResourceGroup string
	// Location assigned to new NICs and VMs.
	Location string
	// Virtual network for new NICs. NetworkResourceGroup falls back to
	// ResourceGroup when empty.
	Network              string
	NetworkResourceGroup string
	Subnet               string
	// StorageAccount and BlobContainer must both be set (required for
	// unmanaged image URLs) or both be empty.
	StorageAccount string
	BlobContainer  string
	// SharedImageGalleryName and SharedImageGalleryImageVersion must
	// both be set (boot from a shared image gallery) or both be empty.
	SharedImageGalleryName         string
	SharedImageGalleryImageVersion string
	// Minimum age before dangling NICs, blobs and disks created with
	// this dispatcher's name prefix are garbage collected.
	DeleteDanglingResourcesAfter arvados.Duration
	// Admin login on new VMs; the SSH public key is written to
	// /home/<AdminUsername>/.ssh/authorized_keys.
	AdminUsername string
}
58
// containerWrapper is the subset of the blob container API used by the
// driver. setup assigns it the container reference obtained from the
// storage account's blob service.
type containerWrapper interface {
	// GetBlobReference returns a handle for the named blob.
	GetBlobReference(name string) *storage.Blob
	// ListBlobs returns one page of blobs matching params.
	ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
}
63
// virtualMachinesClientWrapper is the set of VM operations used by the
// driver. virtualMachinesClientImpl implements it on top of
// compute.VirtualMachinesClient and passes errors through
// wrapAzureError.
type virtualMachinesClientWrapper interface {
	createOrUpdate(ctx context.Context,
		resourceGroupName string,
		VMName string,
		parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
	delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
	listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
72
// virtualMachinesClientImpl adapts compute.VirtualMachinesClient to
// virtualMachinesClientWrapper.
type virtualMachinesClientImpl struct {
	inner compute.VirtualMachinesClient
}
76
77 func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
78         resourceGroupName string,
79         VMName string,
80         parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
81
82         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
83         if err != nil {
84                 return compute.VirtualMachine{}, wrapAzureError(err)
85         }
86         future.WaitForCompletionRef(ctx, cl.inner.Client)
87         r, err := future.Result(cl.inner)
88         return r, wrapAzureError(err)
89 }
90
91 func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
92         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
93         if err != nil {
94                 return nil, wrapAzureError(err)
95         }
96         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
97         return future.Response(), wrapAzureError(err)
98 }
99
100 func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
101         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
102         return r, wrapAzureError(err)
103 }
104
// interfacesClientWrapper is the set of network interface operations
// used by the driver. interfacesClientImpl implements it on top of
// network.InterfacesClient and passes errors through wrapAzureError.
type interfacesClientWrapper interface {
	createOrUpdate(ctx context.Context,
		resourceGroupName string,
		networkInterfaceName string,
		parameters network.Interface) (result network.Interface, err error)
	delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
	listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
113
// interfacesClientImpl adapts network.InterfacesClient to
// interfacesClientWrapper.
type interfacesClientImpl struct {
	inner network.InterfacesClient
}
117
118 func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
119         future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
120         if err != nil {
121                 return nil, wrapAzureError(err)
122         }
123         err = future.WaitForCompletionRef(ctx, cl.inner.Client)
124         return future.Response(), wrapAzureError(err)
125 }
126
127 func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
128         resourceGroupName string,
129         networkInterfaceName string,
130         parameters network.Interface) (result network.Interface, err error) {
131
132         future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
133         if err != nil {
134                 return network.Interface{}, wrapAzureError(err)
135         }
136         future.WaitForCompletionRef(ctx, cl.inner.Client)
137         r, err := future.Result(cl.inner)
138         return r, wrapAzureError(err)
139 }
140
141 func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
142         r, err := cl.inner.ListComplete(ctx, resourceGroupName)
143         return r, wrapAzureError(err)
144 }
145
// disksClientWrapper is the set of managed disk operations used by the
// driver. disksClientImpl implements it on top of compute.DisksClient
// and passes errors through wrapAzureError.
type disksClientWrapper interface {
	// listByResourceGroup returns a DiskListPage; callers advance it with Next.
	listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
	// delete starts deleting the named disk and returns the operation's future.
	delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
}
150
// disksClientImpl adapts compute.DisksClient to disksClientWrapper.
type disksClientImpl struct {
	inner compute.DisksClient
}
154
155 func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
156         r, err := cl.inner.ListByResourceGroup(ctx, resourceGroupName)
157         return r, wrapAzureError(err)
158 }
159
160 func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
161         r, err := cl.inner.Delete(ctx, resourceGroupName, diskName)
162         return r, wrapAzureError(err)
163 }
164
// quotaRe matches (case-insensitively) Azure service error codes or
// messages mentioning "exceed", "quota" or "limit"; wrapAzureError uses
// it to classify quota errors.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
166
167 type azureRateLimitError struct {
168         azure.RequestError
169         firstRetry time.Time
170 }
171
172 func (ar *azureRateLimitError) EarliestRetry() time.Time {
173         return ar.firstRetry
174 }
175
176 type azureQuotaError struct {
177         azure.RequestError
178 }
179
180 func (ar *azureQuotaError) IsQuotaError() bool {
181         return true
182 }
183
184 func wrapAzureError(err error) error {
185         de, ok := err.(autorest.DetailedError)
186         if !ok {
187                 return err
188         }
189         rq, ok := de.Original.(*azure.RequestError)
190         if !ok {
191                 return err
192         }
193         if rq.Response == nil {
194                 return err
195         }
196         if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
197                 // API throttling
198                 ra := rq.Response.Header["Retry-After"][0]
199                 earliestRetry, parseErr := http.ParseTime(ra)
200                 if parseErr != nil {
201                         // Could not parse as a timestamp, must be number of seconds
202                         dur, parseErr := strconv.ParseInt(ra, 10, 64)
203                         if parseErr == nil {
204                                 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
205                         } else {
206                                 // Couldn't make sense of retry-after,
207                                 // so set retry to 20 seconds
208                                 earliestRetry = time.Now().Add(20 * time.Second)
209                         }
210                 }
211                 return &azureRateLimitError{*rq, earliestRetry}
212         }
213         if rq.ServiceError == nil {
214                 return err
215         }
216         if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
217                 return &azureQuotaError{*rq}
218         }
219         return err
220 }
221
// azureInstanceSet implements cloud.InstanceSet for Azure. It is built
// by newAzureInstanceSet and initialized by setup.
type azureInstanceSet struct {
	azconfig    azureInstanceSetConfig       // decoded driver configuration
	vmClient    virtualMachinesClientWrapper // VM API wrapper (set in setup)
	netClient   interfacesClientWrapper      // NIC API wrapper (set in setup)
	disksClient disksClientWrapper           // managed disk API wrapper (set in setup)
	// ImageResourceGroup, or ResourceGroup when that is empty.
	imageResourceGroup string
	// Blob container for unmanaged VHDs; nil unless both StorageAccount
	// and BlobContainer are configured.
	blobcont containerWrapper
	azureEnv azure.Environment // resolved from CloudEnvironment
	// NOTE(review): not referenced in the code visible here — confirm
	// whether it is still used.
	interfaces   map[string]network.Interface
	dispatcherID string
	// "compute-<dispatcherID>-"; prefixes names of resources this
	// dispatcher creates and limits garbage collection to them.
	namePrefix string
	// ctx is used for API calls and stops the garbage collection
	// goroutine when cancelled via stopFunc.
	ctx      context.Context
	stopFunc context.CancelFunc
	// Counts in-progress Create/Instances/manageNics calls and the
	// garbage collection goroutine.
	stopWg sync.WaitGroup
	// Unbuffered queues drained by the delete workers started in setup.
	deleteNIC  chan string
	deleteBlob chan storage.Blob
	deleteDisk chan compute.Disk
	logger     logrus.FieldLogger
}
241
242 func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (prv cloud.InstanceSet, err error) {
243         azcfg := azureInstanceSetConfig{}
244         err = json.Unmarshal(config, &azcfg)
245         if err != nil {
246                 return nil, err
247         }
248
249         az := azureInstanceSet{logger: logger}
250         az.ctx, az.stopFunc = context.WithCancel(context.Background())
251         err = az.setup(azcfg, string(dispatcherID))
252         if err != nil {
253                 az.stopFunc()
254                 return nil, err
255         }
256         return &az, nil
257 }
258
259 func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
260         az.azconfig = azcfg
261         vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
262         netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
263         disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
264         storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
265
266         az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
267         if err != nil {
268                 return err
269         }
270
271         authorizer, err := auth.ClientCredentialsConfig{
272                 ClientID:     az.azconfig.ClientID,
273                 ClientSecret: az.azconfig.ClientSecret,
274                 TenantID:     az.azconfig.TenantID,
275                 Resource:     az.azureEnv.ResourceManagerEndpoint,
276                 AADEndpoint:  az.azureEnv.ActiveDirectoryEndpoint,
277         }.Authorizer()
278         if err != nil {
279                 return err
280         }
281
282         vmClient.Authorizer = authorizer
283         netClient.Authorizer = authorizer
284         disksClient.Authorizer = authorizer
285         storageAcctClient.Authorizer = authorizer
286
287         az.vmClient = &virtualMachinesClientImpl{vmClient}
288         az.netClient = &interfacesClientImpl{netClient}
289         az.disksClient = &disksClientImpl{disksClient}
290
291         az.imageResourceGroup = az.azconfig.ImageResourceGroup
292         if az.imageResourceGroup == "" {
293                 az.imageResourceGroup = az.azconfig.ResourceGroup
294         }
295
296         var client storage.Client
297         if az.azconfig.StorageAccount != "" && az.azconfig.BlobContainer != "" {
298                 result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
299                 if err != nil {
300                         az.logger.WithError(err).Warn("Couldn't get account keys")
301                         return err
302                 }
303
304                 key1 := *(*result.Keys)[0].Value
305                 client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
306                 if err != nil {
307                         az.logger.WithError(err).Warn("Couldn't make client")
308                         return err
309                 }
310
311                 blobsvc := client.GetBlobService()
312                 az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
313         } else if az.azconfig.StorageAccount != "" || az.azconfig.BlobContainer != "" {
314                 az.logger.Error("Invalid configuration: StorageAccount and BlobContainer must both be empty or both be set")
315         }
316
317         az.dispatcherID = dispatcherID
318         az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
319
320         go func() {
321                 az.stopWg.Add(1)
322                 defer az.stopWg.Done()
323
324                 tk := time.NewTicker(5 * time.Minute)
325                 for {
326                         select {
327                         case <-az.ctx.Done():
328                                 tk.Stop()
329                                 return
330                         case <-tk.C:
331                                 if az.blobcont != nil {
332                                         az.manageBlobs()
333                                 }
334                                 az.manageDisks()
335                         }
336                 }
337         }()
338
339         az.deleteNIC = make(chan string)
340         az.deleteBlob = make(chan storage.Blob)
341         az.deleteDisk = make(chan compute.Disk)
342
343         for i := 0; i < 4; i++ {
344                 go func() {
345                         for nicname := range az.deleteNIC {
346                                 _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
347                                 if delerr != nil {
348                                         az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
349                                 } else {
350                                         az.logger.Printf("Deleted NIC %v", nicname)
351                                 }
352                         }
353                 }()
354                 go func() {
355                         for blob := range az.deleteBlob {
356                                 err := blob.Delete(nil)
357                                 if err != nil {
358                                         az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
359                                 } else {
360                                         az.logger.Printf("Deleted blob %v", blob.Name)
361                                 }
362                         }
363                 }()
364                 go func() {
365                         for disk := range az.deleteDisk {
366                                 _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
367                                 if err != nil {
368                                         az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
369                                 } else {
370                                         az.logger.Printf("Deleted disk %v", *disk.Name)
371                                 }
372                         }
373                 }()
374         }
375
376         return nil
377 }
378
379 func (az *azureInstanceSet) cleanupNic(nic network.Interface) {
380         _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
381         if delerr != nil {
382                 az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
383         }
384 }
385
386 func (az *azureInstanceSet) Create(
387         instanceType arvados.InstanceType,
388         imageID cloud.ImageID,
389         newTags cloud.InstanceTags,
390         initCommand cloud.InitCommand,
391         publicKey ssh.PublicKey) (cloud.Instance, error) {
392
393         az.stopWg.Add(1)
394         defer az.stopWg.Done()
395
396         if instanceType.AddedScratch > 0 {
397                 return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
398         }
399
400         name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
401         if err != nil {
402                 return nil, err
403         }
404
405         name = az.namePrefix + name
406
407         tags := map[string]*string{}
408         for k, v := range newTags {
409                 tags[k] = to.StringPtr(v)
410         }
411         tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
412
413         networkResourceGroup := az.azconfig.NetworkResourceGroup
414         if networkResourceGroup == "" {
415                 networkResourceGroup = az.azconfig.ResourceGroup
416         }
417
418         nicParameters := network.Interface{
419                 Location: &az.azconfig.Location,
420                 Tags:     tags,
421                 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
422                         IPConfigurations: &[]network.InterfaceIPConfiguration{
423                                 {
424                                         Name: to.StringPtr("ip1"),
425                                         InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
426                                                 Subnet: &network.Subnet{
427                                                         ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
428                                                                 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
429                                                                 az.azconfig.SubscriptionID,
430                                                                 networkResourceGroup,
431                                                                 az.azconfig.Network,
432                                                                 az.azconfig.Subnet)),
433                                                 },
434                                                 PrivateIPAllocationMethod: network.Dynamic,
435                                         },
436                                 },
437                         },
438                 },
439         }
440         nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
441         if err != nil {
442                 return nil, wrapAzureError(err)
443         }
444
445         var blobname string
446         customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
447         var storageProfile *compute.StorageProfile
448
449         re := regexp.MustCompile(`^http(s?)://`)
450         if re.MatchString(string(imageID)) {
451                 if az.blobcont == nil {
452                         az.cleanupNic(nic)
453                         return nil, wrapAzureError(errors.New("Invalid configuration: can't configure unmanaged image URL without StorageAccount and BlobContainer"))
454                 }
455                 blobname = fmt.Sprintf("%s-os.vhd", name)
456                 instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
457                         az.azconfig.StorageAccount,
458                         az.azureEnv.StorageEndpointSuffix,
459                         az.azconfig.BlobContainer,
460                         blobname)
461                 az.logger.Warn("using deprecated unmanaged image, see https://doc.arvados.org/ to migrate to managed disks")
462                 storageProfile = &compute.StorageProfile{
463                         OsDisk: &compute.OSDisk{
464                                 OsType:       compute.Linux,
465                                 Name:         to.StringPtr(name + "-os"),
466                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
467                                 Image: &compute.VirtualHardDisk{
468                                         URI: to.StringPtr(string(imageID)),
469                                 },
470                                 Vhd: &compute.VirtualHardDisk{
471                                         URI: &instanceVhd,
472                                 },
473                         },
474                 }
475         } else {
476                 id := to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID))
477                 if az.azconfig.SharedImageGalleryName != "" && az.azconfig.SharedImageGalleryImageVersion != "" {
478                         id = to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/galleries/" + az.azconfig.SharedImageGalleryName + "/images/" + string(imageID) + "/versions/" + az.azconfig.SharedImageGalleryImageVersion)
479                 } else if az.azconfig.SharedImageGalleryName != "" || az.azconfig.SharedImageGalleryImageVersion != "" {
480                         az.cleanupNic(nic)
481                         return nil, wrapAzureError(errors.New("Invalid configuration: SharedImageGalleryName and SharedImageGalleryImageVersion must both be set or both be empty"))
482                 }
483                 storageProfile = &compute.StorageProfile{
484                         ImageReference: &compute.ImageReference{
485                                 ID: id,
486                         },
487                         OsDisk: &compute.OSDisk{
488                                 OsType:       compute.Linux,
489                                 Name:         to.StringPtr(name + "-os"),
490                                 CreateOption: compute.DiskCreateOptionTypesFromImage,
491                         },
492                 }
493         }
494
495         vmParameters := compute.VirtualMachine{
496                 Location: &az.azconfig.Location,
497                 Tags:     tags,
498                 VirtualMachineProperties: &compute.VirtualMachineProperties{
499                         HardwareProfile: &compute.HardwareProfile{
500                                 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
501                         },
502                         StorageProfile: storageProfile,
503                         NetworkProfile: &compute.NetworkProfile{
504                                 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
505                                         {
506                                                 ID: nic.ID,
507                                                 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
508                                                         Primary: to.BoolPtr(true),
509                                                 },
510                                         },
511                                 },
512                         },
513                         OsProfile: &compute.OSProfile{
514                                 ComputerName:  &name,
515                                 AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
516                                 LinuxConfiguration: &compute.LinuxConfiguration{
517                                         DisablePasswordAuthentication: to.BoolPtr(true),
518                                 },
519                                 CustomData: &customData,
520                         },
521                 },
522         }
523
524         if publicKey != nil {
525                 vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH = &compute.SSHConfiguration{
526                         PublicKeys: &[]compute.SSHPublicKey{
527                                 {
528                                         Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
529                                         KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
530                                 },
531                         },
532                 }
533         }
534
535         if instanceType.Preemptible {
536                 // Setting maxPrice to -1 is the equivalent of paying spot price, up to the
537                 // normal price. This means the node will not be pre-empted for price
538                 // reasons. It may still be pre-empted for capacity reasons though. And
539                 // Azure offers *no* SLA on spot instances.
540                 var maxPrice float64 = -1
541                 vmParameters.VirtualMachineProperties.Priority = compute.Spot
542                 vmParameters.VirtualMachineProperties.EvictionPolicy = compute.Delete
543                 vmParameters.VirtualMachineProperties.BillingProfile = &compute.BillingProfile{MaxPrice: &maxPrice}
544         }
545
546         vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
547         if err != nil {
548                 // Do some cleanup. Otherwise, an unbounded number of new unused nics and
549                 // blobs can pile up during times when VMs can't be created and the
550                 // dispatcher keeps retrying, because the garbage collection in manageBlobs
551                 // and manageNics is only triggered periodically. This is most important
552                 // for nics, because those are subject to a quota.
553                 az.cleanupNic(nic)
554
555                 if blobname != "" {
556                         _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
557                         if delerr != nil {
558                                 az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
559                         }
560                 }
561
562                 // Leave cleaning up of managed disks to the garbage collection in manageDisks()
563
564                 return nil, wrapAzureError(err)
565         }
566
567         return &azureInstance{
568                 provider: az,
569                 nic:      nic,
570                 vm:       vm,
571         }, nil
572 }
573
574 func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
575         az.stopWg.Add(1)
576         defer az.stopWg.Done()
577
578         interfaces, err := az.manageNics()
579         if err != nil {
580                 return nil, err
581         }
582
583         result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
584         if err != nil {
585                 return nil, wrapAzureError(err)
586         }
587
588         var instances []cloud.Instance
589         for ; result.NotDone(); err = result.Next() {
590                 if err != nil {
591                         return nil, wrapAzureError(err)
592                 }
593                 instances = append(instances, &azureInstance{
594                         provider: az,
595                         vm:       result.Value(),
596                         nic:      interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
597                 })
598         }
599         return instances, nil
600 }
601
602 // manageNics returns a list of Azure network interface resources.
603 // Also performs garbage collection of NICs which have "namePrefix",
604 // are not associated with a virtual machine and have a "created-at"
605 // time more than DeleteDanglingResourcesAfter (to prevent racing and
606 // deleting newly created NICs) in the past are deleted.
607 func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
608         az.stopWg.Add(1)
609         defer az.stopWg.Done()
610
611         result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
612         if err != nil {
613                 return nil, wrapAzureError(err)
614         }
615
616         interfaces := make(map[string]network.Interface)
617
618         timestamp := time.Now()
619         for ; result.NotDone(); err = result.Next() {
620                 if err != nil {
621                         az.logger.WithError(err).Warnf("Error listing nics")
622                         return interfaces, nil
623                 }
624                 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
625                         if result.Value().VirtualMachine != nil {
626                                 interfaces[*result.Value().ID] = result.Value()
627                         } else {
628                                 if result.Value().Tags["created-at"] != nil {
629                                         createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
630                                         if err == nil {
631                                                 if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
632                                                         az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
633                                                         az.deleteNIC <- *result.Value().Name
634                                                 }
635                                         }
636                                 }
637                         }
638                 }
639         }
640         return interfaces, nil
641 }
642
643 // manageBlobs garbage collects blobs (VM disk images) in the
644 // configured storage account container.  It will delete blobs which
645 // have "namePrefix", are "available" (which means they are not
646 // leased to a VM) and haven't been modified for
647 // DeleteDanglingResourcesAfter seconds.
648 func (az *azureInstanceSet) manageBlobs() {
649
650         page := storage.ListBlobsParameters{Prefix: az.namePrefix}
651         timestamp := time.Now()
652
653         for {
654                 response, err := az.blobcont.ListBlobs(page)
655                 if err != nil {
656                         az.logger.WithError(err).Warn("Error listing blobs")
657                         return
658                 }
659                 for _, b := range response.Blobs {
660                         age := timestamp.Sub(time.Time(b.Properties.LastModified))
661                         if b.Properties.BlobType == storage.BlobTypePage &&
662                                 b.Properties.LeaseState == "available" &&
663                                 b.Properties.LeaseStatus == "unlocked" &&
664                                 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
665
666                                 az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
667                                 az.deleteBlob <- b
668                         }
669                 }
670                 if response.NextMarker != "" {
671                         page.Marker = response.NextMarker
672                 } else {
673                         break
674                 }
675         }
676 }
677
678 // manageDisks garbage collects managed compute disks (VM disk images) in the
679 // configured resource group.  It will delete disks which have "namePrefix",
680 // are "unattached" (which means they are not leased to a VM) and were created
681 // more than DeleteDanglingResourcesAfter seconds ago.  (Azure provides no
682 // modification timestamp on managed disks, there is only a creation timestamp)
683 func (az *azureInstanceSet) manageDisks() {
684
685         re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
686         threshold := time.Now().Add(-az.azconfig.DeleteDanglingResourcesAfter.Duration())
687
688         response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
689         if err != nil {
690                 az.logger.WithError(err).Warn("Error listing disks")
691                 return
692         }
693
694         for ; response.NotDone(); err = response.Next() {
695                 if err != nil {
696                         az.logger.WithError(err).Warn("Error getting next page of disks")
697                         return
698                 }
699                 for _, d := range response.Values() {
700                         if d.DiskProperties.DiskState == compute.Unattached &&
701                                 d.Name != nil && re.MatchString(*d.Name) &&
702                                 d.DiskProperties.TimeCreated.ToTime().Before(threshold) {
703
704                                 az.logger.Printf("Disk %v is unlocked and was created at %+v, will delete", *d.Name, d.DiskProperties.TimeCreated.ToTime())
705                                 az.deleteDisk <- d
706                         }
707                 }
708         }
709 }
710
// Stop shuts down the instance set. It runs three steps in order:
//  1. Calls stopFunc (presumably cancelling az.ctx; confirm where
//     stopFunc is assigned).
//  2. Waits for every operation registered on stopWg to call Done.
//  3. Closes the deleteNIC, deleteBlob and deleteDisk channels.
//
// The order matters. Sending on a closed channel panics in Go, so the
// channels are closed only after stopWg.Wait() returns. NOTE(review):
// this assumes every goroutine that sends on these channels is tracked
// by stopWg; verify against the code that starts them.
func (az *azureInstanceSet) Stop() {
	az.stopFunc()
	az.stopWg.Wait()
	close(az.deleteNIC)
	close(az.deleteBlob)
	close(az.deleteDisk)
}
716         close(az.deleteDisk)
717 }
718
// azureInstance is a single Azure VM as seen by the driver, together with
// its network interface and the instance set that manages it.
type azureInstance struct {
	// provider is the owning instance set. Its clients, configuration,
	// context and stopWg are used by SetTags, Destroy and RemoteUser.
	provider *azureInstanceSet
	// nic is the VM's network interface. Address reads its private IP.
	nic network.Interface
	// vm is the cached VM record. ID, name, tags and size are read from
	// it, and SetTags replaces it with the updated record.
	vm compute.VirtualMachine
}
724
725 func (ai *azureInstance) ID() cloud.InstanceID {
726         return cloud.InstanceID(*ai.vm.ID)
727 }
728
729 func (ai *azureInstance) String() string {
730         return *ai.vm.Name
731 }
732
733 func (ai *azureInstance) ProviderType() string {
734         return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
735 }
736
737 func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
738         ai.provider.stopWg.Add(1)
739         defer ai.provider.stopWg.Done()
740
741         tags := map[string]*string{}
742         for k, v := range ai.vm.Tags {
743                 tags[k] = v
744         }
745         for k, v := range newTags {
746                 tags[k] = to.StringPtr(v)
747         }
748
749         vmParameters := compute.VirtualMachine{
750                 Location: &ai.provider.azconfig.Location,
751                 Tags:     tags,
752         }
753         vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
754         if err != nil {
755                 return wrapAzureError(err)
756         }
757         ai.vm = vm
758
759         return nil
760 }
761
762 func (ai *azureInstance) Tags() cloud.InstanceTags {
763         tags := cloud.InstanceTags{}
764         for k, v := range ai.vm.Tags {
765                 tags[k] = *v
766         }
767         return tags
768 }
769
770 func (ai *azureInstance) Destroy() error {
771         ai.provider.stopWg.Add(1)
772         defer ai.provider.stopWg.Done()
773
774         _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
775         return wrapAzureError(err)
776 }
777
778 func (ai *azureInstance) Address() string {
779         if iprops := ai.nic.InterfacePropertiesFormat; iprops == nil {
780                 return ""
781         } else if ipconfs := iprops.IPConfigurations; ipconfs == nil || len(*ipconfs) == 0 {
782                 return ""
783         } else if ipconfprops := (*ipconfs)[0].InterfaceIPConfigurationPropertiesFormat; ipconfprops == nil {
784                 return ""
785         } else if addr := ipconfprops.PrivateIPAddress; addr == nil {
786                 return ""
787         } else {
788                 return *addr
789         }
790 }
791
792 func (ai *azureInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
793         return nil
794 }
795
796 func (ai *azureInstance) RemoteUser() string {
797         return ai.provider.azconfig.AdminUsername
798 }
799
800 func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
801         return cloud.ErrNotImplemented
802 }