netClient InterfacesClientWrapper
storageAcctClient storageacct.AccountsClient
azureEnv azure.Environment
+ interfaces map[string]network.Interface
}
func NewAzureProvider(azcfg AzureProviderConfig, arvcfg arvados.Cluster) (prv Provider, err error) {
},
}
- vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name+"-compute", vmParameters)
+ vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name, vmParameters)
if err != nil {
return nil, err
}
}
func (az *AzureProvider) Instances(ctx context.Context) ([]Instance, error) {
+ interfaces, err := az.ManageNics(ctx)
+ if err != nil {
+ return nil, err
+ }
+
result, err := az.vmClient.ListComplete(ctx, az.azconfig.ResourceGroup)
if err != nil {
return nil, err
if err != nil {
return nil, err
}
- log.Printf("%v", *result.Value().Name)
if result.Value().Tags["arvados-class"] != nil &&
(*result.Value().Tags["arvados-class"]) == "crunch-dynamic-compute" {
instances = append(instances, &AzureInstance{
provider: az,
- vm: result.Value()})
+ vm: result.Value(),
+ nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
}
}
return instances, nil
}
-func (az *AzureProvider) DeleteDanglingNics(ctx context.Context) {
+// ManageNics lists the network interfaces in the configured resource group
+// and sorts the ones tagged arvados-class=crunch-dynamic-compute into two
+// groups:
+//
+// - interfaces attached to a virtual machine are returned in a map keyed by
+// interface ID;
+// - unattached interfaces whose "created-at" tag is older than
+// DeleteDanglingResourcesAfter seconds are handed to a pool of four worker
+// goroutines for deletion.
+//
+// The call does not return until every queued deletion has finished.
+// Deletion failures are only logged. A listing failure (initial or while
+// paging) is returned as the error, with a nil map, so callers never act on
+// a partially populated result.
+func (az *AzureProvider) ManageNics(ctx context.Context) (map[string]network.Interface, error) {
result, err := az.netClient.ListComplete(ctx, az.azconfig.ResourceGroup)
if err != nil {
- return
+ return nil, err
}
+ interfaces := make(map[string]network.Interface)
+
timestamp := time.Now()
wg := sync.WaitGroup{}
- defer wg.Wait()
+ deletechannel := make(chan string, 20)
+ // Wait for in-flight deletions first, then close the channel so the
+ // workers exit.
+ defer func() {
+ wg.Wait()
+ close(deletechannel)
+ }()
+ for i := 0; i < 4; i += 1 {
+ go func() {
+ for {
+ nicname, ok := <-deletechannel
+ if !ok {
+ return
+ }
+ // Deletion runs with context.Background(), not the caller's ctx.
+ _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
+ if delerr != nil {
+ log.Printf("Error deleting %v: %v", nicname, delerr)
+ } else {
+ log.Printf("Deleted %v", nicname)
+ }
+ wg.Done()
+ }
+ }()
+ }
+
for ; result.NotDone(); err = result.Next() {
if err != nil {
log.Printf("Error listing nics: %v", err)
- return
- }
- if !result.NotDone() {
- return
+ // Report the failure instead of returning a truncated map as success.
+ return nil, err
}
if result.Value().Tags["arvados-class"] != nil &&
- (*result.Value().Tags["arvados-class"]) == "crunch-dynamic-compute" &&
- result.Value().VirtualMachine == nil {
-
- if result.Value().Tags["created-at"] != nil {
- created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
- if err == nil {
- log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
- if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
- log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
- wg.Add(1)
- go func(nicname string) {
- r, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
- log.Printf("%v %v", r, delerr)
- wg.Done()
- }(*result.Value().Name)
+ (*result.Value().Tags["arvados-class"]) == "crunch-dynamic-compute" {
+
+ if result.Value().VirtualMachine != nil {
+ interfaces[*result.Value().ID] = result.Value()
+ } else {
+
+ if result.Value().Tags["created-at"] != nil {
+ created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
+ if err == nil {
+ //log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
+ if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
+ log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+ wg.Add(1)
+ deletechannel <- *result.Value().Name
+ }
}
}
}
}
}
-
+ return interfaces, nil
}
-func (az *AzureProvider) DeleteDanglingBlobs(ctx context.Context) {
+func (az *AzureProvider) ManageBlobs(ctx context.Context) {
result, err := az.storageAcctClient.ListKeys(ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
if err != nil {
log.Printf("Couldn't get account keys %v", err)
blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
timestamp := time.Now()
+ wg := sync.WaitGroup{}
+ deletechannel := make(chan storage.Blob, 20)
+ defer func() {
+ wg.Wait()
+ close(deletechannel)
+ }()
+ for i := 0; i < 4; i += 1 {
+ go func() {
+ for {
+ blob, ok := <-deletechannel
+ if !ok {
+ return
+ }
+ err := blob.Delete(nil)
+ if err != nil {
+ log.Printf("error deleting %v: %v", blob.Name, err)
+ } else {
+ log.Printf("Deleted blob %v", blob.Name)
+ }
+ wg.Done()
+ }
+ }()
+ }
+
page := storage.ListBlobsParameters{Prefix: "compute-"}
for {
b.Properties.LeaseState == "available" &&
b.Properties.LeaseStatus == "unlocked" &&
age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
+
log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
+ wg.Add(1)
+ deletechannel <- b
}
}
if response.NextMarker != "" {
}
+// String returns the name of the instance's virtual machine (*ai.vm.Name).
+// NOTE(review): the pointer is dereferenced without a nil check -- confirm
+// that Name is always populated for instances built by this provider.
func (ai *AzureInstance) String() string {
- return *ai.vm.ID
+ return *ai.vm.Name
}
func (ai *AzureInstance) ProviderType() string {
+// Destroy asks the provider's VM client to delete the virtual machine named
+// *ai.vm.Name in the configured resource group and returns the client's error.
func (ai *AzureInstance) Destroy(ctx context.Context) error {
_, err := ai.provider.vmClient.Delete(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
- // check response code
+ // TODO: the HTTP response is discarded here; decide whether its status
+ // code needs to be checked in addition to err.
return err
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "context"
+ "flag"
+ "log"
+ "net/http"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
+ "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
+ "github.com/Azure/go-autorest/autorest/to"
+ check "gopkg.in/check.v1"
+)
+
+// AzureProviderSuite is the gocheck suite holding the Azure provider tests.
+type AzureProviderSuite struct{}
+
+// Register the suite with gocheck.
+var _ = check.Suite(&AzureProviderSuite{})
+
+// VirtualMachinesClientStub is a canned stand-in for the virtual machines
+// client used by the tests when no live Azure configuration is supplied.
+type VirtualMachinesClientStub struct{}
+
+// CreateOrUpdate returns the supplied parameters unchanged except that ID is
+// set to VMName. It never fails.
+func (*VirtualMachinesClientStub) CreateOrUpdate(
+	ctx context.Context,
+	resourceGroupName string,
+	VMName string,
+	parameters compute.VirtualMachine,
+) (compute.VirtualMachine, error) {
+	created := parameters
+	created.ID = &VMName
+	return created, nil
+}
+
+// Delete does nothing and reports success with a nil response.
+func (*VirtualMachinesClientStub) Delete(ctx context.Context, resourceGroupName string, VMName string) (*http.Response, error) {
+	return nil, nil
+}
+
+// ListComplete returns a zero-value iterator and no error.
+func (*VirtualMachinesClientStub) ListComplete(ctx context.Context, resourceGroupName string) (compute.VirtualMachineListResultIterator, error) {
+	var none compute.VirtualMachineListResultIterator
+	return none, nil
+}
+
+// InterfacesClientStub is a canned stand-in for the network interfaces
+// client used by the tests when no live Azure configuration is supplied.
+type InterfacesClientStub struct{}
+
+// CreateOrUpdate returns the supplied parameters with ID set to nicName and
+// the first IP configuration's private address set to a fixed value. The
+// caller must supply at least one IP configuration.
+func (*InterfacesClientStub) CreateOrUpdate(
+	ctx context.Context,
+	resourceGroupName string,
+	nicName string,
+	parameters network.Interface,
+) (network.Interface, error) {
+	parameters.ID = to.StringPtr(nicName)
+	ipConfigs := *parameters.IPConfigurations
+	ipConfigs[0].PrivateIPAddress = to.StringPtr("192.168.5.5")
+	return parameters, nil
+}
+
+// Delete does nothing and reports success with a nil response.
+func (*InterfacesClientStub) Delete(ctx context.Context, resourceGroupName string, nicName string) (*http.Response, error) {
+	return nil, nil
+}
+
+// ListComplete returns a zero-value iterator and no error.
+func (*InterfacesClientStub) ListComplete(ctx context.Context, resourceGroupName string) (network.InterfaceListResultIterator, error) {
+	var none network.InterfaceListResultIterator
+	return none, nil
+}
+
+// live holds the value of the -live-azure-cfg flag: the path of a config
+// file. When it is non-empty GetProvider builds a provider from that file
+// instead of using the stub clients.
+var live = flag.String("live-azure-cfg", "", "Test with real azure API, provide config file")
+
+// GetProvider builds the provider under test together with the image ID the
+// tests should use. Without -live-azure-cfg it returns an AzureProvider wired
+// to the stub clients; otherwise it loads the named config file and
+// constructs a provider from it via NewAzureProvider.
+func GetProvider() (Provider, ImageID, error) {
+	if *live == "" {
+		stub := &AzureProvider{
+			azconfig: AzureProviderConfig{
+				BlobContainer: "vhds",
+			},
+		}
+		stub.vmClient = &VirtualMachinesClientStub{}
+		stub.netClient = &InterfacesClientStub{}
+		return stub, ImageID("blob"), nil
+	}
+
+	var cfg AzureProviderConfig
+	if err := config.LoadFile(&cfg, *live); err != nil {
+		return nil, ImageID(""), err
+	}
+	provider, err := NewAzureProvider(cfg, arvados.Cluster{})
+	return provider, ImageID(cfg.Image), err
+}
+
+// TestCreate creates one small instance through the provider and logs its
+// name and address.
+func (*AzureProviderSuite) TestCreate(c *check.C) {
+	provider, image, err := GetProvider()
+	if err != nil {
+		c.Fatal("Error making provider", err)
+	}
+
+	tiny := arvados.InstanceType{
+		Name:         "tiny",
+		ProviderType: "Standard_D1_v2",
+		VCPUs:        1,
+		RAM:          4000000000,
+		Scratch:      10000000000,
+		Price:        .02,
+		Preemptible:  false,
+	}
+	tags := []InstanceTag{"tag1"}
+
+	inst, err := provider.Create(context.Background(), tiny, image, tags)
+	c.Assert(err, check.IsNil)
+
+	log.Printf("Result %v %v", inst.String(), inst.Address())
+}
+
+// TestListInstances lists the provider's instances and logs each one's name
+// and address.
+func (*AzureProviderSuite) TestListInstances(c *check.C) {
+	provider, _, err := GetProvider()
+	if err != nil {
+		c.Fatal("Error making provider", err)
+	}
+
+	instances, err := provider.Instances(context.Background())
+	c.Assert(err, check.IsNil)
+
+	for _, inst := range instances {
+		log.Printf("%v %v", inst.String(), inst.Address())
+	}
+}
+
+// TestManageNics runs one NIC management pass and requires it to succeed.
+func (*AzureProviderSuite) TestManageNics(c *check.C) {
+	ap, _, err := GetProvider()
+	if err != nil {
+		c.Fatal("Error making provider", err)
+	}
+
+	// ManageNics reports listing failures through its error result; check it
+	// so the test can actually fail.
+	_, err = ap.(*AzureProvider).ManageNics(context.Background())
+	c.Assert(err, check.IsNil)
+}
+
+// TestManageBlobs runs one blob management pass. ManageBlobs returns no
+// value, so the test only verifies that the call completes.
+func (*AzureProviderSuite) TestManageBlobs(c *check.C) {
+	provider, _, err := GetProvider()
+	if err != nil {
+		c.Fatal("Error making provider", err)
+	}
+
+	azure := provider.(*AzureProvider)
+	azure.ManageBlobs(context.Background())
+}
+
+// TestDestroyInstances lists the provider's instances and destroys each one,
+// recording (without aborting on) any individual failure.
+func (*AzureProviderSuite) TestDestroyInstances(c *check.C) {
+	provider, _, err := GetProvider()
+	if err != nil {
+		c.Fatal("Error making provider", err)
+	}
+
+	instances, err := provider.Instances(context.Background())
+	c.Assert(err, check.IsNil)
+
+	for _, inst := range instances {
+		destroyErr := inst.Destroy(context.Background())
+		c.Check(destroyErr, check.IsNil)
+	}
+}