X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2c6557f613fcf6cdcebb08c321a5d061aeb780c6..293631794d64c64986ba0db2568345c005c90790:/services/keepstore/azure_blob_volume.go?ds=sidebyside diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index bdd669bb46..2c8a79350c 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -31,7 +31,7 @@ func init() { } func newAzureBlobVolume(params newVolumeParams) (volume, error) { - v := &AzureBlobVolume{ + v := &azureBlobVolume{ RequestTimeout: azureDefaultRequestTimeout, WriteRaceInterval: azureDefaultWriteRaceInterval, WriteRacePollTime: azureDefaultWriteRacePollTime, @@ -79,7 +79,7 @@ func newAzureBlobVolume(params newVolumeParams) (volume, error) { return v, v.check() } -func (v *AzureBlobVolume) check() error { +func (v *azureBlobVolume) check() error { lbls := prometheus.Labels{"device_id": v.DeviceID()} v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls) return nil @@ -93,9 +93,9 @@ const ( azureDefaultWriteRacePollTime = arvados.Duration(time.Second) ) -// An AzureBlobVolume stores and retrieves blocks in an Azure Blob +// An azureBlobVolume stores and retrieves blocks in an Azure Blob // container. -type AzureBlobVolume struct { +type azureBlobVolume struct { StorageAccountName string StorageAccountKey string StorageBaseURL string // "" means default, "core.windows.net" @@ -125,12 +125,12 @@ func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Resp } // DeviceID returns a globally unique ID for the storage container. -func (v *AzureBlobVolume) DeviceID() string { +func (v *azureBlobVolume) DeviceID() string { return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName } // Return true if expires_at metadata attribute is found on the block -func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) { +func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) { metadata, err := v.container.GetBlobMetadata(loc) if err != nil { return false, metadata, v.translateError(err) @@ -147,24 +147,22 @@ func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err // If the block is younger than azureWriteRaceInterval and is // unexpectedly empty, assume a BlockWrite operation is in progress, // and wait for it to finish writing. -func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { +func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error { trashed, _, err := v.checkTrashed(hash) if err != nil { - return 0, err + return err } if trashed { - return 0, os.ErrNotExist + return os.ErrNotExist } buf, err := v.bufferPool.GetContext(ctx) if err != nil { - return 0, err + return err } defer v.bufferPool.Put(buf) - streamer := newStreamWriterAt(writeTo, 65536, buf) - defer streamer.Close() var deadline time.Time - size, err := v.get(ctx, hash, streamer) - for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" { + wrote, err := v.get(ctx, hash, w) + for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" { // Seeing a brand new empty block probably means we're // in a race with CreateBlob, which under the hood // (apparently) does "CreateEmpty" and "CommitData" @@ -185,23 +183,18 @@ func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io } select { case <-ctx.Done(): - return 0, ctx.Err() + return ctx.Err() case <-time.After(v.WriteRacePollTime.Duration()): } - size, err = v.get(ctx, hash, streamer) + wrote, err = v.get(ctx, hash, w) } if !deadline.IsZero() { - ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size) - } - if err != nil { - streamer.Close() - return streamer.Wrote(), err + ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote) } - err = streamer.Close() - return streamer.Wrote(), err + return err } -func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) { +func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -212,6 +205,7 @@ func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) pieces := 1 expectSize := BlockSize + sizeKnown := false if pieceSize < BlockSize { // Unfortunately the handler doesn't tell us how long // the blob is expected to be, so we have to ask @@ -225,15 +219,15 @@ func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) } expectSize = int(props.ContentLength) pieces = (expectSize + pieceSize - 1) / pieceSize + sizeKnown = true } if expectSize == 0 { return 0, nil } - // We'll update this actualSize if/when we get the last piece. - actualSize := -1 errors := make(chan error, pieces) + var wrote atomic.Int64 var wg sync.WaitGroup wg.Add(pieces) for p := 0; p < pieces; p++ { @@ -289,36 +283,29 @@ func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) rdr.Close() }() n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos)) - if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) { + wrote.Add(n) + if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) { // If we don't know the actual size, // and just tried reading 64 MiB, it's // normal to encounter EOF. } else if err != nil { - if ctx.Err() == nil { - errors <- err - } + errors <- err cancel() return } - if p == pieces-1 { - actualSize = startPos + int(n) - } }(p) } wg.Wait() close(errors) if len(errors) > 0 { - return 0, v.translateError(<-errors) - } - if ctx.Err() != nil { - return 0, ctx.Err() + return int(wrote.Load()), v.translateError(<-errors) } - return actualSize, nil + return int(wrote.Load()), ctx.Err() } // BlockWrite stores a block on the volume. If it already exists, its // timestamp is updated. -func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { +func (v *azureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { // Send the block data through a pipe, so that (if we need to) // we can close the pipe early and abandon our // CreateBlockBlobFromReader() goroutine, without worrying @@ -359,7 +346,7 @@ func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []by } // BlockTouch updates the last-modified property of a block blob. -func (v *AzureBlobVolume) BlockTouch(hash string) error { +func (v *azureBlobVolume) BlockTouch(hash string) error { trashed, metadata, err := v.checkTrashed(hash) if err != nil { return err @@ -373,7 +360,7 @@ func (v *AzureBlobVolume) BlockTouch(hash string) error { } // Mtime returns the last-modified property of a block blob. -func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) { +func (v *azureBlobVolume) Mtime(hash string) (time.Time, error) { trashed, _, err := v.checkTrashed(hash) if err != nil { return time.Time{}, err @@ -391,7 +378,7 @@ func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) { // Index writes a list of Keep blocks that are stored in the // container. -func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error { +func (v *azureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error { params := storage.ListBlobsParameters{ Prefix: prefix, Include: &storage.IncludeBlobDataset{Metadata: true}, @@ -432,7 +419,7 @@ func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Wr } // call v.container.ListBlobs, retrying if needed. -func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) { +func (v *azureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) { for i := 0; i < v.ListBlobsMaxAttempts; i++ { resp, err = v.container.ListBlobs(params) err = v.translateError(err) @@ -448,7 +435,7 @@ func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters } // Trash a Keep block. -func (v *AzureBlobVolume) BlockTrash(loc string) error { +func (v *azureBlobVolume) BlockTrash(loc string) error { // Ideally we would use If-Unmodified-Since, but that // particular condition seems to be ignored by Azure. Instead, // we get the Etag before checking Mtime, and use If-Match to @@ -481,7 +468,7 @@ func (v *AzureBlobVolume) BlockTrash(loc string) error { // BlockUntrash deletes the expires_at metadata attribute for the // specified block blob. -func (v *AzureBlobVolume) BlockUntrash(hash string) error { +func (v *azureBlobVolume) BlockUntrash(hash string) error { // if expires_at does not exist, return NotFoundError metadata, err := v.container.GetBlobMetadata(hash) if err != nil { @@ -499,7 +486,7 @@ func (v *AzureBlobVolume) BlockUntrash(hash string) error { // If possible, translate an Azure SDK error to a recognizable error // like os.ErrNotExist. -func (v *AzureBlobVolume) translateError(err error) error { +func (v *azureBlobVolume) translateError(err error) error { switch { case err == nil: return err @@ -519,13 +506,13 @@ func (v *AzureBlobVolume) translateError(err error) error { var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) -func (v *AzureBlobVolume) isKeepBlock(s string) bool { +func (v *azureBlobVolume) isKeepBlock(s string) bool { return keepBlockRegexp.MatchString(s) } // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime // and deletes them from the volume. -func (v *AzureBlobVolume) EmptyTrash() { +func (v *azureBlobVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 var blocksDeleted, blocksInTrash int64 @@ -593,7 +580,7 @@ func (v *AzureBlobVolume) EmptyTrash() { } // InternalStats returns bucket I/O and API call counters. -func (v *AzureBlobVolume) InternalStats() interface{} { +func (v *azureBlobVolume) InternalStats() interface{} { return &v.container.stats }