15003: Fix errant warnings on Workbench configs.
[arvados.git] / services / keepstore / azure_blob_volume.go
index 4f7339facf4ace001ac886a5076afc217e040c18..3c17b3bd0641e2bee23007d775b1740e2c7a14d4 100644 (file)
@@ -23,9 +23,14 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
-const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
+const (
+       azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
+       azureDefaultListBlobsMaxAttempts = 12
+       azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
+)
 
 var (
        azureMaxGetBytes           int
@@ -107,6 +112,8 @@ type AzureBlobVolume struct {
        ReadOnly              bool
        RequestTimeout        arvados.Duration
        StorageClasses        []string
+       ListBlobsRetryDelay   arvados.Duration
+       ListBlobsMaxAttempts  int
 
        azClient  storage.Client
        container *azureContainer
@@ -147,7 +154,13 @@ func (v *AzureBlobVolume) Type() string {
 }
 
 // Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+       if v.ListBlobsRetryDelay == 0 {
+               v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
+       }
+       if v.ListBlobsMaxAttempts == 0 {
+               v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
+       }
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
@@ -183,6 +196,10 @@ func (v *AzureBlobVolume) Start() error {
        } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -481,8 +498,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                Prefix:  prefix,
                Include: &storage.IncludeBlobDataset{Metadata: true},
        }
-       for {
-               resp, err := v.container.ListBlobs(params)
+       for page := 1; ; page++ {
+               resp, err := v.listBlobs(page, params)
                if err != nil {
                        return err
                }
@@ -512,6 +529,22 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
+// call v.container.ListBlobs, retrying if needed.
+func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
+       for i := 0; i < v.ListBlobsMaxAttempts; i++ {
+               resp, err = v.container.ListBlobs(params)
+               err = v.translateError(err)
+               if err == VolumeBusyError {
+                       log.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
+                       time.Sleep(time.Duration(v.ListBlobsRetryDelay))
+                       continue
+               } else {
+                       break
+               }
+       }
+       return
+}
+
 // Trash a Keep block.
 func (v *AzureBlobVolume) Trash(loc string) error {
        if v.ReadOnly {
@@ -669,8 +702,8 @@ func (v *AzureBlobVolume) EmptyTrash() {
        }
 
        params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
-       for {
-               resp, err := v.container.ListBlobs(params)
+       for page := 1; ; page++ {
+               resp, err := v.listBlobs(page, params)
                if err != nil {
                        log.Printf("EmptyTrash: ListBlobs: %v", err)
                        break
@@ -727,6 +760,7 @@ type azureContainer struct {
 }
 
 func (c *azureContainer) Exists() (bool, error) {
+       c.stats.TickOps("exists")
        c.stats.Tick(&c.stats.Ops)
        ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
@@ -734,6 +768,7 @@ func (c *azureContainer) Exists() (bool, error) {
 }
 
 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+       c.stats.TickOps("get_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetMetadata(nil)
@@ -742,6 +777,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er
 }
 
 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+       c.stats.TickOps("get_properties")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetProperties(nil)
@@ -750,6 +786,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie
 }
 
 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+       c.stats.TickOps("get")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.Get(nil)
@@ -758,6 +795,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
 }
 
 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+       c.stats.TickOps("get_range")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
@@ -785,6 +823,7 @@ func (r *readerWithAzureLen) Len() int {
 }
 
 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+       c.stats.TickOps("create")
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
                rdr = &readerWithAzureLen{
@@ -799,6 +838,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i
 }
 
 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+       c.stats.TickOps("set_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        b.Metadata = m
@@ -808,6 +848,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o
 }
 
 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       c.stats.TickOps("list")
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
        resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
@@ -815,6 +856,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.
 }
 
 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+       c.stats.TickOps("delete")
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.Delete(opts)