} else {
out = io.MultiWriter(out, hashcheck)
}
+
+ buf, err := ks.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer ks.bufferPool.Put(buf)
+ streamer := newStreamWriterAt(out, 65536, buf)
+ defer streamer.Close()
+
var errToCaller error = os.ErrNotExist
for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
if ctx.Err() != nil {
return 0, ctx.Err()
}
- n, err = mnt.BlockRead(ctx, li.hash, out)
- if err == nil && li.size > 0 && n != li.size {
+ err := mnt.BlockRead(ctx, li.hash, streamer)
+ if err != nil {
+ if streamer.WroteAt() != 0 {
+ // BlockRead encountered an error
+ // after writing some data, so it's
+ // too late to try another
+ // volume. Flush streamer before
+ // calling Wrote() to ensure our
+ // return value accurately reflects
+ // the number of bytes written to
+ // opts.WriteTo.
+ streamer.Close()
+ return streamer.Wrote(), err
+ }
+ if !os.IsNotExist(err) {
+ errToCaller = err
+ }
+ continue
+ }
+ if li.size == 0 {
+ // hashCheckingWriter isn't in use because we
+ // don't know the expected size. All we can do
+ // is check after writing all the data, and
+ // trust the caller is doing a HEAD request so
+ // it's not too late to set an error code in
+ // the response header.
+ err = streamer.Close()
+ if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
+ err = errChecksum
+ }
+ if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+ // We didn't set the content-length header
+ // above because we didn't know the block size
+ // until now.
+ rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
+ }
+ return streamer.WroteAt(), err
+ } else if streamer.WroteAt() != li.size {
// If the backend read fewer bytes than
// expected but returns no error, we can
// classify this as a checksum error (even
// it anyway, but if it's a HEAD request the
// caller can still change the response status
// code.
- return n, errChecksum
- }
- if err == nil && li.size == 0 {
- // hashCheckingWriter isn't in use because we
- // don't know the expected size. All we can do
- // is check after writing all the data, and
- // trust the caller is doing a HEAD request so
- // it's not too late to set an error code in
- // the response header.
- if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
- return n, errChecksum
- }
- }
- if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
- // We didn't set the content-length header
- // above because we didn't know the block size
- // until now.
- rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
- }
- if n > 0 || err == nil {
- // success, or there's an error but we can't
- // retry because we've already sent some data.
- return n, err
- }
- if !os.IsNotExist(err) {
- // If some volume returns a transient error,
- // return it to the caller instead of "Not
- // found" so it can retry.
- errToCaller = err
+ return streamer.WroteAt(), errChecksum
}
+ // Ensure streamer flushes all buffered data without
+ // errors.
+ err = streamer.Close()
+ return streamer.Wrote(), err
}
return 0, errToCaller
}
func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
- ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
token := ctxToken(ctx)
if token == "" {
return 0, errNoTokenProvided
continue
}
cmp := &checkEqual{Expect: opts.Data}
- if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+ if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
if !cmp.Equal() {
return resp, errCollision
}
return mnts
}
-// checkEqual reports whether the data written to it (via io.Writer
+// checkEqual reports whether the data written to it (via io.WriterAt
// interface) is equal to the expected data.
//
// Expect should not be changed after the first Write.
+//
+// Results are undefined if WriteAt is called with overlapping ranges.
type checkEqual struct {
- Expect []byte
- equalUntil int
+ Expect []byte
+ equal atomic.Int64
+ notequal atomic.Bool
}
func (ce *checkEqual) Equal() bool {
- return ce.equalUntil == len(ce.Expect)
+ return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
}
-func (ce *checkEqual) Write(p []byte) (int, error) {
- endpos := ce.equalUntil + len(p)
- if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
- ce.equalUntil = endpos
+func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
+ endpos := int(offset) + len(p)
+ if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
+ ce.equal.Add(int64(len(p)))
} else {
- ce.equalUntil = -1
+ ce.notequal.Store(true)
}
return len(p), nil
}