1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
19 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
20 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
21 "github.com/Azure/azure-sdk-for-go/storage"
22 "github.com/Azure/go-autorest/autorest"
23 "github.com/Azure/go-autorest/autorest/azure"
24 "github.com/Azure/go-autorest/autorest/azure/auth"
25 "github.com/Azure/go-autorest/autorest/to"
26 "github.com/jmcvetta/randutil"
// AzureProviderConfig holds the Azure-specific dispatcher settings. It is
// decoded from JSON using the keys in the struct tags.
type AzureProviderConfig struct {
	// Service principal credentials and the subscription they act on.
	SubscriptionID string `json:"subscription_id"`
	ClientID       string `json:"key"`
	ClientSecret   string `json:"secret"`
	TenantID       string `json:"tenant_id"`

	// Name of the Azure cloud, resolved with azure.EnvironmentFromName.
	CloudEnv string `json:"cloud_environment"`

	// Placement of compute nodes and their network interfaces.
	ResourceGroup string `json:"resource_group"`
	Location      string `json:"region"`
	Network       string `json:"network"`
	Subnet        string `json:"subnet"`

	// Storage account and container that hold the nodes' OS disk VHDs.
	StorageAccount string `json:"storage_account"`
	BlobContainer  string `json:"blob_container"`

	Image string `json:"image"`

	// SSH public key installed in /home/arvados/.ssh/authorized_keys.
	AuthorizedKey string `json:"authorized_key"`

	// Age in seconds after which an unattached NIC or an unleased OS disk
	// blob created by this provider is treated as dangling and deleted.
	DeleteDanglingResourcesAfter float64 `json:"delete_dangling_resources_after"`
}
46 type VirtualMachinesClientWrapper interface {
47 CreateOrUpdate(ctx context.Context,
48 resourceGroupName string,
50 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
51 Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
52 ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
55 type VirtualMachinesClientImpl struct {
56 inner compute.VirtualMachinesClient
59 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
60 resourceGroupName string,
62 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
64 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
66 return compute.VirtualMachine{}, WrapAzureError(err)
68 future.WaitForCompletionRef(ctx, cl.inner.Client)
69 r, err := future.Result(cl.inner)
70 return r, WrapAzureError(err)
73 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
74 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
76 return nil, WrapAzureError(err)
78 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
79 return future.Response(), WrapAzureError(err)
82 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
83 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
84 return r, WrapAzureError(err)
87 type InterfacesClientWrapper interface {
88 CreateOrUpdate(ctx context.Context,
89 resourceGroupName string,
90 networkInterfaceName string,
91 parameters network.Interface) (result network.Interface, err error)
92 Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
93 ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
96 type InterfacesClientImpl struct {
97 inner network.InterfacesClient
100 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
101 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
103 return nil, WrapAzureError(err)
105 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
106 return future.Response(), WrapAzureError(err)
109 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
110 resourceGroupName string,
111 networkInterfaceName string,
112 parameters network.Interface) (result network.Interface, err error) {
114 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
116 return network.Interface{}, WrapAzureError(err)
118 future.WaitForCompletionRef(ctx, cl.inner.Client)
119 r, err := future.Result(cl.inner)
120 return r, WrapAzureError(err)
123 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
124 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
125 return r, WrapAzureError(err)
// quotaRe matches, case-insensitively, Azure service error codes or messages
// mentioning "exceed", "quota" or "limit". WrapAzureError uses it to
// recognize quota errors.
var quotaRe = regexp.MustCompile("(?i:exceed|quota|limit)")
130 type AzureRateLimitError struct {
132 earliestRetry time.Time
135 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
136 return ar.earliestRetry
// AzureQuotaError wraps an Azure request error whose service error code or
// message matched quotaRe. WrapAzureError builds it as AzureQuotaError{*rq}.
// NOTE(review): the struct's field lines are not visible in this excerpt;
// presumably it embeds azure.RequestError, which would supply Error().
139 type AzureQuotaError struct {
// IsQuotaError marks this error as a quota/limit error.
// NOTE(review): the method body is not visible here; presumably it returns
// true — confirm.
143 func (ar *AzureQuotaError) IsQuotaError() bool {
147 func WrapAzureError(err error) error {
148 de, ok := err.(autorest.DetailedError)
152 rq, ok := de.Original.(*azure.RequestError)
156 if rq.Response == nil {
159 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
161 ra := rq.Response.Header["Retry-After"][0]
162 earliestRetry, parseErr := http.ParseTime(ra)
164 // Could not parse as a timestamp, must be number of seconds
165 dur, parseErr := strconv.ParseInt(ra, 10, 64)
167 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
171 // Couldn't make sense of retry-after,
172 // so set retry to 20 seconds
173 earliestRetry = time.Now().Add(20 * time.Second)
175 return &AzureRateLimitError{*rq, earliestRetry}
177 if rq.ServiceError == nil {
180 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
181 return &AzureQuotaError{*rq}
186 type AzureProvider struct {
187 azconfig AzureProviderConfig
188 arvconfig arvados.Cluster
189 vmClient VirtualMachinesClientWrapper
190 netClient InterfacesClientWrapper
191 storageAcctClient storageacct.AccountsClient
192 azureEnv azure.Environment
193 interfaces map[string]network.Interface
// NewAzureProvider builds a Provider from the Azure configuration and the
// Arvados cluster configuration. It initializes an AzureProvider by calling
// setup.
// NOTE(review): the statements after the setup call are not visible in this
// excerpt. Presumably a setup error is returned; otherwise &ap is returned.
196 func NewAzureProvider(azcfg AzureProviderConfig, arvcfg arvados.Cluster) (prv Provider, err error) {
197 ap := AzureProvider{}
198 err = ap.setup(azcfg, arvcfg)
// setup prepares az for use. In order, it:
//   - records the Arvados cluster config;
//   - resolves the Azure cloud environment named by azconfig.CloudEnv;
//   - configures client-credentials (service principal) auth against that
//     environment's Resource Manager and Active Directory endpoints;
//   - installs VM, NIC and storage-account API clients sharing the authorizer.
// NOTE(review): the azcfg assignment and the error checks and returns are on
// lines not visible in this excerpt.
205 func (az *AzureProvider) setup(azcfg AzureProviderConfig, arvcfg arvados.Cluster) (err error) {
207 az.arvconfig = arvcfg
// NOTE(review): these constructors use the SDK's default (public Azure) base
// URI, while the authorizer below targets az.azureEnv. For a non-public
// CloudEnv, API calls would still go to public Azure. Consider resolving
// azureEnv first and using the ...WithBaseURI(az.azureEnv.ResourceManagerEndpoint,
// subscriptionID) constructors.
208 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
209 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
210 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
212 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
217 authorizer, err := auth.ClientCredentialsConfig{
218 ClientID: az.azconfig.ClientID,
219 ClientSecret: az.azconfig.ClientSecret,
220 TenantID: az.azconfig.TenantID,
221 Resource: az.azureEnv.ResourceManagerEndpoint,
222 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
228 vmClient.Authorizer = authorizer
229 netClient.Authorizer = authorizer
230 storageAcctClient.Authorizer = authorizer
232 az.vmClient = &VirtualMachinesClientImpl{vmClient}
233 az.netClient = &InterfacesClientImpl{netClient}
234 az.storageAcctClient = storageAcctClient
// Create provisions one compute node and returns it as an *AzureInstance.
//
// Steps:
//   - Generate a random name: "compute-" plus 15 characters from [a-z0-9].
//   - Create a NIC named <name>-nic on the configured subnet, with a
//     dynamically allocated private IP.
//   - Create a Linux VM of size instanceType.ProviderType attached to that
//     NIC. Its OS disk is built from the image URI passed as imageId.
//     Password login is disabled, and AuthorizedKey is installed for the
//     "arvados" admin user.
//
// Both resources are tagged with arvados-class=crunch-dynamic-compute, the
// cluster ID, and a created-at timestamp (RFC 3339 with nanoseconds). The VM
// is also tagged with the Arvados instance type name.
//
// NOTE(review): if VM creation fails, the NIC created first is not removed
// here. Presumably ManageNics later deletes it as a dangling NIC.
// NOTE(review): instanceTag is not referenced in the visible lines, so
// caller-supplied tags appear to be ignored — confirm.
239 func (az *AzureProvider) Create(ctx context.Context,
240 instanceType arvados.InstanceType,
242 instanceTag []InstanceTag) (Instance, error) {
244 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
249 name = "compute-" + name
250 log.Printf("name is %v", name)
252 timestamp := time.Now().Format(time.RFC3339Nano)
254 nicParameters := network.Interface{
255 Location: &az.azconfig.Location,
256 Tags: map[string]*string{
257 "arvados-class": to.StringPtr("crunch-dynamic-compute"),
258 "arvados-cluster": &az.arvconfig.ClusterID,
// NOTE(review): "×tamp" looks like "&timestamp" mangled by HTML-entity
// decoding of "&times". It does not compile; restore "&timestamp" here and
// in the VM tags below.
259 "created-at": ×tamp,
261 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
262 IPConfigurations: &[]network.InterfaceIPConfiguration{
263 network.InterfaceIPConfiguration{
264 Name: to.StringPtr("ip1"),
265 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
266 Subnet: &network.Subnet{
267 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
268 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
269 az.azconfig.SubscriptionID,
270 az.azconfig.ResourceGroup,
272 az.azconfig.Subnet)),
274 PrivateIPAllocationMethod: network.Dynamic,
280 nic, err := az.netClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
282 return nil, WrapAzureError(err)
285 log.Printf("Created NIC %v", *nic.ID)
// URL of this node's OS disk VHD, e.g. https://<account>.blob.<suffix>/<container>/compute-xxx-os.vhd.
// ManageBlobs later cleans up unleased "compute-" blobs in this container.
287 instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
288 az.azconfig.StorageAccount,
289 az.azureEnv.StorageEndpointSuffix,
290 az.azconfig.BlobContainer,
293 log.Printf("URI instance vhd %v", instance_vhd)
295 vmParameters := compute.VirtualMachine{
296 Location: &az.azconfig.Location,
297 Tags: map[string]*string{
298 "arvados-class": to.StringPtr("crunch-dynamic-compute"),
299 "arvados-instance-type": &instanceType.Name,
300 "arvados-cluster": &az.arvconfig.ClusterID,
301 "created-at": ×tamp,
303 VirtualMachineProperties: &compute.VirtualMachineProperties{
304 HardwareProfile: &compute.HardwareProfile{
305 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
307 StorageProfile: &compute.StorageProfile{
308 OsDisk: &compute.OSDisk{
309 OsType: compute.Linux,
310 Name: to.StringPtr(fmt.Sprintf("%v-os", name)),
311 CreateOption: compute.FromImage,
312 Image: &compute.VirtualHardDisk{
313 URI: to.StringPtr(string(imageId)),
315 Vhd: &compute.VirtualHardDisk{
320 NetworkProfile: &compute.NetworkProfile{
321 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
322 compute.NetworkInterfaceReference{
324 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
325 Primary: to.BoolPtr(true),
330 OsProfile: &compute.OSProfile{
332 AdminUsername: to.StringPtr("arvados"),
333 LinuxConfiguration: &compute.LinuxConfiguration{
334 DisablePasswordAuthentication: to.BoolPtr(true),
335 SSH: &compute.SSHConfiguration{
336 PublicKeys: &[]compute.SSHPublicKey{
337 compute.SSHPublicKey{
338 Path: to.StringPtr("/home/arvados/.ssh/authorized_keys"),
339 KeyData: to.StringPtr(az.azconfig.AuthorizedKey),
344 //CustomData: to.StringPtr(""),
349 vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name, vmParameters)
351 return nil, WrapAzureError(err)
354 return &AzureInstance{
355 instanceType: instanceType,
// Instances lists VMs in the resource group that are tagged
// arvados-class=crunch-dynamic-compute and carry an arvados-instance-type
// tag. Each VM is paired with:
//   - its first NIC, looked up in the map returned by ManageNics (which also
//     queues dangling NICs for deletion);
//   - the instance type named by its tag, looked up in arvconfig.InstanceTypes.
// NOTE(review): NetworkProfile and NetworkInterfaces[0] are dereferenced
// without nil/length checks.
// NOTE(review): a missed map lookup yields a zero-value nic or instance type.
// Azure resource IDs are case-insensitive, so a case mismatch between the
// VM's NIC reference and the NIC listing would also miss — confirm.
362 func (az *AzureProvider) Instances(ctx context.Context) ([]Instance, error) {
363 interfaces, err := az.ManageNics(ctx)
368 result, err := az.vmClient.ListComplete(ctx, az.azconfig.ResourceGroup)
370 return nil, WrapAzureError(err)
373 instances := make([]Instance, 0)
375 for ; result.NotDone(); err = result.Next() {
377 return nil, WrapAzureError(err)
379 if result.Value().Tags["arvados-class"] != nil &&
380 result.Value().Tags["arvados-instance-type"] != nil &&
381 (*result.Value().Tags["arvados-class"]) == "crunch-dynamic-compute" {
382 instances = append(instances, &AzureInstance{
385 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
386 instanceType: az.arvconfig.InstanceTypes[(*result.Value().Tags["arvados-instance-type"])]})
389 return instances, nil
// ManageNics lists the NICs in the resource group. It returns, keyed by NIC
// ID, those tagged arvados-class=crunch-dynamic-compute that are attached to
// a VM. Tagged NICs whose created-at tag is more than
// DeleteDanglingResourcesAfter seconds old are sent on deletechannel. In
// code not fully visible here, presumably only NICs without a VM qualify.
// The workers started in the `for i := 0; i < 4` loop receive from
// deletechannel and delete each NIC by name.
// NOTE(review): deletions use context.Background() rather than ctx.
392 func (az *AzureProvider) ManageNics(ctx context.Context) (map[string]network.Interface, error) {
393 result, err := az.netClient.ListComplete(ctx, az.azconfig.ResourceGroup)
395 return nil, WrapAzureError(err)
398 interfaces := make(map[string]network.Interface)
400 timestamp := time.Now()
401 wg := sync.WaitGroup{}
402 deletechannel := make(chan string, 20)
407 for i := 0; i < 4; i += 1 {
410 nicname, ok := <-deletechannel
414 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
416 log.Printf("Error deleting %v: %v", nicname, delerr)
418 log.Printf("Deleted %v", nicname)
425 for ; result.NotDone(); err = result.Next() {
// NOTE(review): on a listing error this returns WrapAzureError(nil), i.e. a
// nil error. Callers therefore cannot tell the map is partial. It should
// presumably be WrapAzureError(err).
427 log.Printf("Error listing nics: %v", err)
428 return interfaces, WrapAzureError(nil)
430 if result.Value().Tags["arvados-class"] != nil &&
431 (*result.Value().Tags["arvados-class"]) == "crunch-dynamic-compute" {
433 if result.Value().VirtualMachine != nil {
434 interfaces[*result.Value().ID] = result.Value()
437 if result.Value().Tags["created-at"] != nil {
438 created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
440 //log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
441 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
442 log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
444 deletechannel <- *result.Value().Name
451 return interfaces, nil
// ManageBlobs cleans up dangling OS-disk blobs. It connects to the configured
// storage account using the account's first access key, then lists the blobs
// in BlobContainer whose names start with "compute-", following NextMarker
// across pages. Selected blobs are deleted by the workers started in the
// `for i := 0; i < 4` loop. A blob is selected when it is:
//   - a page blob;
//   - unleased (LeaseState "available", LeaseStatus "unlocked");
//   - unmodified for more than DeleteDanglingResourcesAfter seconds.
// Errors are logged, not returned.
// NOTE(review): (*result.Keys)[0] assumes ListKeys returned at least one key.
// NOTE(review): blob.Delete(nil) does not observe ctx.
454 func (az *AzureProvider) ManageBlobs(ctx context.Context) {
455 result, err := az.storageAcctClient.ListKeys(ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
457 log.Printf("Couldn't get account keys %v", err)
461 key1 := *(*result.Keys)[0].Value
462 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
464 log.Printf("Couldn't make client %v", err)
468 blobsvc := client.GetBlobService()
469 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
471 timestamp := time.Now()
472 wg := sync.WaitGroup{}
473 deletechannel := make(chan storage.Blob, 20)
478 for i := 0; i < 4; i += 1 {
481 blob, ok := <-deletechannel
485 err := blob.Delete(nil)
487 log.Printf("error deleting %v: %v", blob.Name, err)
489 log.Printf("Deleted blob %v", blob.Name)
496 page := storage.ListBlobsParameters{Prefix: "compute-"}
499 response, err := blobcont.ListBlobs(page)
501 log.Printf("Error listing blobs %v", err)
504 for _, b := range response.Blobs {
505 age := timestamp.Sub(time.Time(b.Properties.LastModified))
506 if b.Properties.BlobType == storage.BlobTypePage &&
507 b.Properties.LeaseState == "available" &&
508 b.Properties.LeaseStatus == "unlocked" &&
509 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
511 log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
516 if response.NextMarker != "" {
517 page.Marker = response.NextMarker
524 type AzureInstance struct {
525 instanceType arvados.InstanceType
526 provider *AzureProvider
527 nic network.Interface
528 vm compute.VirtualMachine
// String satisfies fmt.Stringer.
// NOTE(review): the body is not visible in this excerpt.
531 func (ai *AzureInstance) String() string {
535 func (ai *AzureInstance) InstanceType() arvados.InstanceType {
536 return ai.instanceType
// SetTags is the Instance hook for updating the VM's tags.
// NOTE(review): the parameter is unnamed, so the body cannot read the tags
// it is given. This looks like a stub — confirm (the body is not visible).
539 func (ai *AzureInstance) SetTags([]InstanceTag) error {
// GetTags is the Instance hook for reading the VM's tags.
// NOTE(review): the body is not visible in this excerpt. ai.vm.Tags holds
// the tags set in Create if a real implementation is needed — confirm.
543 func (ai *AzureInstance) GetTags() ([]InstanceTag, error) {
547 func (ai *AzureInstance) Destroy(ctx context.Context) error {
548 _, err := ai.provider.vmClient.Delete(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
549 return WrapAzureError(err)
552 func (ai *AzureInstance) Address() string {
553 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress