14399: Retry Azure ListBlobs call after 503 errors.
[arvados.git] / services / keepstore / azure_blob_volume.go
index 6b5b233c2a6701912ce06b1356fdb864778d0cf8..3c17b3bd0641e2bee23007d775b1740e2c7a14d4 100644 (file)
@@ -26,7 +26,11 @@ import (
        "github.com/prometheus/client_golang/prometheus"
 )
 
-const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
+const (
+       azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
+       azureDefaultListBlobsMaxAttempts = 12
+       azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
+)
 
 var (
        azureMaxGetBytes           int
@@ -108,6 +112,8 @@ type AzureBlobVolume struct {
        ReadOnly              bool
        RequestTimeout        arvados.Duration
        StorageClasses        []string
+       ListBlobsRetryDelay   arvados.Duration
+       ListBlobsMaxAttempts  int
 
        azClient  storage.Client
        container *azureContainer
@@ -149,6 +155,12 @@ func (v *AzureBlobVolume) Type() string {
 
 // Start implements Volume.
 func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+       if v.ListBlobsRetryDelay == 0 {
+               v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
+       }
+       if v.ListBlobsMaxAttempts == 0 {
+               v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
+       }
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
@@ -486,8 +498,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                Prefix:  prefix,
                Include: &storage.IncludeBlobDataset{Metadata: true},
        }
-       for {
-               resp, err := v.container.ListBlobs(params)
+       for page := 1; ; page++ {
+               resp, err := v.listBlobs(page, params)
                if err != nil {
                        return err
                }
@@ -517,6 +529,22 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
+// call v.container.ListBlobs, retrying if needed.
+func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
+       for i := 0; i < v.ListBlobsMaxAttempts; i++ {
+               resp, err = v.container.ListBlobs(params)
+               err = v.translateError(err)
+               if err == VolumeBusyError {
+                       log.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
+                       time.Sleep(time.Duration(v.ListBlobsRetryDelay))
+                       continue
+               } else {
+                       break
+               }
+       }
+       return
+}
+
 // Trash a Keep block.
 func (v *AzureBlobVolume) Trash(loc string) error {
        if v.ReadOnly {
@@ -674,8 +702,8 @@ func (v *AzureBlobVolume) EmptyTrash() {
        }
 
        params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
-       for {
-               resp, err := v.container.ListBlobs(params)
+       for page := 1; ; page++ {
+               resp, err := v.listBlobs(page, params)
                if err != nil {
                        log.Printf("EmptyTrash: ListBlobs: %v", err)
                        break