import (
"context"
"encoding/base64"
+ "encoding/json"
"fmt"
"net/http"
"regexp"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/Azure/go-autorest/autorest/to"
"github.com/jmcvetta/randutil"
- "github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)
-type AzureInstanceSetConfig struct {
- SubscriptionID string "SubscriptionID"
- ClientID string "ClientID"
- ClientSecret string "ClientSecret"
- TenantID string "TenantID"
- CloudEnvironment string "CloudEnvironment"
- ResourceGroup string "ResourceGroup"
- Location string "Location"
- Network string "Network"
- Subnet string "Subnet"
- StorageAccount string "StorageAccount"
- BlobContainer string "BlobContainer"
- DeleteDanglingResourcesAfter float64 "DeleteDanglingResourcesAfter"
-}
-
-type VirtualMachinesClientWrapper interface {
- CreateOrUpdate(ctx context.Context,
+// Driver is the azure implementation of the cloud.Driver interface.
+// It is built by passing newAzureInstanceSet to cloud.DriverFunc.
+var Driver = cloud.DriverFunc(newAzureInstanceSet)
+
+// azureInstanceSetConfig holds the driver configuration.
+// newAzureInstanceSet fills it in with json.Unmarshal, so the JSON
+// keys of the configuration are matched against these field names.
+type azureInstanceSetConfig struct {
+ // SubscriptionID is passed to the VM and network interface client
+ // constructors in setup.
+ SubscriptionID string
+ ClientID string
+ ClientSecret string
+ TenantID string
+ CloudEnvironment string
+ // ResourceGroup is the resource group name given to every VM,
+ // network interface and storage key request.
+ ResourceGroup string
+ Location string
+ Network string
+ Subnet string
+ // StorageAccount is the account whose keys manageBlobs lists.
+ StorageAccount string
+ BlobContainer string
+ // DeleteDanglingResourcesAfter is the age threshold used by
+ // manageNics and manageBlobs: a NIC or blob older than this is
+ // queued for deletion.
+ DeleteDanglingResourcesAfter arvados.Duration
+}
+
+// virtualMachinesClientWrapper is the set of virtual machine
+// operations used by azureInstanceSet (the type of its vmClient
+// field). virtualMachinesClientImpl implements it on top of
+// compute.VirtualMachinesClient.
+type virtualMachinesClientWrapper interface {
+ // createOrUpdate creates or updates the named VM and returns the
+ // resulting VM.
+ createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error)
- Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
- ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
+ // delete deletes the named VM and returns the HTTP response.
+ delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error)
+ // listComplete returns an iterator over the VMs in the resource
+ // group.
+ listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error)
}
-type VirtualMachinesClientImpl struct {
+// virtualMachinesClientImpl implements virtualMachinesClientWrapper
+// by delegating to the wrapped compute.VirtualMachinesClient.
+type virtualMachinesClientImpl struct {
inner compute.VirtualMachinesClient
}
-func (cl *VirtualMachinesClientImpl) CreateOrUpdate(ctx context.Context,
+// createOrUpdate submits a create-or-update request for the named VM,
+// waits for the returned future to complete, and returns the VM
+// reported by the service. Every error is passed through
+// wrapAzureError before being returned; on error the returned VM is
+// not meaningful.
+func (cl *virtualMachinesClientImpl) createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, VMName, parameters)
if err != nil {
- return compute.VirtualMachine{}, WrapAzureError(err)
+ return compute.VirtualMachine{}, wrapAzureError(err)
}
- future.WaitForCompletionRef(ctx, cl.inner.Client)
+ // Report a failed or cancelled wait directly (as delete does)
+ // instead of discarding it and then asking a future that may not
+ // have finished for its result.
+ err = future.WaitForCompletionRef(ctx, cl.inner.Client)
+ if err != nil {
+ return compute.VirtualMachine{}, wrapAzureError(err)
+ }
r, err := future.Result(cl.inner)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-func (cl *VirtualMachinesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
+// delete requests deletion of the named VM and waits for the returned
+// future to complete. It returns the future's HTTP response; an error
+// from either step is passed through wrapAzureError.
+func (cl *virtualMachinesClientImpl) delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
err = future.WaitForCompletionRef(ctx, cl.inner.Client)
- return future.Response(), WrapAzureError(err)
+ return future.Response(), wrapAzureError(err)
}
-func (cl *VirtualMachinesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
+// listComplete returns the wrapped client's ListComplete iterator for
+// the resource group, with any error passed through wrapAzureError.
+func (cl *virtualMachinesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result compute.VirtualMachineListResultIterator, err error) {
r, err := cl.inner.ListComplete(ctx, resourceGroupName)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-type InterfacesClientWrapper interface {
- CreateOrUpdate(ctx context.Context,
+// interfacesClientWrapper is the set of network interface operations
+// used by azureInstanceSet (the type of its netClient field).
+// interfacesClientImpl implements it on top of
+// network.InterfacesClient.
+type interfacesClientWrapper interface {
+ // createOrUpdate creates or updates the named network interface
+ // and returns the resulting interface.
+ createOrUpdate(ctx context.Context,
resourceGroupName string,
networkInterfaceName string,
parameters network.Interface) (result network.Interface, err error)
- Delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
- ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
+ // delete deletes the named network interface and returns the HTTP
+ // response.
+ delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error)
+ // listComplete returns an iterator over the network interfaces in
+ // the resource group.
+ listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error)
}
-type InterfacesClientImpl struct {
+// interfacesClientImpl implements interfacesClientWrapper by
+// delegating to the wrapped network.InterfacesClient.
+type interfacesClientImpl struct {
inner network.InterfacesClient
}
-func (cl *InterfacesClientImpl) Delete(ctx context.Context, resourceGroupName string, VMName string) (result *http.Response, err error) {
- future, err := cl.inner.Delete(ctx, resourceGroupName, VMName)
+// delete requests deletion of the named network interface and waits
+// for the returned future to complete. It returns the future's HTTP
+// response; an error from either step is passed through
+// wrapAzureError.
+//
+// The third parameter is the network interface name, named to match
+// the interfacesClientWrapper declaration.
+func (cl *interfacesClientImpl) delete(ctx context.Context, resourceGroupName string, networkInterfaceName string) (result *http.Response, err error) {
+ future, err := cl.inner.Delete(ctx, resourceGroupName, networkInterfaceName)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
err = future.WaitForCompletionRef(ctx, cl.inner.Client)
- return future.Response(), WrapAzureError(err)
+ return future.Response(), wrapAzureError(err)
}
-func (cl *InterfacesClientImpl) CreateOrUpdate(ctx context.Context,
+// createOrUpdate submits a create-or-update request for the named
+// network interface, waits for the returned future to complete, and
+// returns the interface reported by the service. Every error is
+// passed through wrapAzureError before being returned; on error the
+// returned interface is not meaningful.
+func (cl *interfacesClientImpl) createOrUpdate(ctx context.Context,
resourceGroupName string,
networkInterfaceName string,
parameters network.Interface) (result network.Interface, err error) {
future, err := cl.inner.CreateOrUpdate(ctx, resourceGroupName, networkInterfaceName, parameters)
if err != nil {
- return network.Interface{}, WrapAzureError(err)
+ return network.Interface{}, wrapAzureError(err)
}
- future.WaitForCompletionRef(ctx, cl.inner.Client)
+ // Report a failed or cancelled wait directly (as delete does)
+ // instead of discarding it and then asking a future that may not
+ // have finished for its result.
+ err = future.WaitForCompletionRef(ctx, cl.inner.Client)
+ if err != nil {
+ return network.Interface{}, wrapAzureError(err)
+ }
r, err := future.Result(cl.inner)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
-func (cl *InterfacesClientImpl) ListComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
+// listComplete returns the wrapped client's ListComplete iterator for
+// the resource group, with any error passed through wrapAzureError.
+func (cl *interfacesClientImpl) listComplete(ctx context.Context, resourceGroupName string) (result network.InterfaceListResultIterator, err error) {
r, err := cl.inner.ListComplete(ctx, resourceGroupName)
- return r, WrapAzureError(err)
+ return r, wrapAzureError(err)
}
+// quotaRe matches "exceed", "quota" or "limit", case-insensitively.
+// wrapAzureError applies it to a service error's Code and Message to
+// recognize quota errors.
var quotaRe = regexp.MustCompile(`(?i:exceed|quota|limit)`)
-type AzureRateLimitError struct {
+// azureRateLimitError is an azure.RequestError paired with a retry
+// time. wrapAzureError constructs it; the time is exposed through
+// EarliestRetry.
+type azureRateLimitError struct {
azure.RequestError
- earliestRetry time.Time
+ firstRetry time.Time
}
-func (ar *AzureRateLimitError) EarliestRetry() time.Time {
- return ar.earliestRetry
+// EarliestRetry returns the retry time stored in the error when it
+// was constructed.
+func (ar *azureRateLimitError) EarliestRetry() time.Time {
+ return ar.firstRetry
}
-type AzureQuotaError struct {
+// azureQuotaError is the azure.RequestError wrapper that
+// wrapAzureError returns when quotaRe matches the service error's
+// Code or Message.
+type azureQuotaError struct {
azure.RequestError
}
-func (ar *AzureQuotaError) IsQuotaError() bool {
+// IsQuotaError always reports true.
+func (ar *azureQuotaError) IsQuotaError() bool {
return true
}
-func WrapAzureError(err error) error {
+func wrapAzureError(err error) error {
de, ok := err.(autorest.DetailedError)
if !ok {
return err
earliestRetry = time.Now().Add(20 * time.Second)
}
}
- return &AzureRateLimitError{*rq, earliestRetry}
+ return &azureRateLimitError{*rq, earliestRetry}
}
if rq.ServiceError == nil {
return err
}
if quotaRe.FindString(rq.ServiceError.Code) != "" || quotaRe.FindString(rq.ServiceError.Message) != "" {
- return &AzureQuotaError{*rq}
+ return &azureQuotaError{*rq}
}
return err
}
-type AzureInstanceSet struct {
- azconfig AzureInstanceSetConfig
- vmClient VirtualMachinesClientWrapper
- netClient InterfacesClientWrapper
+// azureInstanceSet is the value newAzureInstanceSet returns as a
+// cloud.InstanceSet. setup stores the decoded configuration and the
+// wrapped VM, network interface and storage account clients in it.
+type azureInstanceSet struct {
+ azconfig azureInstanceSetConfig
+ vmClient virtualMachinesClientWrapper
+ netClient interfacesClientWrapper
storageAcctClient storageacct.AccountsClient
azureEnv azure.Environment
interfaces map[string]network.Interface
logger logrus.FieldLogger
}
-func NewAzureInstanceSet(config map[string]interface{}, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
- azcfg := AzureInstanceSetConfig{}
- if err = mapstructure.Decode(config, &azcfg); err != nil {
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+ azcfg := azureInstanceSetConfig{}
+ err = json.Unmarshal(config, &azcfg)
+ if err != nil {
return nil, err
}
- ap := AzureInstanceSet{logger: logger}
+
+ ap := azureInstanceSet{logger: logger}
err = ap.setup(azcfg, string(dispatcherID))
if err != nil {
return nil, err
return &ap, nil
}
-func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID string) (err error) {
+func (az *azureInstanceSet) setup(azcfg azureInstanceSetConfig, dispatcherID string) (err error) {
az.azconfig = azcfg
vmClient := compute.NewVirtualMachinesClient(az.azconfig.SubscriptionID)
netClient := network.NewInterfacesClient(az.azconfig.SubscriptionID)
netClient.Authorizer = authorizer
storageAcctClient.Authorizer = authorizer
- az.vmClient = &VirtualMachinesClientImpl{vmClient}
- az.netClient = &InterfacesClientImpl{netClient}
+ az.vmClient = &virtualMachinesClientImpl{vmClient}
+ az.netClient = &interfacesClientImpl{netClient}
az.storageAcctClient = storageAcctClient
az.dispatcherID = dispatcherID
tk.Stop()
return
case <-tk.C:
- az.ManageBlobs()
+ az.manageBlobs()
}
}
}()
if !ok {
return
}
- _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
+ _, delerr := az.netClient.delete(context.Background(), az.azconfig.ResourceGroup, nicname)
if delerr != nil {
az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
} else {
return nil
}
-func (az *AzureInstanceSet) Create(
+func (az *azureInstanceSet) Create(
instanceType arvados.InstanceType,
imageID cloud.ImageID,
newTags cloud.InstanceTags,
},
},
}
- nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
+ nic, err := az.netClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
instanceVhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
},
}
- vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
+ vm, err := az.vmClient.createOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
- return &AzureInstance{
+ return &azureInstance{
provider: az,
nic: nic,
vm: vm,
}, nil
}
-func (az *AzureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+func (az *azureInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- interfaces, err := az.ManageNics()
+ interfaces, err := az.manageNics()
if err != nil {
return nil, err
}
- result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+ result, err := az.vmClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
instances := make([]cloud.Instance, 0)
for ; result.NotDone(); err = result.Next() {
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
- instances = append(instances, &AzureInstance{
+ instances = append(instances, &azureInstance{
provider: az,
vm: result.Value(),
nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
// not associated with a virtual machine and have a "create-at" time
// more than DeleteDanglingResourcesAfter (to prevent racing and
// deleting newly created NICs) in the past are deleted.
-func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
+func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
+ result, err := az.netClient.listComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
- return nil, WrapAzureError(err)
+ return nil, wrapAzureError(err)
}
interfaces := make(map[string]network.Interface)
if result.Value().Tags["created-at"] != nil {
createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
if err == nil {
- if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
+ if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
az.deleteNIC <- *result.Value().Name
}
// have "namePrefix", are "available" (which means they are not
// leased to a VM) and haven't been modified for
// DeleteDanglingResourcesAfter seconds.
-func (az *AzureInstanceSet) ManageBlobs() {
+func (az *azureInstanceSet) manageBlobs() {
result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
if err != nil {
az.logger.WithError(err).Warn("Couldn't get account keys")
if b.Properties.BlobType == storage.BlobTypePage &&
b.Properties.LeaseState == "available" &&
b.Properties.LeaseStatus == "unlocked" &&
- age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
+ age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
az.deleteBlob <- b
}
}
-func (az *AzureInstanceSet) Stop() {
+// Stop calls stopFunc, waits on stopWg (which Instances, manageNics,
+// SetTags, Destroy and VerifyHostKey register with while they run),
+// and then closes the deleteNIC and deleteBlob channels.
+func (az *azureInstanceSet) Stop() {
az.stopFunc()
az.stopWg.Wait()
close(az.deleteNIC)
close(az.deleteBlob)
}
-type AzureInstance struct {
- provider *AzureInstanceSet
+// azureInstance pairs a VM with its network interface and the
+// azureInstanceSet that produced it. Create and Instances construct
+// these values.
+type azureInstance struct {
+ provider *azureInstanceSet
nic network.Interface
vm compute.VirtualMachine
}
-func (ai *AzureInstance) ID() cloud.InstanceID {
+// ID returns the VM's ID as a cloud.InstanceID. It dereferences
+// vm.ID without a nil check.
+func (ai *azureInstance) ID() cloud.InstanceID {
return cloud.InstanceID(*ai.vm.ID)
}
-func (ai *AzureInstance) String() string {
+// String returns the VM's name. It dereferences vm.Name without a
+// nil check.
+func (ai *azureInstance) String() string {
return *ai.vm.Name
}
-func (ai *AzureInstance) ProviderType() string {
+// ProviderType returns the VM size from the VM's hardware profile,
+// converted to a string.
+func (ai *azureInstance) ProviderType() string {
return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
}
-func (ai *AzureInstance) SetTags(newTags cloud.InstanceTags) error {
+func (ai *azureInstance) SetTags(newTags cloud.InstanceTags) error {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
Location: &ai.provider.azconfig.Location,
Tags: tags,
}
- vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
+ vm, err := ai.provider.vmClient.createOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
if err != nil {
- return WrapAzureError(err)
+ return wrapAzureError(err)
}
ai.vm = vm
return nil
}
-func (ai *AzureInstance) Tags() cloud.InstanceTags {
+func (ai *azureInstance) Tags() cloud.InstanceTags {
tags := make(map[string]string)
for k, v := range ai.vm.Tags {
return tags
}
-func (ai *AzureInstance) Destroy() error {
+// Destroy deletes the VM, by name, through the provider's vmClient
+// and returns the resulting error passed through wrapAzureError. It
+// registers with the provider's stopWg for the duration of the call.
+func (ai *azureInstance) Destroy() error {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
- _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
- return WrapAzureError(err)
+ _, err := ai.provider.vmClient.delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
+ return wrapAzureError(err)
}
-func (ai *AzureInstance) Address() string {
- return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
+// Address returns the private IP address of the first IP
+// configuration on the instance's network interface, or "" when the
+// interface carries no such address.
+//
+// Instances fills nic from a map lookup, which yields a zero-value
+// network.Interface when the VM's NIC ID is not in the map, so every
+// pointer on the way to the address is checked before it is
+// dereferenced.
+func (ai *azureInstance) Address() string {
+ props := ai.nic.InterfacePropertiesFormat
+ if props == nil || props.IPConfigurations == nil || len(*props.IPConfigurations) == 0 {
+ return ""
+ }
+ ipconf := (*props.IPConfigurations)[0].InterfaceIPConfigurationPropertiesFormat
+ if ipconf == nil || ipconf.PrivateIPAddress == nil {
+ return ""
+ }
+ return *ipconf.PrivateIPAddress
}
-func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
+func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
if tg != "" {
if remoteFingerprint == tg {
return nil
- } else {
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
}
+ return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
}
nodetokenTag := tags["node-token"]