From: Tom Clegg Date: Fri, 30 Dec 2016 19:39:52 +0000 (-0500) Subject: 10467: Remove unneeded errorReadCloser, make getter/putter interfaces more like Write... X-Git-Tag: 1.1.0~501^2~2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/b3e5ea60bdecb41fbf954b67ab859dc4542d0c1a?hp=568c7abf660b7a68f70b6ea47ae2e7352233f053 10467: Remove unneeded errorReadCloser, make getter/putter interfaces more like WriterTo/ReaderFrom. --- diff --git a/services/keepstore/pipe_adapters.go b/services/keepstore/pipe_adapters.go index 91aa27094b..0b3999c196 100644 --- a/services/keepstore/pipe_adapters.go +++ b/services/keepstore/pipe_adapters.go @@ -5,15 +5,16 @@ import ( "context" "io" "io/ioutil" - "sync" ) // getWithPipe invokes getter and copies the resulting data into // buf. If ctx is done before all data is copied, getWithPipe closes // the pipe with an error, and returns early with an error. -func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, *io.PipeWriter)) (int, error) { +func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, io.Writer) error) (int, error) { piper, pipew := io.Pipe() - go getter(ctx, loc, pipew) + go func() { + pipew.CloseWithError(getter(ctx, loc, pipew)) + }() done := make(chan struct{}) var size int var err error @@ -34,30 +35,11 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(contex } } -type errorReadCloser struct { - *io.PipeReader - err error - mtx sync.Mutex -} - -func (erc *errorReadCloser) Close() error { - erc.mtx.Lock() - defer erc.mtx.Unlock() - erc.PipeReader.Close() - return erc.err -} - -func (erc *errorReadCloser) SetError(err error) { - erc.mtx.Lock() - defer erc.mtx.Unlock() - erc.err = err -} - // putWithPipe invokes putter with a new pipe, and and copies data // from buf into the pipe. If ctx is done before all data is copied, // putWithPipe closes the pipe with an error, and returns early with // an error. -func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.ReadCloser) error) error { +func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.Reader) error) error { piper, pipew := io.Pipe() copyErr := make(chan error) go func() { @@ -66,13 +48,9 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex close(copyErr) }() - erc := errorReadCloser{ - PipeReader: piper, - err: nil, - } putErr := make(chan error, 1) go func() { - putErr <- putter(ctx, loc, &erc) + putErr <- putter(ctx, loc, piper) close(putErr) }() @@ -86,10 +64,11 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex // Ensure io.Copy goroutine isn't blocked writing to pipew // (otherwise, io.Copy is still using buf so it isn't safe to - // return). This can cause pipew to receive corrupt data, so - // we first ensure putter() will get an error when calling - // erc.Close(). - erc.SetError(err) + // return). This can cause pipew to receive corrupt data if + // err came from copyErr or ctx.Done() before the copy + // finished. That's OK, though: in that case err != nil, and + // CloseWithErr(err) ensures putter() will get an error from + // piper.Read() before seeing EOF. go pipew.CloseWithError(err) go io.Copy(ioutil.Discard, piper) <-copyErr diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index 459e73a28f..56a46790e0 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -216,21 +216,19 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro return getWithPipe(ctx, loc, buf, v.get) } -func (v *UnixVolume) get(ctx context.Context, loc string, w *io.PipeWriter) { +func (v *UnixVolume) get(ctx context.Context, loc string, w io.Writer) error { path := v.blockPath(loc) stat, err := v.stat(path) if err != nil { - w.CloseWithError(v.translateError(err)) - return + return v.translateError(err) } - err = v.getFunc(ctx, path, func(rdr io.Reader) error { + return v.getFunc(ctx, path, func(rdr io.Reader) error { n, err := io.Copy(w, rdr) if err == nil && n != stat.Size() { err = io.ErrUnexpectedEOF } return err }) - w.CloseWithError(err) } // Compare returns nil if Get(loc) would return the same content as @@ -254,7 +252,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error { return putWithPipe(ctx, loc, block, v.put) } -func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.ReadCloser) error { +func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.Reader) error { if v.ReadOnly { return MethodDisabledError } @@ -287,11 +285,6 @@ func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.ReadCloser) err os.Remove(tmpfile.Name()) return err } - if err := rdr.Close(); err != nil { - tmpfile.Close() - os.Remove(tmpfile.Name()) - return err - } if err := tmpfile.Close(); err != nil { log.Printf("closing %s: %s\n", tmpfile.Name(), err) os.Remove(tmpfile.Name())