-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
import (
"bytes"
"context"
"io"
"io/ioutil"
- "sync"
)
// getWithPipe invokes getter and copies the resulting data into
// buf. If ctx is done before all data is copied, getWithPipe closes
// the pipe with an error, and returns early with an error.
-func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, *io.PipeWriter)) (int, error) {
+func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (int, error) {
piper, pipew := io.Pipe()
- go getter(ctx, loc, pipew)
+ go func() {
+ pipew.CloseWithError(br.ReadBlock(ctx, loc, pipew))
+ }()
done := make(chan struct{})
var size int
var err error
}
}
-type errorReadCloser struct {
- *io.PipeReader
- err error
- mtx sync.Mutex
-}
-
-func (erc *errorReadCloser) Close() error {
- erc.mtx.Lock()
- defer erc.mtx.Unlock()
- erc.PipeReader.Close()
- return erc.err
-}
-
-func (erc *errorReadCloser) SetError(err error) {
- erc.mtx.Lock()
- defer erc.mtx.Unlock()
- erc.err = err
-}
-
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
// from buf into the pipe. If ctx is done before all data is copied,
// putWithPipe closes the pipe with an error, and returns early with
// an error.
-func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.ReadCloser) error) error {
+func putWithPipe(ctx context.Context, loc string, buf []byte, bw BlockWriter) error {
piper, pipew := io.Pipe()
copyErr := make(chan error)
go func() {
close(copyErr)
}()
- erc := errorReadCloser{
- PipeReader: piper,
- err: nil,
- }
putErr := make(chan error, 1)
go func() {
- putErr <- putter(ctx, loc, &erc)
+ putErr <- bw.WriteBlock(ctx, loc, piper)
close(putErr)
}()
// Ensure io.Copy goroutine isn't blocked writing to pipew
// (otherwise, io.Copy is still using buf so it isn't safe to
- // return). This can cause pipew to receive corrupt data, so
- // we first ensure putter() will get an error when calling
- // erc.Close().
- erc.SetError(err)
+ // return). This can cause pipew to receive corrupt data if
+ // err came from copyErr or ctx.Done() before the copy
+ // finished. That's OK, though: in that case err != nil, and
+ // CloseWithErr(err) ensures putter() will get an error from
+ // piper.Read() before seeing EOF.
go pipew.CloseWithError(err)
go io.Copy(ioutil.Discard, piper)
<-copyErr