1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 // getWithPipe invokes getter and copies the resulting data into
15 // buf. If ctx is done before all data is copied, getWithPipe closes
16 // the pipe with an error, and returns early with an error.
17 func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (int, error) {
18 piper, pipew := io.Pipe()
20 pipew.CloseWithError(br.ReadBlock(ctx, loc, pipew))
22 done := make(chan struct{})
26 size, err = io.ReadFull(piper, buf)
27 if err == io.EOF || err == io.ErrUnexpectedEOF {
34 piper.CloseWithError(ctx.Err())
42 // putWithPipe invokes putter with a new pipe, and copies data
43 // from buf into the pipe. If ctx is done before all data is copied,
44 // putWithPipe closes the pipe with an error, and returns early with
46 func putWithPipe(ctx context.Context, loc string, buf []byte, bw BlockWriter) error {
47 piper, pipew := io.Pipe()
48 copyErr := make(chan error)
50 _, err := io.Copy(pipew, bytes.NewReader(buf))
55 putErr := make(chan error, 1)
57 putErr <- bw.WriteBlock(ctx, loc, piper)
69 // Ensure io.Copy goroutine isn't blocked writing to pipew
70 // (otherwise, io.Copy is still using buf so it isn't safe to
71 // return). This can cause pipew to receive corrupt data if
72 // err came from copyErr or ctx.Done() before the copy
73 // finished. That's OK, though: in that case err != nil, and
74 // CloseWithErr(err) ensures putter() will get an error from
75 // piper.Read() before seeing EOF.
76 go pipew.CloseWithError(err)
77 go io.Copy(ioutil.Discard, piper)
80 // Note: io.Copy() is finished now, but putter() might still
81 // be running. If we encounter an error before putter()
82 // returns, we return right away without waiting for putter().