14325: Add worker state diagram.
[arvados.git] / lib / cloud / azure.go
index d9f1158fc36dded453b462e9df97fcc1b0adbd37..a194b33180b231cfb74964b11253d5aa6f8d0667 100644 (file)
@@ -8,7 +8,6 @@ import (
        "context"
        "encoding/base64"
        "fmt"
-       "log"
        "net/http"
        "regexp"
        "strconv"
@@ -27,6 +26,7 @@ import (
        "github.com/Azure/go-autorest/autorest/to"
        "github.com/jmcvetta/randutil"
        "github.com/mitchellh/mapstructure"
+       "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
 )
 
@@ -166,15 +166,14 @@ func WrapAzureError(err error) error {
                if parseErr != nil {
                        // Could not parse as a timestamp, must be number of seconds
                        dur, parseErr := strconv.ParseInt(ra, 10, 64)
-                       if parseErr != nil {
+                       if parseErr == nil {
                                earliestRetry = time.Now().Add(time.Duration(dur) * time.Second)
+                       } else {
+                               // Couldn't make sense of retry-after,
+                               // so set retry to 20 seconds
+                               earliestRetry = time.Now().Add(20 * time.Second)
                        }
                }
-               if parseErr != nil {
-                       // Couldn't make sense of retry-after,
-                       // so set retry to 20 seconds
-                       earliestRetry = time.Now().Add(20 * time.Second)
-               }
                return &AzureRateLimitError{*rq, earliestRetry}
        }
        if rq.ServiceError == nil {
@@ -200,14 +199,15 @@ type AzureInstanceSet struct {
        stopWg            sync.WaitGroup
        deleteNIC         chan string
        deleteBlob        chan storage.Blob
+       logger            logrus.FieldLogger
 }
 
-func NewAzureInstanceSet(config map[string]interface{}, dispatcherID InstanceSetID) (prv InstanceSet, err error) {
+func NewAzureInstanceSet(config map[string]interface{}, dispatcherID InstanceSetID, logger logrus.FieldLogger) (prv InstanceSet, err error) {
        azcfg := AzureInstanceSetConfig{}
        if err = mapstructure.Decode(config, &azcfg); err != nil {
                return nil, err
        }
-       ap := AzureInstanceSet{}
+       ap := AzureInstanceSet{logger: logger}
        err = ap.setup(azcfg, string(dispatcherID))
        if err != nil {
                return nil, err
@@ -250,10 +250,14 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
 
        az.ctx, az.stopFunc = context.WithCancel(context.Background())
+       az.stopWg.Add(1)
        go func() {
+               defer az.stopWg.Done()
+
                tk := time.NewTicker(5 * time.Minute)
                for {
                        select {
                        case <-az.ctx.Done():
+                               tk.Stop()
                                return
                        case <-tk.C:
                                az.ManageBlobs()
@@ -273,9 +277,9 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
                                }
                                _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
                                if delerr != nil {
-                                       log.Printf("Error deleting %v: %v", nicname, delerr)
+                                       az.logger.WithError(delerr).Warnf("Error deleting %v", nicname)
                                } else {
-                                       log.Printf("Deleted %v", nicname)
+                                       az.logger.Printf("Deleted NIC %v", nicname)
                                }
                        }
                }()
@@ -287,9 +291,9 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
                                }
                                err := blob.Delete(nil)
                                if err != nil {
-                                       log.Printf("error deleting %v: %v", blob.Name, err)
+                                       az.logger.WithError(err).Warnf("Error deleting %v", blob.Name)
                                } else {
-                                       log.Printf("Deleted blob %v", blob.Name)
+                                       az.logger.Printf("Deleted blob %v", blob.Name)
                                }
                        }
                }()
@@ -475,7 +479,7 @@ func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
        timestamp := time.Now()
        for ; result.NotDone(); err = result.Next() {
                if err != nil {
-                       log.Printf("Error listing nics: %v", err)
+                       az.logger.WithError(err).Warn("Error listing nics")
                        return interfaces, nil
                }
                if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
@@ -486,7 +490,7 @@ func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
                                        created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
                                        if err == nil {
                                                if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
-                                                       log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+                                                       az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
                                                        az.deleteNIC <- *result.Value().Name
                                                }
                                        }
@@ -503,19 +507,16 @@ func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
 // leased to a VM) and haven't been modified for
 // DeleteDanglingResourcesAfter seconds.
 func (az *AzureInstanceSet) ManageBlobs() {
-       az.stopWg.Add(1)
-       defer az.stopWg.Done()
-
        result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
        if err != nil {
-               log.Printf("Couldn't get account keys %v", err)
+               az.logger.WithError(err).Warn("Couldn't get account keys")
                return
        }
 
        key1 := *(*result.Keys)[0].Value
        client, err := storage.NewBasicClientOnSovereignCloud(az.azconfig.StorageAccount, key1, az.azureEnv)
        if err != nil {
-               log.Printf("Couldn't make client %v", err)
+               az.logger.WithError(err).Warn("Couldn't make client")
                return
        }
 
@@ -528,7 +529,7 @@ func (az *AzureInstanceSet) ManageBlobs() {
        for {
                response, err := blobcont.ListBlobs(page)
                if err != nil {
-                       log.Printf("Error listing blobs %v", err)
+                       az.logger.WithError(err).Warn("Error listing blobs")
                        return
                }
                for _, b := range response.Blobs {
@@ -538,7 +539,7 @@ func (az *AzureInstanceSet) ManageBlobs() {
                                b.Properties.LeaseStatus == "unlocked" &&
                                age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
 
-                               log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
+                               az.logger.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
                                az.deleteBlob <- b
                        }
                }