10467: Remove unneeded errorReadCloser, make getter/putter interfaces more like Write...
authorTom Clegg <tom@curoverse.com>
Fri, 30 Dec 2016 19:39:52 +0000 (14:39 -0500)
committerTom Clegg <tom@curoverse.com>
Fri, 30 Dec 2016 19:39:52 +0000 (14:39 -0500)
services/keepstore/pipe_adapters.go
services/keepstore/volume_unix.go

index 91aa27094bad1a488b24feb444be06f1d0e957ba..0b3999c1966d77605975b349900ce8427de1cc70 100644 (file)
@@ -5,15 +5,16 @@ import (
        "context"
        "io"
        "io/ioutil"
-       "sync"
 )
 
 // getWithPipe invokes getter and copies the resulting data into
 // buf. If ctx is done before all data is copied, getWithPipe closes
 // the pipe with an error, and returns early with an error.
-func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, *io.PipeWriter)) (int, error) {
+func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, io.Writer) error) (int, error) {
        piper, pipew := io.Pipe()
-       go getter(ctx, loc, pipew)
+       go func() {
+               pipew.CloseWithError(getter(ctx, loc, pipew))
+       }()
        done := make(chan struct{})
        var size int
        var err error
@@ -34,30 +35,11 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(contex
        }
 }
 
-type errorReadCloser struct {
-       *io.PipeReader
-       err error
-       mtx sync.Mutex
-}
-
-func (erc *errorReadCloser) Close() error {
-       erc.mtx.Lock()
-       defer erc.mtx.Unlock()
-       erc.PipeReader.Close()
-       return erc.err
-}
-
-func (erc *errorReadCloser) SetError(err error) {
-       erc.mtx.Lock()
-       defer erc.mtx.Unlock()
-       erc.err = err
-}
-
 // putWithPipe invokes putter with a new pipe, and and copies data
 // from buf into the pipe. If ctx is done before all data is copied,
 // putWithPipe closes the pipe with an error, and returns early with
 // an error.
-func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.ReadCloser) error) error {
+func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.Reader) error) error {
        piper, pipew := io.Pipe()
        copyErr := make(chan error)
        go func() {
@@ -66,13 +48,9 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex
                close(copyErr)
        }()
 
-       erc := errorReadCloser{
-               PipeReader: piper,
-               err:        nil,
-       }
        putErr := make(chan error, 1)
        go func() {
-               putErr <- putter(ctx, loc, &erc)
+               putErr <- putter(ctx, loc, piper)
                close(putErr)
        }()
 
@@ -86,10 +64,11 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex
 
        // Ensure io.Copy goroutine isn't blocked writing to pipew
        // (otherwise, io.Copy is still using buf so it isn't safe to
-       // return). This can cause pipew to receive corrupt data, so
-       // we first ensure putter() will get an error when calling
-       // erc.Close().
-       erc.SetError(err)
+       // return). This can cause pipew to receive corrupt data if
+       // err came from copyErr or ctx.Done() before the copy
+       // finished. That's OK, though: in that case err != nil, and
+       // CloseWithErr(err) ensures putter() will get an error from
+       // piper.Read() before seeing EOF.
        go pipew.CloseWithError(err)
        go io.Copy(ioutil.Discard, piper)
        <-copyErr
index 459e73a28f501da5145f46259657fcf5d33175ee..56a46790e040f87d2f6c2009f8c733870d4a2662 100644 (file)
@@ -216,21 +216,19 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
        return getWithPipe(ctx, loc, buf, v.get)
 }
 
-func (v *UnixVolume) get(ctx context.Context, loc string, w *io.PipeWriter) {
+func (v *UnixVolume) get(ctx context.Context, loc string, w io.Writer) error {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               w.CloseWithError(v.translateError(err))
-               return
+               return v.translateError(err)
        }
-       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
+       return v.getFunc(ctx, path, func(rdr io.Reader) error {
                n, err := io.Copy(w, rdr)
                if err == nil && n != stat.Size() {
                        err = io.ErrUnexpectedEOF
                }
                return err
        })
-       w.CloseWithError(err)
 }
 
 // Compare returns nil if Get(loc) would return the same content as
@@ -254,7 +252,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
        return putWithPipe(ctx, loc, block, v.put)
 }
 
-func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.ReadCloser) error {
+func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.Reader) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
@@ -287,11 +285,6 @@ func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.ReadCloser) err
                os.Remove(tmpfile.Name())
                return err
        }
-       if err := rdr.Close(); err != nil {
-               tmpfile.Close()
-               os.Remove(tmpfile.Name())
-               return err
-       }
        if err := tmpfile.Close(); err != nil {
                log.Printf("closing %s: %s\n", tmpfile.Name(), err)
                os.Remove(tmpfile.Name())