+++ /dev/null
-package main
-
-import (
- "bytes"
- "context"
- "io"
- "io/ioutil"
- "sync"
-)
-
// getWithPipe invokes getter in its own goroutine, handing it the
// write end of a new pipe, and copies what getter writes into buf.
// Copying stops when buf is full or when the write end is closed.
//
// It returns the number of bytes copied into buf. An io.EOF or
// io.ErrUnexpectedEOF from the pipe (the write end was closed before
// buf was filled) is not reported as an error; any other read error,
// such as one getter passes to CloseWithError, is returned unchanged.
//
// If ctx is done before the copy finishes, getWithPipe closes the
// read end of the pipe with ctx.Err(), waits until the copy goroutine
// has stopped writing to buf, and returns 0 and ctx.Err(). getter
// itself is not waited for; once the read end is closed, its writes
// to the pipe fail.
func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, *io.PipeWriter)) (int, error) {
	piper, pipew := io.Pipe()
	go getter(ctx, loc, pipew)

	var (
		size int
		err  error
	)
	done := make(chan struct{})
	go func() {
		defer close(done)
		size, err = io.ReadFull(piper, buf)
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			// A short read just means getter had less data
			// than len(buf).
			err = nil
		}
	}()

	select {
	case <-ctx.Done():
		// Closing the read end unblocks the pending Read, so the
		// copy goroutine finishes promptly. Wait for it: buf
		// belongs to the caller, and must not be written to after
		// we return.
		piper.CloseWithError(ctx.Err())
		<-done
		return 0, ctx.Err()
	case <-done:
		// Make any further writes by getter fail instead of
		// blocking forever.
		piper.Close()
		return size, err
	}
}
-
// errorReadCloser is an io.ReadCloser backed by the read end of a
// pipe. Reads go straight to the embedded PipeReader; Close reports
// the error most recently stored with SetError rather than the
// pipe's own Close result.
type errorReadCloser struct {
	*io.PipeReader
	// err is the value Close returns. It is only accessed with mtx
	// held.
	err error
	mtx sync.Mutex
}

// Close closes the underlying pipe reader and returns the error
// currently stored via SetError (nil if none has been set).
func (rc *errorReadCloser) Close() error {
	rc.mtx.Lock()
	defer rc.mtx.Unlock()
	stored := rc.err
	rc.PipeReader.Close()
	return stored
}

// SetError stores err as the value subsequent calls to Close will
// return, replacing any previously stored error.
func (rc *errorReadCloser) SetError(err error) {
	rc.mtx.Lock()
	rc.err = err
	rc.mtx.Unlock()
}
-
-// putWithPipe invokes putter with a new pipe, and and copies data
-// from buf into the pipe. If ctx is done before all data is copied,
-// putWithPipe closes the pipe with an error, and returns early with
-// an error.
-func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.ReadCloser) error) error {
- piper, pipew := io.Pipe()
- copyErr := make(chan error)
- go func() {
- _, err := io.Copy(pipew, bytes.NewReader(buf))
- copyErr <- err
- close(copyErr)
- }()
-
- erc := errorReadCloser{
- PipeReader: piper,
- err: nil,
- }
- putErr := make(chan error, 1)
- go func() {
- putErr <- putter(ctx, loc, &erc)
- close(putErr)
- }()
-
- var err error
- select {
- case err = <-copyErr:
- case err = <-putErr:
- case <-ctx.Done():
- err = ctx.Err()
- }
-
- // Ensure io.Copy goroutine isn't blocked writing to pipew
- // (otherwise, io.Copy is still using buf so it isn't safe to
- // return). This can cause pipew to receive corrupt data, so
- // we first ensure putter() will get an error when calling
- // erc.Close().
- erc.SetError(err)
- go pipew.CloseWithError(err)
- go io.Copy(ioutil.Discard, piper)
- <-copyErr
-
- // Note: io.Copy() is finished now, but putter() might still
- // be running. If we encounter an error before putter()
- // returns, we return right away without waiting for putter().
-
- if err != nil {
- return err
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err = <-putErr:
- return err
- }
-}