1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
21 "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
22 storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
23 "github.com/Azure/azure-sdk-for-go/storage"
24 "github.com/Azure/go-autorest/autorest"
25 "github.com/Azure/go-autorest/autorest/azure"
26 "github.com/Azure/go-autorest/autorest/azure/auth"
27 "github.com/Azure/go-autorest/autorest/to"
28 "github.com/jmcvetta/randutil"
29 "golang.org/x/crypto/ssh"
// AzureInstanceSetConfig holds the Azure driver settings used by
// AzureInstanceSet. NewAzureInstanceSet fills it from a generic config map
// with mapstructure.Decode.
// NOTE(review): by default mapstructure matches map keys against field names
// (or `mapstructure` tags) and ignores the `json` tags below, so a key such as
// "subscription_id" may not populate SubscriptionID — confirm how callers key
// the config map.
32 type AzureInstanceSetConfig struct {
33 SubscriptionID string `json:"subscription_id"`
// ClientID, ClientSecret and TenantID are the client credentials used in
// setup to obtain an authorizer for the Azure API clients.
34 ClientID string `json:"key"`
35 ClientSecret string `json:"secret"`
36 TenantID string `json:"tenant_id"`
// CloudEnv names the Azure cloud environment; setup resolves it with
// azure.EnvironmentFromName.
37 CloudEnv string `json:"cloud_environment"`
38 ResourceGroup string `json:"resource_group"`
// Location is the Azure region given to new NICs and VMs.
39 Location string `json:"region"`
// Network and Subnet presumably name the virtual network and subnet that new
// NICs join (Subnet is visibly used in Create's subnet resource ID).
40 Network string `json:"network"`
41 Subnet string `json:"subnet"`
// StorageAccount and BlobContainer locate the VMs' OS disk VHD blobs.
42 StorageAccount string `json:"storage_account"`
43 BlobContainer string `json:"blob_container"`
// NOTE(review): Image is not referenced in the visible code (Create uses an
// imageId value instead) — confirm where it is used.
44 Image string `json:"image"`
// DeleteDanglingResourcesAfter is an age in seconds: ManageNics and
// ManageBlobs delete leftover NICs and VHD blobs older than this.
45 DeleteDanglingResourcesAfter float64 `json:"delete_dangling_resources_after"`
// VirtualMachinesClientWrapper is the subset of the Azure compute
// VirtualMachinesClient API that AzureInstanceSet uses (its vmClient field);
// VirtualMachinesClientImpl is the implementation backed by the SDK client.
// NOTE(review): in this copy a VM-name parameter line appears to be missing
// between resourceGroupName and parameters in CreateOrUpdate (callers and
// VirtualMachinesClientImpl pass a VM name there) — confirm.
48 type VirtualMachinesClientWrapper interface {
49 CreateOrUpdate(ctx context.Context,
50 resourceGroupName string,
52 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
// Delete removes the named VM and returns the final HTTP response.
53 Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
// ListComplete returns an iterator over all VMs in the resource group.
54 ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
// VirtualMachinesClientImpl adapts the SDK's compute.VirtualMachinesClient to
// VirtualMachinesClientWrapper: its methods wait for long-running operations
// to finish and pass errors through WrapAzureError.
57 type VirtualMachinesClientImpl struct {
58 inner compute.VirtualMachinesClient
61 func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
62 resourceGroupName string,
64 parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
66 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
68 return compute.VirtualMachine{}, WrapAzureError(err)
70 future.WaitForCompletionRef(ctx, cl.inner.Client)
71 r, err := future.Result(cl.inner)
72 return r, WrapAzureError(err)
75 func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
76 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
78 return nil, WrapAzureError(err)
80 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
81 return future.Response(), WrapAzureError(err)
84 func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
85 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
86 return r, WrapAzureError(err)
// InterfacesClientWrapper is the subset of the Azure network InterfacesClient
// API that AzureInstanceSet uses (its netClient field); InterfacesClientImpl
// is the implementation backed by the SDK client.
89 type InterfacesClientWrapper interface {
// CreateOrUpdate creates or updates a NIC and returns the resulting resource.
90 CreateOrUpdate(ctx context.Context,
91 resourceGroupName string,
92 networkInterfaceName string,
93 parameters network.Interface) (result network.Interface, err error)
// Delete removes a NIC and returns the final HTTP response.
94 Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
// ListComplete returns an iterator over all NICs in the resource group.
95 ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
// InterfacesClientImpl adapts the SDK's network.InterfacesClient to
// InterfacesClientWrapper: its methods wait for long-running operations to
// finish and pass errors through WrapAzureError.
98 type InterfacesClientImpl struct {
99 inner network.InterfacesClient
102 func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
103 future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
105 return nil, WrapAzureError(err)
107 err = future.WaitForCompletionRef(ctx, cl.inner.Client)
108 return future.Response(), WrapAzureError(err)
111 func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
112 resourceGroupName string,
113 networkInterfaceName string,
114 parameters network.Interface) (result network.Interface, err error) {
116 future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
118 return network.Interface{}, WrapAzureError(err)
120 future.WaitForCompletionRef(ctx, cl.inner.Client)
121 r, err := future.Result(cl.inner)
122 return r, WrapAzureError(err)
125 func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
126 r, err := cl.inner.ListComplete(ctx, resourceGroupName)
127 return r, WrapAzureError(err)
// quotaRe recognises Azure service error codes and messages that indicate a
// quota or resource limit was hit ("exceed", "quota" or "limit", in any case).
var (
	quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
)
// AzureRateLimitError is returned by WrapAzureError when Azure throttles a
// request (HTTP 429 or a Retry-After header on the response).
// NOTE(review): the first field is not visible in this copy; WrapAzureError
// fills it positionally with the azure.RequestError value.
132 type AzureRateLimitError struct {
// earliestRetry is when the throttled request may be retried.
134 earliestRetry time.Time
137 func (ar *AzureRateLimitError) EarliestRetry() time.Time {
138 return ar.earliestRetry
// AzureQuotaError is returned by WrapAzureError when an Azure service error
// code or message matches quotaRe.
// NOTE(review): the struct's field (filled positionally with the
// azure.RequestError value in WrapAzureError) is not visible in this copy.
141 type AzureQuotaError struct {
// IsQuotaError marks AzureQuotaError as a quota error; presumably callers
// detect it with an interface assertion on this method — confirm.
// NOTE(review): the method body is not visible in this copy.
145 func (ar *AzureQuotaError) IsQuotaError() bool {
149 func WrapAzureError(err error) error {
150 de, ok := err.(autorest.DetailedError)
154 rq, ok := de.Original.(*azure.RequestError)
158 if rq.Response == nil {
161 if rq.Response.StatusCode == 429 || len(rq.Response.Header["Retry-After"]) >= 1 {
163 ra := rq.Response.Header["Retry-After"][0]
164 earliestRetry, parseErr := http.ParseTime(ra)
166 // Could not parse as a timestamp, must be number of seconds
167 dur, parseErr := strconv.ParseInt(ra, 10, 64)
169 earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
173 // Couldn't make sense of retry-after,
174 // so set retry to 20 seconds
175 earliestRetry = time.Now().Add(20 * time.Second)
177 return &AzureRateLimitError{*rq, earliestRetry}
179 if rq.ServiceError == nil {
182 if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
183 return &AzureQuotaError{*rq}
// AzureInstanceSet manages this dispatcher's compute VMs in one Azure
// resource group. It creates, lists and tags VMs whose names start with
// namePrefix, and cleans up dangling NICs and VHD blobs (ManageNics,
// ManageBlobs).
// NOTE(review): the dispatcherID and namePrefix fields assigned in setup are
// not visible in this copy of the struct.
188 type AzureInstanceSet struct {
189 azconfig AzureInstanceSetConfig
190 vmClient VirtualMachinesClientWrapper
191 netClient InterfacesClientWrapper
192 storageAcctClient storageacct.AccountsClient
// azureEnv is the resolved cloud environment (endpoints, storage suffix).
193 azureEnv azure.Environment
// NOTE(review): interfaces is not read or written by any method visible in
// this copy; ManageNics returns its own map instead.
194 interfaces map[string]network.Interface
// NewAzureInstanceSet returns an Azure-backed InstanceProvider. It decodes
// the driver config map into an AzureInstanceSetConfig with mapstructure and
// passes it to setup. setup also receives dispatcherID, from which it
// derives the "compute-<dispatcherID>-" resource name prefix.
// NOTE(review): the error checks and the final return are not visible in
// this copy.
199 func NewAzureInstanceSet(config map[string]interface{}, dispatcherID string) (prv InstanceProvider, err error) {
200 azcfg := AzureInstanceSetConfig{}
201 err = mapstructure.Decode(config, &azcfg)
205 ap := AzureInstanceSet{}
206 err = ap.setup(azcfg, dispatcherID)
// setup initialises the instance set. It:
//   - creates compute, network and storage-account clients for the
//     configured subscription;
//   - resolves the cloud environment named by CloudEnv;
//   - authorises all three clients with a client-credentials authorizer built
//     from that environment's Resource Manager and Active Directory
//     endpoints;
//   - records the resource name prefix "compute-<dispatcherID>-".
//
// NOTE(review): the lines below read az.azconfig rather than the azcfg
// parameter. The assignment of azcfg to az.azconfig is not visible in this
// copy — confirm it happens before the clients are created.
213 func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
215 vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
216 netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
217 storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
219 az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnv)
224 authorizer, err := auth.ClientCredentialsConfig{
225 ClientID: az.azconfig.ClientID,
226 ClientSecret: az.azconfig.ClientSecret,
227 TenantID: az.azconfig.TenantID,
228 Resource: az.azureEnv.ResourceManagerEndpoint,
229 AADEndpoint: az.azureEnv.ActiveDirectoryEndpoint,
235 vmClient.Authorizer = authorizer
236 netClient.Authorizer = authorizer
237 storageAcctClient.Authorizer = authorizer
// Wrap the SDK clients so AzureInstanceSet talks to them through the
// *ClientWrapper interfaces.
239 az.vmClient = &VirtualMachinesClientImpl{vmClient}
240 az.netClient = &InterfacesClientImpl{netClient}
241 az.storageAcctClient = storageAcctClient
243 az.dispatcherID = dispatcherID
// Every VM, NIC and VHD this dispatcher creates starts with this prefix;
// Instances, ManageNics and ManageBlobs filter on it.
244 az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
// Create starts a new Linux VM of the given instance type and returns it as
// an Instance. It first creates a NIC with a dynamic private IP in the
// configured subnet, then the VM.
//
// The VM is named namePrefix plus 15 random [a-z0-9] characters.
// newTags must contain a non-empty "node-token". The VM's custom data is a
// shell script that writes "<vm name>-<node-token>" to
// /home/crunch/node-token.
//
// The OS disk is created from imageId and stored as an "-os.vhd" blob in the
// configured storage account and container. Password login is disabled, and
// publicKey is installed in /home/crunch/.ssh/authorized_keys for the
// "crunch" admin user.
//
// Azure API errors are passed through WrapAzureError.
//
// NOTE(review): if VM creation fails, the NIC created here is not deleted;
// presumably ManageNics removes it later as a dangling NIC — confirm.
249 func (az *AzureInstanceSet) Create(ctx context.Context,
250 instanceType arvados.InstanceType,
252 newTags InstanceTags,
253 publicKey ssh.PublicKey) (Instance, error) {
255 if len(newTags["node-token"]) == 0 {
256 return nil, fmt.Errorf("Must provide tag 'node-token'")
259 name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
264 name = az.namePrefix + name
265 log.Printf("name is %v", name)
// Tag map: "created-at" (used by ManageNics to age dangling NICs),
// "dispatch-<key>" for each of newTags, and "dispatch-instance-type". It is
// presumably attached to the NIC and VM on lines not visible in this copy.
267 timestamp := time.Now().Format(time.RFC3339Nano)
269 tags := make(map[string]*string)
// NOTE(review): "×tamp" below looks like a mis-encoded "&timestamp" (the
// "&times" HTML entity rendered as "×"); as written it will not compile.
270 tags["created-at"] = ×tamp
271 for k, v := range newTags {
// newstr is presumably a per-iteration copy of v (its declaration is not
// visible here), so each map entry points at its own string rather than the
// shared loop variable.
273 tags["dispatch-"+k] = &newstr
276 tags["dispatch-instance-type"] = &instanceType.Name
// NIC parameters: one IP configuration "ip1" with a dynamic private address
// in the configured subnet.
278 nicParameters := network.Interface{
279 Location: &az.azconfig.Location,
281 InterfacePropertiesFormat: &network.InterfacePropertiesFormat{
282 IPConfigurations: &[]network.InterfaceIPConfiguration{
283 network.InterfaceIPConfiguration{
284 Name: to.StringPtr("ip1"),
285 InterfaceIPConfigurationPropertiesFormat: &network.InterfaceIPConfigurationPropertiesFormat{
286 Subnet: &network.Subnet{
287 ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
288 "/Microsoft.Network/virtualnetworks/%s/subnets/%s",
289 az.azconfig.SubscriptionID,
290 az.azconfig.ResourceGroup,
292 az.azconfig.Subnet)),
294 PrivateIPAllocationMethod: network.Dynamic,
300 nic, err := az.netClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
302 return nil, WrapAzureError(err)
305 log.Printf("Created NIC %v", *nic.ID)
// Blob URI for the VM's OS disk in the configured storage account/container.
307 instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
308 az.azconfig.StorageAccount,
309 az.azureEnv.StorageEndpointSuffix,
310 az.azconfig.BlobContainer,
313 log.Printf("URI instance vhd %v", instance_vhd)
// Base64-encoded custom data: a shell script that writes the node token
// file. VerifyPublicKey later reads it back over SSH.
315 customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
316 echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
318 vmParameters := compute.VirtualMachine{
319 Location: &az.azconfig.Location,
321 VirtualMachineProperties: &compute.VirtualMachineProperties{
322 HardwareProfile: &compute.HardwareProfile{
323 VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
325 StorageProfile: &compute.StorageProfile{
326 OsDisk: &compute.OSDisk{
327 OsType: compute.Linux,
328 Name: to.StringPtr(name + "-os"),
329 CreateOption: compute.FromImage,
330 Image: &compute.VirtualHardDisk{
331 URI: to.StringPtr(string(imageId)),
333 Vhd: &compute.VirtualHardDisk{
338 NetworkProfile: &compute.NetworkProfile{
339 NetworkInterfaces: &[]compute.NetworkInterfaceReference{
340 compute.NetworkInterfaceReference{
342 NetworkInterfaceReferenceProperties: &compute.NetworkInterfaceReferenceProperties{
343 Primary: to.BoolPtr(true),
348 OsProfile: &compute.OSProfile{
350 AdminUsername: to.StringPtr("crunch"),
351 LinuxConfiguration: &compute.LinuxConfiguration{
352 DisablePasswordAuthentication: to.BoolPtr(true),
353 SSH: &compute.SSHConfiguration{
354 PublicKeys: &[]compute.SSHPublicKey{
355 compute.SSHPublicKey{
356 Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
357 KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
362 CustomData: &customData,
367 vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name, vmParameters)
369 return nil, WrapAzureError(err)
372 return &AzureInstance{
// Instances lists this dispatcher's VMs (those whose names start with
// namePrefix) in the resource group. It first calls ManageNics, which
// returns VM-attached NICs keyed by NIC ID and may delete stale unattached
// NICs as a side effect. Each returned AzureInstance is paired with the NIC
// whose ID matches the VM's first network interface.
// Listing and iteration errors are passed through WrapAzureError.
// NOTE(review): a VM with a nil NetworkProfile/NetworkInterfaces, or an
// empty interface list, would panic at the NIC lookup below. A VM whose NIC
// is missing from the map gets a zero network.Interface.
379 func (az *AzureInstanceSet) Instances(ctx context.Context) ([]Instance, error) {
380 interfaces, err := az.ManageNics(ctx)
385 result, err := az.vmClient.ListComplete(ctx, az.azconfig.ResourceGroup)
387 return nil, WrapAzureError(err)
390 instances := make([]Instance, 0)
// err from result.Next() is checked at the top of the following iteration.
392 for ; result.NotDone(); err = result.Next() {
394 return nil, WrapAzureError(err)
396 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
397 instances = append(instances, &AzureInstance{
400 nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
403 return instances, nil
// ManageNics lists the network interfaces in the resource group whose names
// start with namePrefix. NICs attached to a VM are returned in a map keyed by
// NIC resource ID.
//
// Presumably only unattached NICs fall through to the age check: NICs with a
// "created-at" tag older than DeleteDanglingResourcesAfter seconds are
// queued on deletechannel for a pool of 4 goroutines that delete them. (The
// branch between the two cases is partly hidden in this copy.)
//
// NOTE(review):
//   - A NIC whose VM is still being created is also unattached, so
//     DeleteDanglingResourcesAfter must comfortably exceed VM creation time.
//   - Deletions use context.Background(), so they are not cancelled with ctx.
//   - If iteration fails, the error is only logged, and the (possibly
//     partial) map is returned with a nil error.
//   - The lines that close deletechannel and wait on wg are not visible here.
//     Confirm the workers are released on the early-return path as well.
406 func (az *AzureInstanceSet) ManageNics(ctx context.Context) (map[string]network.Interface, error) {
407 result, err := az.netClient.ListComplete(ctx, az.azconfig.ResourceGroup)
409 return nil, WrapAzureError(err)
412 interfaces := make(map[string]network.Interface)
// One reference time for every age comparison in this pass.
414 timestamp := time.Now()
415 wg := sync.WaitGroup{}
416 deletechannel := make(chan string, 20)
// Deletion workers: each receives NIC names until the channel is closed.
421 for i := 0; i < 4; i += 1 {
424 nicname, ok := <-deletechannel
428 _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
430 log.Printf("Error deleting %v: %v", nicname, delerr)
432 log.Printf("Deleted %v", nicname)
439 for ; result.NotDone(); err = result.Next() {
441 log.Printf("Error listing nics: %v", err)
442 return interfaces, nil
444 if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
445 if result.Value().VirtualMachine != nil {
446 interfaces[*result.Value().ID] = result.Value()
448 if result.Value().Tags["created-at"] != nil {
// Note: this err shadows the outer err for the rest of this block.
449 created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
451 //log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
452 if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
453 log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
455 deletechannel <- *result.Value().Name
462 return interfaces, nil
// ManageBlobs deletes stale VHD blobs, presumably left behind by this
// dispatcher's deleted VMs. A blob is deleted when it meets all of these:
//   - it is in the configured container;
//   - its name starts with namePrefix;
//   - it is a page blob;
//   - its lease is "available"/"unlocked" (presumably meaning it is not
//     attached as a VM disk — TODO confirm);
//   - it was last modified more than DeleteDanglingResourcesAfter seconds ago.
//
// Deletions run on 4 goroutines. Errors are logged, not returned.
//
// NOTE(review): it uses the first storage account key and would panic if
// ListKeys returned none. Blob listing and deletion do not use ctx.
465 func (az *AzureInstanceSet) ManageBlobs(ctx context.Context) {
466 result, err := az.storageAcctClient.ListKeys(ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
468 log.Printf("Couldn't get account keys %v", err)
472 key1 := *(*result.Keys)[0].Value
473 client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
475 log.Printf("Couldn't make client %v", err)
479 blobsvc := client.GetBlobService()
480 blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
// One reference time for every age comparison in this pass.
482 timestamp := time.Now()
483 wg := sync.WaitGroup{}
484 deletechannel := make(chan storage.Blob, 20)
// Deletion workers: each receives blobs until the channel is closed.
489 for i := 0; i < 4; i += 1 {
492 blob, ok := <-deletechannel
496 err := blob.Delete(nil)
498 log.Printf("error deleting %v: %v", blob.Name, err)
500 log.Printf("Deleted blob %v", blob.Name)
// List blobs page by page, following NextMarker until it is empty.
507 page := storage.ListBlobsParameters{Prefix: az.namePrefix}
510 response, err := blobcont.ListBlobs(page)
512 log.Printf("Error listing blobs %v", err)
515 for _, b := range response.Blobs {
516 age := timestamp.Sub(time.Time(b.Properties.LastModified))
517 if b.Properties.BlobType == storage.BlobTypePage &&
518 b.Properties.LeaseState == "available" &&
519 b.Properties.LeaseStatus == "unlocked" &&
520 age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
522 log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
527 if response.NextMarker != "" {
528 page.Marker = response.NextMarker
// Stop is presumably required by InstanceProvider. Its body is not visible
// in this copy (probably empty — confirm).
535 func (az *AzureInstanceSet) Stop() {
// AzureInstance is one VM managed by an AzureInstanceSet, together with the
// NIC used to report its address.
538 type AzureInstance struct {
// provider gives access to the API clients and configuration.
539 provider *AzureInstanceSet
// nic is the VM's network interface. In Instances it is looked up in the
// map returned by ManageNics, and is the zero value when the lookup misses.
540 nic network.Interface
// vm is a cached copy of the VM resource; ID, Tags, SetTags and Destroy
// read from it.
541 vm compute.VirtualMachine
544 func (ai *AzureInstance) ID() InstanceID {
545 return InstanceID(*ai.vm.ID)
// String implements fmt.Stringer for AzureInstance.
// NOTE(review): the method body is not visible in this copy.
548 func (ai *AzureInstance) String() string {
// SetTags replaces the VM's dispatcher tags with newTags, storing each key
// as "dispatch-"+key. Existing tags without the "dispatch-" prefix are kept;
// old "dispatch-" tags are dropped. The VM is updated with CreateOrUpdate,
// and errors are passed through WrapAzureError.
// NOTE(review): the lines that copy the kept tags, attach the map to
// vmParameters, and refresh ai.vm afterwards are not visible here. Also
// confirm that a CreateOrUpdate with this partial VM body leaves the VM's
// other properties unchanged.
552 func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) error {
553 tags := make(map[string]*string)
555 for k, v := range ai.vm.Tags {
556 if !strings.HasPrefix(k, "dispatch-") {
560 for k, v := range newTags {
562 tags["dispatch-"+k] = &newstr
565 vmParameters := compute.VirtualMachine{
566 Location: &ai.provider.azconfig.Location,
569 vm, err := ai.provider.vmClient.CreateOrUpdate(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
571 return WrapAzureError(err)
// Tags returns the dispatcher tags of the instance: the entries of the
// cached ai.vm.Tags whose keys start with "dispatch-". The prefix is
// presumably stripped, since VerifyPublicKey reads tags["node-token"]; the
// loop body is not visible in this copy. No API call is made in the visible
// lines.
578 func (ai *AzureInstance) Tags(ctx context.Context) (InstanceTags, error) {
579 tags := make(map[string]string)
581 for k, v := range ai.vm.Tags {
582 if strings.HasPrefix(k, "dispatch-") {
590 func (ai *AzureInstance) Destroy(ctx context.Context) error {
591 _, err := ai.provider.vmClient.Delete(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
592 return WrapAzureError(err)
595 func (ai *AzureInstance) Address() string {
596 return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
599 func (ai *AzureInstance) VerifyPublicKey(ctx context.Context, receivedKey ssh.PublicKey, client *ssh.Client) error {
600 remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
602 tags, _ := ai.Tags(ctx)
604 tg := tags["ssh-pubkey-fingerprint"]
606 if remoteFingerprint == tg {
609 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
613 nodetokenTag := tags["node-token"]
614 if nodetokenTag == "" {
615 return fmt.Errorf("Missing node token tag")
618 sess, err := client.NewSession()
623 nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
628 nodetoken := strings.TrimSpace(string(nodetokenbytes))
630 expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
631 log.Printf("%q %q", nodetoken, expectedToken)
633 if strings.TrimSpace(nodetoken) != expectedToken {
634 return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
637 sess, err = client.NewSession()
642 keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
647 sp := strings.Split(string(keyfingerprintbytes), " ")
649 log.Printf("%q %q", remoteFingerprint, sp[1])
651 if remoteFingerprint != sp[1] {
652 return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
655 tags["ssh-pubkey-fingerprint"] = sp[1]
656 delete(tags, "node-token")
657 ai.SetTags(ctx, tags)