X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/568c7abf660b7a68f70b6ea47ae2e7352233f053..5cc1710b57f98905469225c68d975ad2e3e7e56d:/services/keepstore/pipe_adapters.go diff --git a/services/keepstore/pipe_adapters.go b/services/keepstore/pipe_adapters.go index 91aa27094b..6b555054b6 100644 --- a/services/keepstore/pipe_adapters.go +++ b/services/keepstore/pipe_adapters.go @@ -1,19 +1,24 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( "bytes" "context" "io" "io/ioutil" - "sync" ) // getWithPipe invokes getter and copies the resulting data into // buf. If ctx is done before all data is copied, getWithPipe closes // the pipe with an error, and returns early with an error. -func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, *io.PipeWriter)) (int, error) { +func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (int, error) { piper, pipew := io.Pipe() - go getter(ctx, loc, pipew) + go func() { + pipew.CloseWithError(br.ReadBlock(ctx, loc, pipew)) + }() done := make(chan struct{}) var size int var err error @@ -34,30 +39,11 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(contex } } -type errorReadCloser struct { - *io.PipeReader - err error - mtx sync.Mutex -} - -func (erc *errorReadCloser) Close() error { - erc.mtx.Lock() - defer erc.mtx.Unlock() - erc.PipeReader.Close() - return erc.err -} - -func (erc *errorReadCloser) SetError(err error) { - erc.mtx.Lock() - defer erc.mtx.Unlock() - erc.err = err -} - -// putWithPipe invokes putter with a new pipe, and and copies data +// putWithPipe invokes putter with a new pipe, and copies data // from buf into the pipe. If ctx is done before all data is copied, // putWithPipe closes the pipe with an error, and returns early with // an error. -func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.ReadCloser) error) error { +func putWithPipe(ctx context.Context, loc string, buf []byte, bw BlockWriter) error { piper, pipew := io.Pipe() copyErr := make(chan error) go func() { @@ -66,13 +52,9 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex close(copyErr) }() - erc := errorReadCloser{ - PipeReader: piper, - err: nil, - } putErr := make(chan error, 1) go func() { - putErr <- putter(ctx, loc, &erc) + putErr <- bw.WriteBlock(ctx, loc, piper) close(putErr) }() @@ -86,10 +68,11 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex // Ensure io.Copy goroutine isn't blocked writing to pipew // (otherwise, io.Copy is still using buf so it isn't safe to - // return). This can cause pipew to receive corrupt data, so - // we first ensure putter() will get an error when calling - // erc.Close(). - erc.SetError(err) + // return). This can cause pipew to receive corrupt data if + // err came from copyErr or ctx.Done() before the copy + // finished. That's OK, though: in that case err != nil, and + // CloseWithErr(err) ensures putter() will get an error from + // piper.Read() before seeing EOF. go pipew.CloseWithError(err) go io.Copy(ioutil.Discard, piper) <-copyErr