X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/646ea4328be842f4baa194205618c01910ec49db..93cfe7c262708fb09eda5aad1839c832816d4591:/services/keepstore/azure_blob_volume.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index 66956b89ee..3c17b3bd06 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -26,7 +26,11 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute) +const ( + azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute) + azureDefaultListBlobsMaxAttempts = 12 + azureDefaultListBlobsRetryDelay = arvados.Duration(10 * time.Second) +) var ( azureMaxGetBytes int @@ -108,6 +112,8 @@ type AzureBlobVolume struct { ReadOnly bool RequestTimeout arvados.Duration StorageClasses []string + ListBlobsRetryDelay arvados.Duration + ListBlobsMaxAttempts int azClient storage.Client container *azureContainer @@ -148,7 +154,13 @@ func (v *AzureBlobVolume) Type() string { } // Start implements Volume. 
-func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
+func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+	if v.ListBlobsRetryDelay == 0 {
+		v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
+	}
+	if v.ListBlobsMaxAttempts == 0 {
+		v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
+	}
 	if v.ContainerName == "" {
 		return errors.New("no container name given")
 	}
@@ -186,9 +198,7 @@ func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.Co
 	}
 	// Set up prometheus metrics
 	lbls := prometheus.Labels{"device_id": v.DeviceID()}
-	v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
-	v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
-	v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
+	v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls)
 	return nil
 }
 
@@ -488,8 +498,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 		Prefix:  prefix,
 		Include: &storage.IncludeBlobDataset{Metadata: true},
 	}
-	for {
-		resp, err := v.container.ListBlobs(params)
+	for page := 1; ; page++ {
+		resp, err := v.listBlobs(page, params)
 		if err != nil {
 			return err
 		}
@@ -519,6 +529,22 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 	}
 }
 
+// listBlobs calls v.container.ListBlobs(params), retrying after
+// ListBlobsRetryDelay while the translated error is VolumeBusyError, for at
+// most ListBlobsMaxAttempts attempts (always at least one). page is only logged.
+func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
+	for attempt := 1; ; attempt++ {
+		resp, err = v.container.ListBlobs(params)
+		err = v.translateError(err)
+		if err != VolumeBusyError || attempt >= v.ListBlobsMaxAttempts {
+			// Done, or out of attempts: return now, without a final sleep.
+			return resp, err
+		}
+		log.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
+		time.Sleep(time.Duration(v.ListBlobsRetryDelay))
+	}
+}
+
 // Trash a Keep block.
func (v *AzureBlobVolume) Trash(loc string) error { if v.ReadOnly { @@ -676,8 +702,8 @@ func (v *AzureBlobVolume) EmptyTrash() { } params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}} - for { - resp, err := v.container.ListBlobs(params) + for page := 1; ; page++ { + resp, err := v.listBlobs(page, params) if err != nil { log.Printf("EmptyTrash: ListBlobs: %v", err) break