2960: Move streaming from volume to keepstore layer.
[arvados.git] / services / keepstore / keepstore.go
index 62b6d15e565cfe6e60671055f073a6f9c11de864..c9a80230597c5eacc5bac7e66d48d82439420610 100644 (file)
@@ -243,13 +243,58 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
        } else {
                out = io.MultiWriter(out, hashcheck)
        }
+
+       buf, err := ks.bufferPool.GetContext(ctx)
+       if err != nil {
+               return 0, err
+       }
+       defer ks.bufferPool.Put(buf)
+       streamer := newStreamWriterAt(out, 65536, buf)
+       defer streamer.Close()
+
        var errToCaller error = os.ErrNotExist
        for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
                if ctx.Err() != nil {
                        return 0, ctx.Err()
                }
-               n, err = mnt.BlockRead(ctx, li.hash, out)
-               if err == nil && li.size > 0 && n != li.size {
+               err := mnt.BlockRead(ctx, li.hash, streamer)
+               if err != nil {
+                       if streamer.WroteAt() != 0 {
+                               // BlockRead encountered an error
+                               // after writing some data, so it's
+                               // too late to try another
+                               // volume. Flush streamer before
+                               // calling Wrote() to ensure our
+                               // return value accurately reflects
+                               // the number of bytes written to
+                               // opts.WriteTo.
+                               streamer.Close()
+                               return streamer.Wrote(), err
+                       }
+                       if !os.IsNotExist(err) {
+                               errToCaller = err
+                       }
+                       continue
+               }
+               if li.size == 0 {
+                       // hashCheckingWriter isn't in use because we
+                       // don't know the expected size. All we can do
+                       // is check after writing all the data, and
+                       // trust the caller is doing a HEAD request so
+                       // it's not too late to set an error code in
+                       // the response header.
+                       err = streamer.Close()
+                       if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
+                               err = errChecksum
+                       }
+                       if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+                               // We didn't set the content-length header
+                               // above because we didn't know the block size
+                               // until now.
+                               rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
+                       }
+                       return streamer.WroteAt(), err
+               } else if streamer.WroteAt() != li.size {
                        // If the backend read fewer bytes than
                        // expected but returns no error, we can
                        // classify this as a checksum error (even
@@ -260,42 +305,17 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
                        // it anyway, but if it's a HEAD request the
                        // caller can still change the response status
                        // code.
-                       return n, errChecksum
-               }
-               if err == nil && li.size == 0 {
-                       // hashCheckingWriter isn't in use because we
-                       // don't know the expected size. All we can do
-                       // is check after writing all the data, and
-                       // trust the caller is doing a HEAD request so
-                       // it's not too late to set an error code in
-                       // the response header.
-                       if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
-                               return n, errChecksum
-                       }
-               }
-               if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
-                       // We didn't set the content-length header
-                       // above because we didn't know the block size
-                       // until now.
-                       rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
-               }
-               if n > 0 || err == nil {
-                       // success, or there's an error but we can't
-                       // retry because we've already sent some data.
-                       return n, err
-               }
-               if !os.IsNotExist(err) {
-                       // If some volume returns a transient error,
-                       // return it to the caller instead of "Not
-                       // found" so it can retry.
-                       errToCaller = err
+                       return streamer.WroteAt(), errChecksum
                }
+               // Ensure streamer flushes all buffered data without
+               // errors.
+               err = streamer.Close()
+               return streamer.Wrote(), err
        }
        return 0, errToCaller
 }
 
 func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
-       ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
        token := ctxToken(ctx)
        if token == "" {
                return 0, errNoTokenProvided
@@ -459,7 +479,7 @@ func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOpti
                        continue
                }
                cmp := &checkEqual{Expect: opts.Data}
-               if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+               if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
                        if !cmp.Equal() {
                                return resp, errCollision
                        }
@@ -564,25 +584,28 @@ func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
        return mnts
 }
 
-// checkEqual reports whether the data written to it (via io.Writer
+// checkEqual reports whether the data written to it (via io.WriterAt
 // interface) is equal to the expected data.
 //
-// Expect should not be changed after the first Write.
+// Expect should not be changed after the first WriteAt.
+//
+// Results are undefined if WriteAt is called with overlapping ranges.
 type checkEqual struct {
-       Expect     []byte
-       equalUntil int
+       Expect   []byte
+       equal    atomic.Int64
+       notequal atomic.Bool
 }
 
 func (ce *checkEqual) Equal() bool {
-       return ce.equalUntil == len(ce.Expect)
+       return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
 }
 
-func (ce *checkEqual) Write(p []byte) (int, error) {
-       endpos := ce.equalUntil + len(p)
-       if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
-               ce.equalUntil = endpos
+func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
+       endpos := int(offset) + len(p)
+       if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
+               ce.equal.Add(int64(len(p)))
        } else {
-               ce.equalUntil = -1
+               ce.notequal.Store(true)
        }
        return len(p), nil
 }