}
func newAzureBlobVolume(params newVolumeParams) (volume, error) {
- v := &AzureBlobVolume{
+ v := &azureBlobVolume{
RequestTimeout: azureDefaultRequestTimeout,
WriteRaceInterval: azureDefaultWriteRaceInterval,
WriteRacePollTime: azureDefaultWriteRacePollTime,
return v, v.check()
}
-func (v *AzureBlobVolume) check() error {
+func (v *azureBlobVolume) check() error {
lbls := prometheus.Labels{"device_id": v.DeviceID()}
v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
return nil
azureDefaultWriteRacePollTime = arvados.Duration(time.Second)
)
-// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
+// An azureBlobVolume stores and retrieves blocks in an Azure Blob
// container.
-type AzureBlobVolume struct {
+type azureBlobVolume struct {
StorageAccountName string
StorageAccountKey string
StorageBaseURL string // "" means default, "core.windows.net"
}
// DeviceID returns a globally unique ID for the storage container.
+// The ID is "azure://" + StorageBaseURL + "/" + StorageAccountName +
+// "/" + ContainerName. When StorageBaseURL is "" (the default endpoint),
+// that segment is simply empty.
-func (v *AzureBlobVolume) DeviceID() string {
+func (v *azureBlobVolume) DeviceID() string {
return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
}
// Return true if expires_at metadata attribute is found on the block
-func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
+func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
metadata, err := v.container.GetBlobMetadata(loc)
if err != nil {
return false, metadata, v.translateError(err)
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a BlockWrite operation is in progress,
// and wait for it to finish writing.
-func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
trashed, _, err := v.checkTrashed(hash)
if err != nil {
- return 0, err
+ return err
}
if trashed {
- return 0, os.ErrNotExist
+ return os.ErrNotExist
}
buf, err := v.bufferPool.GetContext(ctx)
if err != nil {
- return 0, err
+ return err
}
defer v.bufferPool.Put(buf)
- streamer := newStreamWriterAt(writeTo, 65536, buf)
- defer streamer.Close()
var deadline time.Time
- size, err := v.get(ctx, hash, streamer)
- for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
+ wrote, err := v.get(ctx, hash, w)
+ for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
// Seeing a brand new empty block probably means we're
// in a race with CreateBlob, which under the hood
// (apparently) does "CreateEmpty" and "CommitData"
}
select {
case <-ctx.Done():
- return 0, ctx.Err()
+ return ctx.Err()
case <-time.After(v.WriteRacePollTime.Duration()):
}
- size, err = v.get(ctx, hash, streamer)
+ wrote, err = v.get(ctx, hash, w)
}
if !deadline.IsZero() {
- ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
- }
- if err != nil {
- streamer.Close()
- return streamer.Wrote(), err
+ ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote)
}
- err = streamer.Close()
- return streamer.Wrote(), err
+ return err
}
-func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
+func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
pieces := 1
expectSize := BlockSize
+ sizeKnown := false
if pieceSize < BlockSize {
// Unfortunately the handler doesn't tell us how long
// the blob is expected to be, so we have to ask
}
expectSize = int(props.ContentLength)
pieces = (expectSize + pieceSize - 1) / pieceSize
+ sizeKnown = true
}
if expectSize == 0 {
return 0, nil
}
- // We'll update this actualSize if/when we get the last piece.
- actualSize := -1
errors := make(chan error, pieces)
+ var wrote atomic.Int64
var wg sync.WaitGroup
wg.Add(pieces)
for p := 0; p < pieces; p++ {
rdr.Close()
}()
n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos))
- if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+ wrote.Add(n)
+ if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) {
// If we don't know the actual size,
// and just tried reading 64 MiB, it's
// normal to encounter EOF.
} else if err != nil {
- if ctx.Err() == nil {
- errors <- err
- }
+ errors <- err
cancel()
return
}
- if p == pieces-1 {
- actualSize = startPos + int(n)
- }
}(p)
}
wg.Wait()
close(errors)
if len(errors) > 0 {
- return 0, v.translateError(<-errors)
- }
- if ctx.Err() != nil {
- return 0, ctx.Err()
+ return int(wrote.Load()), v.translateError(<-errors)
}
- return actualSize, nil
+ return int(wrote.Load()), ctx.Err()
}
// BlockWrite stores a block on the volume. If it already exists, its
// timestamp is updated.
-func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *azureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
// Send the block data through a pipe, so that (if we need to)
// we can close the pipe early and abandon our
// CreateBlockBlobFromReader() goroutine, without worrying
}
// BlockTouch updates the last-modified property of a block blob.
-func (v *AzureBlobVolume) BlockTouch(hash string) error {
+func (v *azureBlobVolume) BlockTouch(hash string) error {
trashed, metadata, err := v.checkTrashed(hash)
if err != nil {
return err
}
// Mtime returns the last-modified property of a block blob.
-func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) {
+func (v *azureBlobVolume) Mtime(hash string) (time.Time, error) {
trashed, _, err := v.checkTrashed(hash)
if err != nil {
return time.Time{}, err
// Index writes a list of Keep blocks that are stored in the
// container.
-func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
+func (v *azureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
params := storage.ListBlobsParameters{
Prefix: prefix,
Include: &storage.IncludeBlobDataset{Metadata: true},
}
// call v.container.ListBlobs, retrying if needed.
-func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
+func (v *azureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
for i := 0; i < v.ListBlobsMaxAttempts; i++ {
resp, err = v.container.ListBlobs(params)
err = v.translateError(err)
}
// Trash a Keep block.
-func (v *AzureBlobVolume) BlockTrash(loc string) error {
+func (v *azureBlobVolume) BlockTrash(loc string) error {
// Ideally we would use If-Unmodified-Since, but that
// particular condition seems to be ignored by Azure. Instead,
// we get the Etag before checking Mtime, and use If-Match to
// BlockUntrash deletes the expires_at metadata attribute for the
// specified block blob.
-func (v *AzureBlobVolume) BlockUntrash(hash string) error {
+func (v *azureBlobVolume) BlockUntrash(hash string) error {
// if expires_at does not exist, return NotFoundError
metadata, err := v.container.GetBlobMetadata(hash)
if err != nil {
// If possible, translate an Azure SDK error to a recognizable error
// like os.ErrNotExist.
-func (v *AzureBlobVolume) translateError(err error) error {
+func (v *azureBlobVolume) translateError(err error) error {
switch {
case err == nil:
return err
+// keepBlockRegexp matches a bare block hash: exactly 32 lowercase
+// hexadecimal characters and nothing else.
var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+// isKeepBlock reports whether s has the form of a Keep block hash
+// (32 lowercase hex characters). It checks only the form of the name;
+// it does not check whether such a blob exists in the container.
-func (v *AzureBlobVolume) isKeepBlock(s string) bool {
+func (v *azureBlobVolume) isKeepBlock(s string) bool {
return keepBlockRegexp.MatchString(s)
}
// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
// and deletes them from the volume.
-func (v *AzureBlobVolume) EmptyTrash() {
+func (v *azureBlobVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int64
}
// InternalStats returns bucket I/O and API call counters.
+// The result is a pointer to the container's stats field, not a copy,
+// so it reflects the counters' current values when read.
-func (v *AzureBlobVolume) InternalStats() interface{} {
+func (v *azureBlobVolume) InternalStats() interface{} {
return &v.container.stats
}