X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b17f04b7797eda5a5d888264f7d480d762a9966f..93cfe7c262708fb09eda5aad1839c832816d4591:/services/keepstore/azure_blob_volume.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index f18d82c06b..3c17b3bd06 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -18,13 +18,19 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" "github.com/Azure/azure-sdk-for-go/storage" + "github.com/prometheus/client_golang/prometheus" ) -const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute) +const ( + azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute) + azureDefaultListBlobsMaxAttempts = 12 + azureDefaultListBlobsRetryDelay = arvados.Duration(10 * time.Second) +) var ( azureMaxGetBytes int @@ -106,6 +112,8 @@ type AzureBlobVolume struct { ReadOnly bool RequestTimeout arvados.Duration StorageClasses []string + ListBlobsRetryDelay arvados.Duration + ListBlobsMaxAttempts int azClient storage.Client container *azureContainer @@ -146,7 +154,13 @@ func (v *AzureBlobVolume) Type() string { } // Start implements Volume. -func (v *AzureBlobVolume) Start() error { +func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error { + if v.ListBlobsRetryDelay == 0 { + v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay + } + if v.ListBlobsMaxAttempts == 0 { + v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts + } if v.ContainerName == "" { return errors.New("no container name given") } @@ -182,6 +196,10 @@ func (v *AzureBlobVolume) Start() error { } else if !ok { return fmt.Errorf("Azure container %q does not exist", v.ContainerName) } + // Set up prometheus metrics + lbls := prometheus.Labels{"device_id": v.DeviceID()} + v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls) + return nil } @@ -452,7 +470,7 @@ func (v *AzureBlobVolume) Touch(loc string) error { return os.ErrNotExist } - metadata["touch"] = fmt.Sprintf("%d", time.Now()) + metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix()) return v.container.SetBlobMetadata(loc, metadata, nil) } @@ -480,8 +498,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { Prefix: prefix, Include: &storage.IncludeBlobDataset{Metadata: true}, } - for { - resp, err := v.container.ListBlobs(params) + for page := 1; ; page++ { + resp, err := v.listBlobs(page, params) if err != nil { return err } @@ -511,6 +529,22 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error { } } +// call v.container.ListBlobs, retrying if needed. +func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) { + for i := 0; i < v.ListBlobsMaxAttempts; i++ { + resp, err = v.container.ListBlobs(params) + err = v.translateError(err) + if err == VolumeBusyError { + log.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err) + time.Sleep(time.Duration(v.ListBlobsRetryDelay)) + continue + } else { + break + } + } + return +} + // Trash a Keep block. func (v *AzureBlobVolume) Trash(loc string) error { if v.ReadOnly { @@ -602,6 +636,9 @@ func (v *AzureBlobVolume) translateError(err error) error { switch { case err == nil: return err + case strings.Contains(err.Error(), "StatusCode=503"): + // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804) + return VolumeBusyError case strings.Contains(err.Error(), "Not Found"): // "storage: service returned without a response body (404 Not Found)" return os.ErrNotExist @@ -620,49 +657,67 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool { // and deletes them from the volume. func (v *AzureBlobVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 - var blocksDeleted, blocksInTrash int - params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}} + var blocksDeleted, blocksInTrash int64 - for { - resp, err := v.container.ListBlobs(params) + doBlob := func(b storage.Blob) { + // Check whether the block is flagged as trash + if b.Metadata["expires_at"] == "" { + return + } + + atomic.AddInt64(&blocksInTrash, 1) + atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength) + + expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64) if err != nil { - log.Printf("EmptyTrash: ListBlobs: %v", err) - break + log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err) + return } - for _, b := range resp.Blobs { - // Check if the block is expired - if b.Metadata["expires_at"] == "" { - continue - } - blocksInTrash++ - bytesInTrash += b.Properties.ContentLength + if expiresAt > time.Now().Unix() { + return + } - expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64) - if err != nil { - log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err) - continue - } + err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{ + IfMatch: b.Properties.Etag, + }) + if err != nil { + log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err) + return + } + atomic.AddInt64(&blocksDeleted, 1) + atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength) + } - if expiresAt > time.Now().Unix() { - continue + var wg sync.WaitGroup + todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers) + for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for b := range todo { + doBlob(b) } + }() + } - err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{ - IfMatch: b.Properties.Etag, - }) - if err != nil { - log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err) - continue - } - blocksDeleted++ - bytesDeleted += b.Properties.ContentLength + params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}} + for page := 1; ; page++ { + resp, err := v.listBlobs(page, params) + if err != nil { + log.Printf("EmptyTrash: ListBlobs: %v", err) + break + } + for _, b := range resp.Blobs { + todo <- b } if resp.NextMarker == "" { break } params.Marker = resp.NextMarker } + close(todo) + wg.Wait() log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } @@ -705,6 +760,7 @@ type azureContainer struct { } func (c *azureContainer) Exists() (bool, error) { + c.stats.TickOps("exists") c.stats.Tick(&c.stats.Ops) ok, err := c.ctr.Exists() c.stats.TickErr(err) @@ -712,6 +768,7 @@ func (c *azureContainer) Exists() (bool, error) { } func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) { + c.stats.TickOps("get_metadata") c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps) b := c.ctr.GetBlobReference(bname) err := b.GetMetadata(nil) @@ -720,6 +777,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er } func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) { + c.stats.TickOps("get_properties") c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps) b := c.ctr.GetBlobReference(bname) err := b.GetProperties(nil) @@ -728,6 +786,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie } func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) { + c.stats.TickOps("get") c.stats.Tick(&c.stats.Ops, &c.stats.GetOps) b := c.ctr.GetBlobReference(bname) rdr, err := b.Get(nil) @@ -736,6 +795,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) { } func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) { + c.stats.TickOps("get_range") c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps) b := c.ctr.GetBlobReference(bname) rdr, err := b.GetRange(&storage.GetBlobRangeOptions{ @@ -763,6 +823,7 @@ func (r *readerWithAzureLen) Len() int { } func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error { + c.stats.TickOps("create") c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps) if size != 0 { rdr = &readerWithAzureLen{ @@ -777,6 +838,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i } func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error { + c.stats.TickOps("set_metadata") c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps) b := c.ctr.GetBlobReference(bname) b.Metadata = m @@ -786,6 +848,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o } func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) { + c.stats.TickOps("list") c.stats.Tick(&c.stats.Ops, &c.stats.ListOps) resp, err := c.ctr.ListBlobs(params) c.stats.TickErr(err) @@ -793,6 +856,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage. } func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error { + c.stats.TickOps("delete") c.stats.Tick(&c.stats.Ops, &c.stats.DelOps) b := c.ctr.GetBlobReference(bname) err := b.Delete(opts)