import (
"context"
"encoding/base64"
+ "encoding/json"
"fmt"
"net/http"
"regexp"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-06-01/compute"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-06-01/network"
storageacct "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-02-01/storage"
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/jmcvetta/randutil"
- "github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
-type AzureInstanceSetConfig struct {
+// Driver is the Azure implementation of the cloud.Driver interface.
+// It wraps newAzureInstanceSet with cloud.DriverFunc.
+var Driver = cloud.DriverFunc(newAzureInstanceSet)
+
+// azureInstanceSetConfig holds the driver settings that
+// newAzureInstanceSet decodes from its JSON configuration.
+type azureInstanceSetConfig struct {
SubscriptionID string
ClientID string
ClientSecret string
TenantID string
CloudEnvironment string
ResourceGroup string
+ // ImageResourceGroup is the resource group used for managed images
+ // and disks. setup falls back to ResourceGroup when it is empty.
+ ImageResourceGroup string
Location string
Network string
+ // NetworkResourceGroup is the resource group used in the subnet ID
+ // built by Create. Create falls back to ResourceGroup when it is empty.
+ NetworkResourceGroup string
Subnet string
StorageAccount string
BlobContainer string
DeleteDanglingResourcesAfter arvados.Duration
+ // AdminUsername is the admin account name set in the VM's OS profile
+ // by Create, and the value returned by RemoteUser.
+ AdminUsername string
+}
+
+// containerWrapper is the subset of blob container operations this
+// driver uses: getting a reference to a named blob and listing blobs.
+type containerWrapper interface {
+ GetBlobReference(name string) *storage.Blob
+ ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
}
-type VirtualMachinesClientWrapper interface {
- CreateOrUpdate(ctx context.Context,
+type virtualMachinesClientWrapper interface {
+ createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
- Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
- ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
+ delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
+ listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
-type VirtualMachinesClientImpl struct {
+type virtualMachinesClientImpl struct {
inner compute.VirtualMachinesClient
}
-func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
+func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
if err != nil {
- return compute.VirtualMachine{}, WrapAzureError(err)
+ return compute.VirtualMachine{}, wrapAzureError(err)
}
future.WaitForCompletionRef(ctx, cl.inner.Client)
r, err := future.Result(cl.inner)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
err = future.WaitForCompletionRef(ctx, cl.inner.Client)
- return future.Response(), WrapAzureError(err)
+ return future.Response(), wrapAzureError(err)
}
-func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
+func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
r, err := cl.inner.ListComplete(ctx, resourceGroupName)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-type InterfacesClientWrapper interface {
- CreateOrUpdate(ctx context.Context,
+type interfacesClientWrapper interface {
+ createOrUpdate(ctx context.Context,
resourceGroupName string,
networkInterfaceName string,
parameters network.Interface) (result network.Interface, err error)
- Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
- ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
+ delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
+ listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
-type InterfacesClientImpl struct {
+type interfacesClientImpl struct {
inner network.InterfacesClient
}
-func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
err = future.WaitForCompletionRef(ctx, cl.inner.Client)
- return future.Response(), WrapAzureError(err)
+ return future.Response(), wrapAzureError(err)
}
-func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
+func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
resourceGroupName string,
networkInterfaceName string,
parameters network.Interface) (result network.Interface, err error) {
future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
if err != nil {
- return network.Interface{}, WrapAzureError(err)
+ return network.Interface{}, wrapAzureError(err)
}
future.WaitForCompletionRef(ctx, cl.inner.Client)
r, err := future.Result(cl.inner)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
+func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
r, err := cl.inner.ListComplete(ctx, resourceGroupName)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
+}
+
+// disksClientWrapper is the set of managed-disk operations the driver
+// needs: listing the disks in a resource group and deleting one disk
+// by name.
+type disksClientWrapper interface {
+ listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error)
+ delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error)
+}
+
+// disksClientImpl implements disksClientWrapper by delegating to a
+// compute.DisksClient.
+type disksClientImpl struct {
+ inner compute.DisksClient
+}
+
+// listByResourceGroup asks the wrapped DisksClient for the disks in
+// resourceGroupName and returns the resulting page, with any error
+// passed through wrapAzureError.
+func (cl *disksClientImpl) listByResourceGroup(ctx context.Context, resourceGroupName string) (result compute.DiskListPage, err error) {
+	result, err = cl.inner.ListByResourceGroup(ctx, resourceGroupName)
+	return result, wrapAzureError(err)
+}
+
+// delete asks the wrapped DisksClient to delete diskName in
+// resourceGroupName. It returns the delete future as-is (it does not
+// wait for the operation to complete), with any error passed through
+// wrapAzureError.
+func (cl *disksClientImpl) delete(ctx context.Context, resourceGroupName string, diskName string) (result compute.DisksDeleteFuture, err error) {
+	result, err = cl.inner.Delete(ctx, resourceGroupName, diskName)
+	return result, wrapAzureError(err)
}
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
-type AzureRateLimitError struct {
+type azureRateLimitError struct {
azure.RequestError
- earliestRetry time.Time
+ firstRetry time.Time
}
-func (ar *AzureRateLimitError) EarliestRetry() time.Time {
- return ar.earliestRetry
+func (ar *azureRateLimitError) EarliestRetry() time.Time {
+ return ar.firstRetry
}
-type AzureQuotaError struct {
+type azureQuotaError struct {
azure.RequestError
}
-func (ar *AzureQuotaError) IsQuotaError() bool {
+func (ar *azureQuotaError) IsQuotaError() bool {
return true
}
-func WrapAzureError(err error) error {
+func wrapAzureError(err error) error {
de, ok := err.(autorest.DetailedError)
if !ok {
return err
earliestRetry = time.Now().Add(20 * time.Second)
}
}
- return &AzureRateLimitError{*rq, earliestRetry}
+ return &azureRateLimitError{*rq, earliestRetry}
}
if rq.ServiceError == nil {
return err
}
if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
- return &AzureQuotaError{*rq}
+ return &azureQuotaError{*rq}
}
return err
}
-type AzureInstanceSet struct {
- azconfig AzureInstanceSetConfig
- vmClient VirtualMachinesClientWrapper
- netClient InterfacesClientWrapper
- storageAcctClient storageacct.AccountsClient
- azureEnv azure.Environment
- interfaces map[string]network.Interface
- dispatcherID string
- namePrefix string
- ctx context.Context
- stopFunc context.CancelFunc
- stopWg sync.WaitGroup
- deleteNIC chan string
- deleteBlob chan storage.Blob
- logger logrus.FieldLogger
-}
-
-func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
- azcfg := AzureInstanceSetConfig{}
-
- decoderConfig := mapstructure.DecoderConfig{
- DecodeHook: arvados.DurationMapStructureDecodeHook(),
- Result: &azcfg}
-
- decoder, err := mapstructure.NewDecoder(&decoderConfig)
+// azureInstanceSet is the cloud.InstanceSet implementation returned by
+// newAzureInstanceSet. It holds the decoded configuration, the Azure
+// client wrappers, and the channels used to queue NIC, blob and disk
+// deletions.
+type azureInstanceSet struct {
+ azconfig azureInstanceSetConfig
+ vmClient virtualMachinesClientWrapper
+ netClient interfacesClientWrapper
+ disksClient disksClientWrapper
+ // imageResourceGroup is azconfig.ImageResourceGroup, or
+ // azconfig.ResourceGroup when that is empty (see setup).
+ imageResourceGroup string
+ // blobcont stays nil unless azconfig.StorageAccount is set.
+ blobcont containerWrapper
+ azureEnv azure.Environment
+ interfaces map[string]network.Interface
+ dispatcherID string
+ // namePrefix is "compute-<dispatcherID>-".
+ namePrefix string
+ ctx context.Context
+ stopFunc context.CancelFunc
+ // stopWg counts in-flight operations; Stop waits on it.
+ stopWg sync.WaitGroup
+ deleteNIC chan string
+ deleteBlob chan storage.Blob
+ deleteDisk chan compute.Disk
+ logger logrus.FieldLogger
+}
+
+// newAzureInstanceSet decodes config (JSON) into an
+// azureInstanceSetConfig, creates an azureInstanceSet with a
+// cancellable context, and runs setup on it. The
+// cloud.SharedResourceTags argument is ignored. It returns the decode
+// or setup error, if any; on a setup error the context is cancelled
+// before returning.
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+ azcfg := azureInstanceSetConfig{}
+ err = json.Unmarshal(config, &azcfg)
if err != nil {
return nil, err
}
- if err = decoder.Decode(config); err != nil {
- return nil, err
- }
- ap := AzureInstanceSet{logger: logger}
- err = ap.setup(azcfg, string(dispatcherID))
+ az := azureInstanceSet{logger: logger}
+ // The context must exist before setup runs: setup passes az.ctx to
+ // the storage account client when StorageAccount is configured.
+ az.ctx, az.stopFunc = context.WithCancel(context.Background())
+ err = az.setup(azcfg, string(dispatcherID))
if err != nil {
+ az.stopFunc()
return nil, err
}
- return &ap, nil
+ return &az, nil
}
-func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
+func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
az.azconfig = azcfg
vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
+ disksClient := compute.NewDisksClient(az.azconfig.SubscriptionID)
storageAcctClient := storageacct.NewAccountsClient(az.azconfig.SubscriptionID)
az.azureEnv, err = azure.EnvironmentFromName(az.azconfig.CloudEnvironment)
vmClient.Authorizer = authorizer
netClient.Authorizer = authorizer
+ disksClient.Authorizer = authorizer
storageAcctClient.Authorizer = authorizer
- az.vmClient = &VirtualMachinesClientImpl{vmClient}
- az.netClient = &InterfacesClientImpl{netClient}
- az.storageAcctClient = storageAcctClient
+ az.vmClient = &virtualMachinesClientImpl{vmClient}
+ az.netClient = &interfacesClientImpl{netClient}
+ az.disksClient = &disksClientImpl{disksClient}
+
+ az.imageResourceGroup = az.azconfig.ImageResourceGroup
+ if az.imageResourceGroup == "" {
+ az.imageResourceGroup = az.azconfig.ResourceGroup
+ }
+
+ var client storage.Client
+ if az.azconfig.StorageAccount != "" {
+ result, err := storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+ if err != nil {
+ az.logger.WithError(err).Warn("Couldn't get account keys")
+ return err
+ }
+
+ key1 := *(*result.Keys)[0].Value
+ client, err = storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
+ if err != nil {
+ az.logger.WithError(err).Warn("Couldn't make client")
+ return err
+ }
+
+ blobsvc := client.GetBlobService()
+ az.blobcont = blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+ }
az.dispatcherID = dispatcherID
az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
- az.ctx, az.stopFunc = context.WithCancel(context.Background())
go func() {
az.stopWg.Add(1)
defer az.stopWg.Done()
tk.Stop()
return
case <-tk.C:
- az.ManageBlobs()
+ if az.blobcont != nil {
+ az.manageBlobs()
+ }
+ az.manageDisks()
}
}
}()
az.deleteNIC = make(chan string)
az.deleteBlob = make(chan storage.Blob)
+ az.deleteDisk = make(chan compute.Disk)
for i := 0; i < 4; i++ {
go func() {
if !ok {
return
}
- _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
+ _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
if delerr != nil {
az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
} else {
}
}
}()
+ go func() {
+ for {
+ disk, ok := <-az.deleteDisk
+ if !ok {
+ return
+ }
+ _, err := az.disksClient.delete(az.ctx, az.imageResourceGroup, *disk.Name)
+ if err != nil {
+ az.logger.WithError(err).Warnf("Error deleting disk %+v", *disk.Name)
+ } else {
+ az.logger.Printf("Deleted disk %v", *disk.Name)
+ }
+ }
+ }()
}
return nil
}
-func (az *AzureInstanceSet) Create(
+func (az *azureInstanceSet) Create(
instanceType arvados.InstanceType,
imageID cloud.ImageID,
newTags cloud.InstanceTags,
+ initCommand cloud.InitCommand,
publicKey ssh.PublicKey) (cloud.Instance, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- if len(newTags["node-token"]) == 0 {
- return nil, fmt.Errorf("Must provide tag 'node-token'")
+ if instanceType.AddedScratch > 0 {
+ return nil, fmt.Errorf("cannot create instance type %q: driver does not implement non-zero AddedScratch (%d)", instanceType.Name, instanceType.AddedScratch)
}
name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
name = az.namePrefix + name
- timestamp := time.Now().Format(time.RFC3339Nano)
-
- tags := make(map[string]*string)
- tags["created-at"] = &timestamp
+ tags := map[string]*string{}
for k, v := range newTags {
- newstr := v
- tags["dispatch-"+k] = &newstr
+ tags[k] = to.StringPtr(v)
}
+ tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
- tags["dispatch-instance-type"] = &instanceType.Name
+ networkResourceGroup := az.azconfig.NetworkResourceGroup
+ if networkResourceGroup == "" {
+ networkResourceGroup = az.azconfig.ResourceGroup
+ }
nicParameters := network.Interface{
Location: &az.azconfig.Location,
ID: to.StringPtr(fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers"+
"/Microsoft.Network/virtualnetworks/%s/subnets/%s",
az.azconfig.SubscriptionID,
- az.azconfig.ResourceGroup,
+ networkResourceGroup,
az.azconfig.Network,
az.azconfig.Subnet)),
},
},
},
}
- nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
+ nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
if err != nil {
- return nil, WrapAzureError(err)
- }
-
- instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
- az.azconfig.StorageAccount,
- az.azureEnv.StorageEndpointSuffix,
- az.azconfig.BlobContainer,
- name)
+ return nil, wrapAzureError(err)
+ }
+
+ blobname := fmt.Sprintf("%s-os.vhd", name)
+ customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
+ var storageProfile *compute.StorageProfile
+
+ re := regexp.MustCompile(`^http(s?)://`)
+ if re.MatchString(string(imageID)) {
+ instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s",
+ az.azconfig.StorageAccount,
+ az.azureEnv.StorageEndpointSuffix,
+ az.azconfig.BlobContainer,
+ blobname)
+ az.logger.Info("using deprecated VHD image")
+ storageProfile = &compute.StorageProfile{
+ OsDisk: &compute.OSDisk{
+ OsType: compute.Linux,
+ Name: to.StringPtr(name + "-os"),
+ CreateOption: compute.DiskCreateOptionTypesFromImage,
+ Image: &compute.VirtualHardDisk{
+ URI: to.StringPtr(string(imageID)),
+ },
+ Vhd: &compute.VirtualHardDisk{
+ URI: &instanceVhd,
+ },
+ },
+ }
+ } else {
+ az.logger.Info("using managed image")
- customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
-echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
+ storageProfile = &compute.StorageProfile{
+ ImageReference: &compute.ImageReference{
+ ID: to.StringPtr("/subscriptions/" + az.azconfig.SubscriptionID + "/resourceGroups/" + az.imageResourceGroup + "/providers/Microsoft.Compute/images/" + string(imageID)),
+ },
+ OsDisk: &compute.OSDisk{
+ OsType: compute.Linux,
+ Name: to.StringPtr(name + "-os"),
+ CreateOption: compute.DiskCreateOptionTypesFromImage,
+ },
+ }
+ }
vmParameters := compute.VirtualMachine{
Location: &az.azconfig.Location,
HardwareProfile: &compute.HardwareProfile{
VMSize: compute.VirtualMachineSizeTypes(instanceType.ProviderType),
},
- StorageProfile: &compute.StorageProfile{
- OsDisk: &compute.OSDisk{
- OsType: compute.Linux,
- Name: to.StringPtr(name + "-os"),
- CreateOption: compute.FromImage,
- Image: &compute.VirtualHardDisk{
- URI: to.StringPtr(string(imageID)),
- },
- Vhd: &compute.VirtualHardDisk{
- URI: &instanceVhd,
- },
- },
- },
+ StorageProfile: storageProfile,
NetworkProfile: &compute.NetworkProfile{
NetworkInterfaces: &[]compute.NetworkInterfaceReference{
compute.NetworkInterfaceReference{
},
OsProfile: &compute.OSProfile{
ComputerName: &name,
- AdminUsername: to.StringPtr("crunch"),
+ AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
LinuxConfiguration: &compute.LinuxConfiguration{
DisablePasswordAuthentication: to.BoolPtr(true),
SSH: &compute.SSHConfiguration{
PublicKeys: &[]compute.SSHPublicKey{
- compute.SSHPublicKey{
- Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
+ {
+ Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
},
},
},
}
- vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
+ vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
if err != nil {
- return nil, WrapAzureError(err)
+ if az.blobcont != nil {
+ _, delerr := az.blobcont.GetBlobReference(blobname).DeleteIfExists(nil)
+ if delerr != nil {
+ az.logger.WithError(delerr).Warnf("Error cleaning up vhd blob after failed create")
+ }
+ }
+
+ _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, *nic.Name)
+ if delerr != nil {
+ az.logger.WithError(delerr).Warnf("Error cleaning up NIC after failed create")
+ }
+
+ return nil, wrapAzureError(err)
}
- return &AzureInstance{
+ return &azureInstance{
provider: az,
nic: nic,
vm: vm,
}, nil
}
-func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- interfaces, err := az.ManageNics()
+ interfaces, err := az.manageNics()
if err != nil {
return nil, err
}
- result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+ result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
- instances := make([]cloud.Instance, 0)
-
+ var instances []cloud.Instance
for ; result.NotDone(); err = result.Next() {
if err != nil {
- return nil, WrapAzureError(err)
- }
- if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
- instances = append(instances, &AzureInstance{
- provider: az,
- vm: result.Value(),
- nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
+ return nil, wrapAzureError(err)
}
+ instances = append(instances, &azureInstance{
+ provider: az,
+ vm: result.Value(),
+ nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
+ })
}
return instances, nil
}
// ManageNics returns a list of Azure network interface resources.
-// Also performs garbage collection of NICs which have "namePrefix", are
-// not associated with a virtual machine and have a "create-at" time
-// more than DeleteDanglingResourcesAfter (to prevent racing and
+// Also performs garbage collection of NICs which have "namePrefix",
+// are not associated with a virtual machine and have a "created-at"
+// time more than DeleteDanglingResourcesAfter (to prevent racing and
// deleting newly created NICs) in the past are deleted.
-func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
+func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+ result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
interfaces := make(map[string]network.Interface)
if result.Value().Tags["created-at"] != nil {
createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
if err == nil {
- if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
- az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+ if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
+ az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
az.deleteNIC <- *result.Value().Name
}
}
// have "namePrefix", are "available" (which means they are not
// leased to a VM) and haven't been modified for
// DeleteDanglingResourcesAfter seconds.
-func (az *AzureInstanceSet) ManageBlobs() {
- result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
- if err != nil {
- az.logger.WithError(err).Warn("Couldn't get account keys")
- return
- }
-
- key1 := *(*result.Keys)[0].Value
- client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
- if err != nil {
- az.logger.WithError(err).Warn("Couldn't make client")
- return
- }
-
- blobsvc := client.GetBlobService()
- blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
+func (az *azureInstanceSet) manageBlobs() {
page := storage.ListBlobsParameters{Prefix: az.namePrefix}
timestamp := time.Now()
for {
- response, err := blobcont.ListBlobs(page)
+ response, err := az.blobcont.ListBlobs(page)
if err != nil {
az.logger.WithError(err).Warn("Error listing blobs")
return
}
}
-func (az *AzureInstanceSet) Stop() {
+// manageDisks garbage collects managed compute disks (VM disk images)
+// in the image resource group (az.imageResourceGroup). It queues for
+// deletion every disk whose name starts with namePrefix and ends in
+// "-os", whose state is "Unattached", and which was created more than
+// DeleteDanglingResourcesAfter ago. (Azure provides no modification
+// timestamp on managed disks, there is only a creation timestamp.)
+//
+// Each page of the listing is visited once. A listing or paging error
+// is logged and ends the current pass.
+func (az *azureInstanceSet) manageDisks() {
+	// Quote namePrefix so it is matched literally instead of being
+	// interpreted as regexp syntax.
+	re := regexp.MustCompile(`^` + regexp.QuoteMeta(az.namePrefix) + `.*-os$`)
+	timestamp := time.Now()
+	maxAge := az.azconfig.DeleteDanglingResourcesAfter.Duration()
+
+	// Fetch the first page once and then advance with Next(). Listing
+	// inside the loop would restart from the first page on every
+	// iteration and never terminate while any disk exists.
+	response, err := az.disksClient.listByResourceGroup(az.ctx, az.imageResourceGroup)
+	if err != nil {
+		az.logger.WithError(err).Warn("Error listing disks")
+		return
+	}
+	for response.NotDone() {
+		for _, d := range response.Values() {
+			// Skip entries missing the fields examined below rather
+			// than dereferencing nil pointers.
+			if d.Name == nil || d.DiskProperties == nil || d.DiskProperties.TimeCreated == nil {
+				continue
+			}
+			age := timestamp.Sub(d.DiskProperties.TimeCreated.ToTime())
+			if d.DiskProperties.DiskState == "Unattached" &&
+				re.MatchString(*d.Name) &&
+				age > maxAge {
+				az.logger.Printf("Disk %v is unlocked and not modified for %v seconds, will delete", *d.Name, age.Seconds())
+				az.deleteDisk <- d
+			}
+		}
+		if err := response.Next(); err != nil {
+			az.logger.WithError(err).Warn("Error getting next page of disks")
+			return
+		}
+	}
+}
+
+
+// Stop cancels the instance set's context, waits for the operations
+// counted by stopWg to finish, and then closes the deletion queues so
+// the worker goroutines reading from them exit.
+func (az *azureInstanceSet) Stop() {
az.stopFunc()
az.stopWg.Wait()
close(az.deleteNIC)
close(az.deleteBlob)
+	// The disk-delete workers started in setup only return when
+	// deleteDisk is closed; without this they would leak after Stop.
+	close(az.deleteDisk)
}
-type AzureInstance struct {
- provider *AzureInstanceSet
+type azureInstance struct {
+ provider *azureInstanceSet
nic network.Interface
vm compute.VirtualMachine
}
-func (ai *AzureInstance) ID() cloud.InstanceID {
+func (ai *azureInstance) ID() cloud.InstanceID {
return cloud.InstanceID(*ai.vm.ID)
}
-func (ai *AzureInstance) String() string {
+func (ai *azureInstance) String() string {
return *ai.vm.Name
}
-func (ai *AzureInstance) ProviderType() string {
+func (ai *azureInstance) ProviderType() string {
return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
}
-func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
+func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
- tags := make(map[string]*string)
-
+ tags := map[string]*string{}
for k, v := range ai.vm.Tags {
- if !strings.HasPrefix(k, "dispatch-") {
- tags[k] = v
- }
+ tags[k] = v
}
for k, v := range newTags {
- newstr := v
- tags["dispatch-"+k] = &newstr
+ tags[k] = to.StringPtr(v)
}
vmParameters := compute.VirtualMachine{
Location: &ai.provider.azconfig.Location,
Tags: tags,
}
- vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
+ vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
if err != nil {
- return WrapAzureError(err)
+ return wrapAzureError(err)
}
ai.vm = vm
return nil
}
-func (ai *AzureInstance) Tags() cloud.InstanceTags {
- tags := make(map[string]string)
-
+func (ai *azureInstance) Tags() cloud.InstanceTags {
+ tags := cloud.InstanceTags{}
for k, v := range ai.vm.Tags {
- if strings.HasPrefix(k, "dispatch-") {
- tags[k[9:]] = *v
- }
+ tags[k] = *v
}
-
return tags
}
-func (ai *AzureInstance) Destroy() error {
+func (ai *azureInstance) Destroy() error {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
- _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
- return WrapAzureError(err)
+ _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
+ return wrapAzureError(err)
}
-func (ai *AzureInstance) Address() string {
- return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
-}
-
-func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
- ai.provider.stopWg.Add(1)
- defer ai.provider.stopWg.Done()
-
- remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
-
- tags := ai.Tags()
-
- tg := tags["ssh-pubkey-fingerprint"]
- if tg != "" {
- if remoteFingerprint == tg {
- return nil
- } else {
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
- }
- }
-
- nodetokenTag := tags["node-token"]
- if nodetokenTag == "" {
- return fmt.Errorf("Missing node token tag")
- }
-
- sess, err := client.NewSession()
- if err != nil {
- return err
- }
-
- nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
- if err != nil {
- return err
- }
-
- nodetoken := strings.TrimSpace(string(nodetokenbytes))
-
- expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
-
- if strings.TrimSpace(nodetoken) != expectedToken {
- return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
- }
-
- sess, err = client.NewSession()
- if err != nil {
- return err
- }
-
- keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
- if err != nil {
- return err
+// Address returns the private IP address of the first IP configuration
+// on the instance's network interface. It returns "" when the
+// interface properties, the IP configuration list, the first
+// configuration's properties, or the address itself is missing.
+func (ai *azureInstance) Address() string {
+	props := ai.nic.InterfacePropertiesFormat
+	if props == nil || props.IPConfigurations == nil || len(*props.IPConfigurations) == 0 {
+		return ""
}
+	first := (*props.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat
+	if first == nil || first.PrivateIPAddress == nil {
+		return ""
+	}
+	return *first.PrivateIPAddress
+}
- sp := strings.Split(string(keyfingerprintbytes), " ")
-
- if remoteFingerprint != sp[1] {
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
- }
+// RemoteUser returns the configured AdminUsername, which is the admin
+// account Create sets in the VM's OS profile together with the
+// supplied SSH public key.
+func (ai *azureInstance) RemoteUser() string {
+ return ai.provider.azconfig.AdminUsername
+}
- tags["ssh-pubkey-fingerprint"] = sp[1]
- delete(tags, "node-token")
- ai.SetTags(tags)
- return nil
+// VerifyHostKey is not implemented by this driver: it ignores its
+// arguments and always returns cloud.ErrNotImplemented.
+func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+ return cloud.ErrNotImplemented
}