2960: Move streaming from volume to keepstore layer.
[arvados.git] / services / keepstore / azure_blob_volume.go
index 31660614f3c8fd213e7859ae9634e798ac83754b..2c8a79350c86b02e08eea2007c58a8f2e632ca47 100644 (file)
@@ -147,24 +147,22 @@ func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a BlockWrite operation is in progress,
 // and wait for it to finish writing.
-func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        trashed, _, err := v.checkTrashed(hash)
        if err != nil {
-               return 0, err
+               return err
        }
        if trashed {
-               return 0, os.ErrNotExist
+               return os.ErrNotExist
        }
        buf, err := v.bufferPool.GetContext(ctx)
        if err != nil {
-               return 0, err
+               return err
        }
        defer v.bufferPool.Put(buf)
-       streamer := newStreamWriterAt(writeTo, 65536, buf)
-       defer streamer.Close()
        var deadline time.Time
-       size, err := v.get(ctx, hash, streamer)
-       for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
+       wrote, err := v.get(ctx, hash, w)
+       for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
                // Seeing a brand new empty block probably means we're
                // in a race with CreateBlob, which under the hood
                // (apparently) does "CreateEmpty" and "CommitData"
@@ -185,20 +183,15 @@ func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io
                }
                select {
                case <-ctx.Done():
-                       return 0, ctx.Err()
+                       return ctx.Err()
                case <-time.After(v.WriteRacePollTime.Duration()):
                }
-               size, err = v.get(ctx, hash, streamer)
+               wrote, err = v.get(ctx, hash, w)
        }
        if !deadline.IsZero() {
-               ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
-       }
-       if err != nil {
-               streamer.Close()
-               return streamer.Wrote(), err
+               ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote)
        }
-       err = streamer.Close()
-       return streamer.Wrote(), err
+       return err
 }
 
 func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
@@ -212,6 +205,7 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
 
        pieces := 1
        expectSize := BlockSize
+       sizeKnown := false
        if pieceSize < BlockSize {
                // Unfortunately the handler doesn't tell us how long
                // the blob is expected to be, so we have to ask
@@ -225,15 +219,15 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
                }
                expectSize = int(props.ContentLength)
                pieces = (expectSize + pieceSize - 1) / pieceSize
+               sizeKnown = true
        }
 
        if expectSize == 0 {
                return 0, nil
        }
 
-       // We'll update this actualSize if/when we get the last piece.
-       actualSize := -1
        errors := make(chan error, pieces)
+       var wrote atomic.Int64
        var wg sync.WaitGroup
        wg.Add(pieces)
        for p := 0; p < pieces; p++ {
@@ -289,31 +283,24 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
                                rdr.Close()
                        }()
                        n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos))
-                       if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+                       wrote.Add(n)
+                       if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) {
                                // If we don't know the actual size,
                                // and just tried reading 64 MiB, it's
                                // normal to encounter EOF.
                        } else if err != nil {
-                               if ctx.Err() == nil {
-                                       errors <- err
-                               }
+                               errors <- err
                                cancel()
                                return
                        }
-                       if p == pieces-1 {
-                               actualSize = startPos + int(n)
-                       }
                }(p)
        }
        wg.Wait()
        close(errors)
        if len(errors) > 0 {
-               return 0, v.translateError(<-errors)
-       }
-       if ctx.Err() != nil {
-               return 0, ctx.Err()
+               return int(wrote.Load()), v.translateError(<-errors)
        }
-       return actualSize, nil
+       return int(wrote.Load()), ctx.Err()
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its