15003: Fix errant warnings on Workbench configs.
[arvados.git] / services / keepstore / azure_blob_volume.go
index c4eb8b23d5fc10078d9891ed2fbc88ef315ba30d..3c17b3bd0641e2bee23007d775b1740e2c7a14d4 100644 (file)
@@ -18,13 +18,19 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
-const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
+const (
+       azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
+       azureDefaultListBlobsMaxAttempts = 12
+       azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
+)
 
 var (
        azureMaxGetBytes           int
@@ -105,6 +111,9 @@ type AzureBlobVolume struct {
        AzureReplication      int
        ReadOnly              bool
        RequestTimeout        arvados.Duration
+       StorageClasses        []string
+       ListBlobsRetryDelay   arvados.Duration
+       ListBlobsMaxAttempts  int
 
        azClient  storage.Client
        container *azureContainer
@@ -145,7 +154,13 @@ func (v *AzureBlobVolume) Type() string {
 }
 
 // Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+       if v.ListBlobsRetryDelay == 0 {
+               v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
+       }
+       if v.ListBlobsMaxAttempts == 0 {
+               v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
+       }
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
@@ -181,6 +196,10 @@ func (v *AzureBlobVolume) Start() error {
        } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -451,7 +470,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
                return os.ErrNotExist
        }
 
-       metadata["touch"] = fmt.Sprintf("%d", time.Now())
+       metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
        return v.container.SetBlobMetadata(loc, metadata, nil)
 }
 
@@ -479,8 +498,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                Prefix:  prefix,
                Include: &storage.IncludeBlobDataset{Metadata: true},
        }
-       for {
-               resp, err := v.container.ListBlobs(params)
+       for page := 1; ; page++ {
+               resp, err := v.listBlobs(page, params)
                if err != nil {
                        return err
                }
@@ -510,6 +529,22 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
+// call v.container.ListBlobs, retrying if needed.
+func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
+       for i := 0; i < v.ListBlobsMaxAttempts; i++ {
+               resp, err = v.container.ListBlobs(params)
+               err = v.translateError(err)
+               if err == VolumeBusyError {
+                       log.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
+                       time.Sleep(time.Duration(v.ListBlobsRetryDelay))
+                       continue
+               } else {
+                       break
+               }
+       }
+       return
+}
+
 // Trash a Keep block.
 func (v *AzureBlobVolume) Trash(loc string) error {
        if v.ReadOnly {
@@ -590,12 +625,20 @@ func (v *AzureBlobVolume) Replication() int {
        return v.AzureReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
 func (v *AzureBlobVolume) translateError(err error) error {
        switch {
        case err == nil:
                return err
+       case strings.Contains(err.Error(), "StatusCode=503"):
+               // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
+               return VolumeBusyError
        case strings.Contains(err.Error(), "Not Found"):
                // "storage: service returned without a response body (404 Not Found)"
                return os.ErrNotExist
@@ -614,49 +657,67 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
-       var blocksDeleted, blocksInTrash int
-       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+       var blocksDeleted, blocksInTrash int64
+
+       doBlob := func(b storage.Blob) {
+               // Check whether the block is flagged as trash
+               if b.Metadata["expires_at"] == "" {
+                       return
+               }
 
-       for {
-               resp, err := v.container.ListBlobs(params)
+               atomic.AddInt64(&blocksInTrash, 1)
+               atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
+
+               expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
                if err != nil {
-                       log.Printf("EmptyTrash: ListBlobs: %v", err)
-                       break
+                       log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+                       return
                }
-               for _, b := range resp.Blobs {
-                       // Check if the block is expired
-                       if b.Metadata["expires_at"] == "" {
-                               continue
-                       }
 
-                       blocksInTrash++
-                       bytesInTrash += b.Properties.ContentLength
+               if expiresAt > time.Now().Unix() {
+                       return
+               }
 
-                       expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
-                       if err != nil {
-                               log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
-                               continue
-                       }
+               err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+                       IfMatch: b.Properties.Etag,
+               })
+               if err != nil {
+                       log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+                       return
+               }
+               atomic.AddInt64(&blocksDeleted, 1)
+               atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
+       }
 
-                       if expiresAt > time.Now().Unix() {
-                               continue
+       var wg sync.WaitGroup
+       todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for b := range todo {
+                               doBlob(b)
                        }
+               }()
+       }
 
-                       err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
-                               IfMatch: b.Properties.Etag,
-                       })
-                       if err != nil {
-                               log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
-                               continue
-                       }
-                       blocksDeleted++
-                       bytesDeleted += b.Properties.ContentLength
+       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+       for page := 1; ; page++ {
+               resp, err := v.listBlobs(page, params)
+               if err != nil {
+                       log.Printf("EmptyTrash: ListBlobs: %v", err)
+                       break
+               }
+               for _, b := range resp.Blobs {
+                       todo <- b
                }
                if resp.NextMarker == "" {
                        break
                }
                params.Marker = resp.NextMarker
        }
+       close(todo)
+       wg.Wait()
 
        log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
@@ -699,6 +760,7 @@ type azureContainer struct {
 }
 
 func (c *azureContainer) Exists() (bool, error) {
+       c.stats.TickOps("exists")
        c.stats.Tick(&c.stats.Ops)
        ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
@@ -706,6 +768,7 @@ func (c *azureContainer) Exists() (bool, error) {
 }
 
 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+       c.stats.TickOps("get_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetMetadata(nil)
@@ -714,6 +777,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er
 }
 
 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+       c.stats.TickOps("get_properties")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetProperties(nil)
@@ -722,6 +786,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie
 }
 
 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+       c.stats.TickOps("get")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.Get(nil)
@@ -730,6 +795,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
 }
 
 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+       c.stats.TickOps("get_range")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
@@ -757,6 +823,7 @@ func (r *readerWithAzureLen) Len() int {
 }
 
 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+       c.stats.TickOps("create")
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
                rdr = &readerWithAzureLen{
@@ -771,6 +838,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i
 }
 
 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+       c.stats.TickOps("set_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        b.Metadata = m
@@ -780,6 +848,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o
 }
 
 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       c.stats.TickOps("list")
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
        resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
@@ -787,6 +856,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.
 }
 
 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+       c.stats.TickOps("delete")
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.Delete(opts)