1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
20 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
21 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
22 "github.com/Azure/azure-sdk-for-go/storage"
23 "github.com/Azure/go-autorest/autorest"
24 "github.com/Azure/go-autorest/autorest/azure"
25 "github.com/Azure/go-autorest/autorest/azure/auth"
26 "github.com/Azure/go-autorest/autorest/to"
27 "github.com/jmcvetta/randutil"
// AzureProviderConfig holds the Azure-specific settings for AzureProvider.
// It is decoded from JSON using the keys given in the struct tags.
type AzureProviderConfig struct {
	// SubscriptionID is the Azure subscription that owns every resource.
	SubscriptionID string `json:"subscription_id"`

	// ClientID and ClientSecret (JSON keys "key" and "secret") are the
	// service principal credentials, authenticated in TenantID.
	ClientID     string `json:"key"`
	ClientSecret string `json:"secret"`
	TenantID     string `json:"tenant_id"`

	// CloudEnv names the Azure cloud and is passed to
	// azure.EnvironmentFromName.
	CloudEnv string `json:"cloud_environment"`

	// ResourceGroup and Location (JSON key "region") say where VMs and
	// NICs are created.
	ResourceGroup string `json:"resource_group"`
	Location      string `json:"region"`

	// Network and Subnet identify the virtual network subnet that new NICs
	// are attached to.
	Network string `json:"network"`
	Subnet  string `json:"subnet"`

	// StorageAccount and BlobContainer locate the instances' OS disk VHDs.
	StorageAccount string `json:"storage_account"`
	BlobContainer  string `json:"blob_container"`

	// Image presumably identifies the VM image; it is not read by the code
	// visible alongside this type — TODO confirm.
	Image string `json:"image"`

	// AuthorizedKey is the SSH public key installed for the "arvados" user.
	AuthorizedKey string `json:"authorized_key"`

	// DeleteDanglingResourcesAfter is the age, in seconds, after which an
	// unattached NIC or an unleased disk blob is deleted.
	DeleteDanglingResourcesAfter float64 `json:"delete_dangling_resources_after"`
}
47 type VirtualMachinesClientWrapper interface {
48 CreateOrUpdate(ctx context.Context,
49 resourceGroupName string,
51 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
52 Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
53 ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
56 type VirtualMachinesClientImpl struct {
57 inner compute.VirtualMachinesClient
60 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
61 resourceGroupName string,
63 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
65 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
67 return compute.VirtualMachine{}, WrapAzureError(err)
69 future.WaitForCompletionRef(ctx, cl.inner.Client)
70 r, err := future.Result(cl.inner)
71 return r, WrapAzureError(err)
74 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
75 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
77 return nil, WrapAzureError(err)
79 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
80 return future.Response(), WrapAzureError(err)
83 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
84 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
85 return r, WrapAzureError(err)
88 type InterfacesClientWrapper interface {
89 CreateOrUpdate(ctx context.Context,
90 resourceGroupName string,
91 networkInterfaceName string,
92 parameters network.Interface) (result network.Interface, err error)
93 Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
94 ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
97 type InterfacesClientImpl struct {
98 inner network.InterfacesClient
101 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
102 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
104 return nil, WrapAzureError(err)
106 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
107 return future.Response(), WrapAzureError(err)
110 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
111 resourceGroupName string,
112 networkInterfaceName string,
113 parameters network.Interface) (result network.Interface, err error) {
115 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
117 return network.Interface{}, WrapAzureError(err)
119 future.WaitForCompletionRef(ctx, cl.inner.Client)
120 r, err := future.Result(cl.inner)
121 return r, WrapAzureError(err)
124 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
125 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
126 return r, WrapAzureError(err)
// quotaRe matches Azure service error codes and messages that mention
// "exceed", "quota" or "limit", in any letter case. WrapAzureError uses it
// to classify an error as an *AzureQuotaError.
var (
	quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
)
131 type AzureRateLimitError struct {
133 earliestRetry time.Time
136 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
137 return ar.earliestRetry
140 type AzureQuotaError struct {
144 func (ar *AzureQuotaError) IsQuotaError() bool {
148 func WrapAzureError(err error) error {
149 de, ok := err.(autorest.DetailedError)
153 rq, ok := de.Original.(*azure.RequestError)
157 if rq.Response == nil {
160 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
162 ra := rq.Response.Header["Retry-After"][0]
163 earliestRetry, parseErr := http.ParseTime(ra)
165 // Could not parse as a timestamp, must be number of seconds
166 dur, parseErr := strconv.ParseInt(ra, 10, 64)
168 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
172 // Couldn't make sense of retry-after,
173 // so set retry to 20 seconds
174 earliestRetry = time.Now().Add(20 * time.Second)
176 return &AzureRateLimitError{*rq, earliestRetry}
178 if rq.ServiceError == nil {
181 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
182 return &AzureQuotaError{*rq}
187 type AzureProvider struct {
188 azconfig AzureProviderConfig
189 arvconfig arvados.Cluster
190 vmClient VirtualMachinesClientWrapper
191 netClient InterfacesClientWrapper
192 storageAcctClient storageacct.AccountsClient
193 azureEnv azure.Environment
194 interfaces map[string]network.Interface
199 func NewAzureProvider(azcfg AzureProviderConfig, arvcfg arvados.Cluster, dispatcherID string) (prv Provider, err error) {
200 ap := AzureProvider{}
201 err = ap.setup(azcfg, arvcfg, dispatcherID)
208 func (az *AzureProvider) setup(azcfg AzureProviderConfig, arvcfg arvados.Cluster, dispatcherID string) (err error) {
210 az.arvconfig = arvcfg
211 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
212 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
213 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
215 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
220 authorizer, err := auth.ClientCredentialsConfig{
221 ClientID: az.azconfig.ClientID,
222 ClientSecret: az.azconfig.ClientSecret,
223 TenantID: az.azconfig.TenantID,
224 Resource: az.azureEnv.ResourceManagerEndpoint,
225 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
231 vmClient.Authorizer = authorizer
232 netClient.Authorizer = authorizer
233 storageAcctClient.Authorizer = authorizer
235 az.vmClient = &VirtualMachinesClientImpl{vmClient}
236 az.netClient = &InterfacesClientImpl{netClient}
237 az.storageAcctClient = storageAcctClient
239 az.dispatcherID = dispatcherID
240 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
245 func (az *AzureProvider) Create(ctx context.Context,
246 instanceType arvados.InstanceType,
248 newTags InstanceTags) (Instance, error) {
250 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
255 name = az.namePrefix + name
256 log.Printf("name is %v", name)
258 timestamp := time.Now().Format(time.RFC3339Nano)
260 tags := make(map[string]*string)
261 tags["created-at"] = ×tamp
262 for k, v := range newTags {
263 tags["dispatch-"+k] = &v
266 nicParameters := network.Interface{
267 Location: &az.azconfig.Location,
269 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
270 IPConfigurations: &[]network.InterfaceIPConfiguration{
271 network.InterfaceIPConfiguration{
272 Name: to.StringPtr("ip1"),
273 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
274 Subnet: &network.Subnet{
275 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
276 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
277 az.azconfig.SubscriptionID,
278 az.azconfig.ResourceGroup,
280 az.azconfig.Subnet)),
282 PrivateIPAllocationMethod: network.Dynamic,
288 nic, err := az.netClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
290 return nil, WrapAzureError(err)
293 log.Printf("Created NIC %v", *nic.ID)
295 instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
296 az.azconfig.StorageAccount,
297 az.azureEnv.StorageEndpointSuffix,
298 az.azconfig.BlobContainer,
301 log.Printf("URI instance vhd %v", instance_vhd)
303 tags["arvados-instance-type"] = &instanceType.Name
305 vmParameters := compute.VirtualMachine{
306 Location: &az.azconfig.Location,
308 VirtualMachineProperties: &compute.VirtualMachineProperties{
309 HardwareProfile: &compute.HardwareProfile{
310 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
312 StorageProfile: &compute.StorageProfile{
313 OsDisk: &compute.OSDisk{
314 OsType: compute.Linux,
315 Name: to.StringPtr(name + "-os"),
316 CreateOption: compute.FromImage,
317 Image: &compute.VirtualHardDisk{
318 URI: to.StringPtr(string(imageId)),
320 Vhd: &compute.VirtualHardDisk{
325 NetworkProfile: &compute.NetworkProfile{
326 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
327 compute.NetworkInterfaceReference{
329 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
330 Primary: to.BoolPtr(true),
335 OsProfile: &compute.OSProfile{
337 AdminUsername: to.StringPtr("arvados"),
338 LinuxConfiguration: &compute.LinuxConfiguration{
339 DisablePasswordAuthentication: to.BoolPtr(true),
340 SSH: &compute.SSHConfiguration{
341 PublicKeys: &[]compute.SSHPublicKey{
342 compute.SSHPublicKey{
343 Path: to.StringPtr("/home/arvados/.ssh/authorized_keys"),
344 KeyData: to.StringPtr(az.azconfig.AuthorizedKey),
349 //CustomData: to.StringPtr(""),
354 vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name, vmParameters)
356 return nil, WrapAzureError(err)
359 return &AzureInstance{
360 instanceType: instanceType,
367 func (az *AzureProvider) Instances(ctx context.Context) ([]Instance, error) {
368 interfaces, err := az.ManageNics(ctx)
373 result, err := az.vmClient.ListComplete(ctx, az.azconfig.ResourceGroup)
375 return nil, WrapAzureError(err)
378 instances := make([]Instance, 0)
380 for ; result.NotDone(); err = result.Next() {
382 return nil, WrapAzureError(err)
384 if strings.HasPrefix(*result.Value().Name, az.namePrefix) &&
385 result.Value().Tags["arvados-instance-type"] != nil {
386 instances = append(instances, &AzureInstance{
389 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
390 instanceType: az.arvconfig.InstanceTypes[(*result.Value().Tags["arvados-instance-type"])]})
393 return instances, nil
// ManageNics lists the network interfaces in the configured resource group.
// It returns those whose names start with namePrefix and that are attached
// to a VM, keyed by NIC resource ID. Instances uses this map to find each
// VM's NIC.
//
// It also queues for deletion every prefixed NIC that is not attached to a
// VM and whose "created-at" tag (RFC3339Nano, as written by Create) is more
// than DeleteDanglingResourcesAfter seconds old. Such NICs are left behind,
// for example, when Create fails after creating the NIC. Deletions use
// context.Background() rather than ctx.
//
// If iterating the listing fails, the error is logged and the interfaces
// collected so far are returned with a nil error.
396 func (az *AzureProvider) ManageNics(ctx context.Context) (map[string]network.Interface, error) {
397 result, err := az.netClient.ListComplete(ctx, az.azconfig.ResourceGroup)
399 return nil, WrapAzureError(err)
402 interfaces := make(map[string]network.Interface)
// Single reference time for every age comparison in this pass.
404 timestamp := time.Now()
405 wg := sync.WaitGroup{}
406 deletechannel := make(chan string, 20)
// Presumably starts four deletion workers (the goroutine launch lines are
// not visible in this excerpt). Each receives NIC names from deletechannel
// until the receive reports !ok.
411 for i := 0; i < 4; i += 1 {
414 nicname, ok := <-deletechannel
// NOTE(review): with context.Background(), cancelling ctx does not stop
// in-flight deletions — confirm this is intended.
418 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
420 log.Printf("Error deleting %v: %v", nicname, delerr)
422 log.Printf("Deleted %v", nicname)
429 for ; result.NotDone(); err = result.Next() {
431 log.Printf("Error listing nics: %v", err)
// NOTE(review): no close of deletechannel is visible before this early
// return — confirm the deletion workers are not left blocked.
432 return interfaces, nil
434 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
435 if result.Value().VirtualMachine != nil {
436 interfaces[*result.Value().ID] = result.Value()
// Presumably the branch for NICs not attached to any VM (dangling
// candidates); the else line is not visible here.
438 if result.Value().Tags["created-at"] != nil {
439 created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
441 //log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
// DeleteDanglingResourcesAfter is compared in seconds.
442 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
443 log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
445 deletechannel <- *result.Value().Name
452 return interfaces, nil
// ManageBlobs deletes stale page blobs from the configured storage container.
// Create names each instance's OS disk "<name>-os.vhd" in BlobContainer, and
// Destroy deletes only the VM, so these are presumably leftover OS disks —
// TODO confirm.
//
// A blob is queued for deletion when all of these hold:
//   - its name starts with namePrefix;
//   - it is a page blob;
//   - its lease state is "available" and its lease status is "unlocked"
//     (presumably meaning no VM is using it);
//   - it was last modified more than DeleteDanglingResourcesAfter seconds ago.
//
// Listing is paged via NextMarker. Errors are logged; the function returns
// nothing.
455 func (az *AzureProvider) ManageBlobs(ctx context.Context) {
// Authenticate to blob storage with the storage account's first access key.
456 result, err := az.storageAcctClient.ListKeys(ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
458 log.Printf("Couldn't get account keys %v", err)
// NOTE(review): this panics if result.Keys is nil or empty — consider
// checking before indexing.
462 key1 := *(*result.Keys)[0].Value
463 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
465 log.Printf("Couldn't make client %v", err)
469 blobsvc := client.GetBlobService()
470 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
// Single reference time for every age comparison in this pass.
472 timestamp := time.Now()
473 wg := sync.WaitGroup{}
474 deletechannel := make(chan storage.Blob, 20)
// Presumably starts four deletion workers (the goroutine launch lines are
// not visible in this excerpt). Each receives blobs from deletechannel
// until the receive reports !ok.
479 for i := 0; i < 4; i += 1 {
482 blob, ok := <-deletechannel
// Deleted with nil options; ctx is not passed to blob operations.
486 err := blob.Delete(nil)
488 log.Printf("error deleting %v: %v", blob.Name, err)
490 log.Printf("Deleted blob %v", blob.Name)
// List only blobs whose names start with this dispatcher's prefix.
497 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
500 response, err := blobcont.ListBlobs(page)
502 log.Printf("Error listing blobs %v", err)
505 for _, b := range response.Blobs {
506 age := timestamp.Sub(time.Time(b.Properties.LastModified))
507 if b.Properties.BlobType == storage.BlobTypePage &&
508 b.Properties.LeaseState == "available" &&
509 b.Properties.LeaseStatus == "unlocked" &&
510 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
512 log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
// Continue with the next page of results, if any.
517 if response.NextMarker != "" {
518 page.Marker = response.NextMarker
525 type AzureInstance struct {
526 instanceType arvados.InstanceType
527 provider *AzureProvider
528 nic network.Interface
529 vm compute.VirtualMachine
// String implements fmt.Stringer for AzureInstance. Its body is not visible
// in this excerpt; presumably it returns the VM name — TODO confirm.
532 func (ai *AzureInstance) String() string {
536 func (ai *AzureInstance) InstanceType() arvados.InstanceType {
537 return ai.instanceType
540 func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) error {
541 tags := make(map[string]*string)
543 for k, v := range ai.vm.Tags {
544 if !strings.HasPrefix(k, "dispatch-") {
548 for k, v := range newTags {
549 tags["dispatch-"+k] = &v
552 vmParameters := compute.VirtualMachine{
553 Location: &ai.provider.azconfig.Location,
556 vm, err := ai.provider.vmClient.CreateOrUpdate(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
558 return WrapAzureError(err)
// GetTags returns the instance's tags whose keys start with "dispatch-". Create
// and SetTags add that prefix to caller-supplied tags.
//
// It reads the cached ai.vm.Tags; no Azure API call, and no use of ctx, is
// visible in these lines. How keys and values are copied into the result is
// not visible in this excerpt — presumably the prefix is stripped, TODO
// confirm.
565 func (ai *AzureInstance) GetTags(ctx context.Context) (InstanceTags, error) {
566 tags := make(map[string]string)
568 for k, v := range ai.vm.Tags {
569 if strings.HasPrefix(k, "dispatch-") {
577 func (ai *AzureInstance) Destroy(ctx context.Context) error {
578 _, err := ai.provider.vmClient.Delete(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
579 return WrapAzureError(err)
582 func (ai *AzureInstance) Address() string {
583 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress