From abc241fb83523ae5ae5905ae47210f15d7e0671c Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 22 Nov 2017 23:11:43 -0500 Subject: [PATCH] 12475: Rewrite streamer -> asyncbuf. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- build/run-tests.sh | 4 +- sdk/go/asyncbuf/buf.go | 101 +++++++ sdk/go/asyncbuf/buf_test.go | 159 +++++++++++ sdk/go/keepclient/hashcheck.go | 15 +- sdk/go/keepclient/keepclient.go | 17 +- sdk/go/keepclient/keepclient_test.go | 16 +- sdk/go/keepclient/support.go | 20 +- sdk/go/streamer/streamer.go | 158 ----------- sdk/go/streamer/streamer_test.go | 381 --------------------------- sdk/go/streamer/transfer.go | 310 ---------------------- 10 files changed, 287 insertions(+), 894 deletions(-) create mode 100644 sdk/go/asyncbuf/buf.go create mode 100644 sdk/go/asyncbuf/buf_test.go delete mode 100644 sdk/go/streamer/streamer.go delete mode 100644 sdk/go/streamer/streamer_test.go delete mode 100644 sdk/go/streamer/transfer.go diff --git a/build/run-tests.sh b/build/run-tests.sh index 433685c5a8..3cfc692aae 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -100,7 +100,7 @@ sdk/go/health sdk/go/httpserver sdk/go/manifest sdk/go/blockdigest -sdk/go/streamer +sdk/go/asyncbuf sdk/go/stats sdk/go/crunchrunner sdk/cwl @@ -829,7 +829,7 @@ gostuff=( sdk/go/health sdk/go/httpserver sdk/go/manifest - sdk/go/streamer + sdk/go/asyncbuf sdk/go/crunchrunner sdk/go/stats lib/crunchstat diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go new file mode 100644 index 0000000000..b3b9bf221a --- /dev/null +++ b/sdk/go/asyncbuf/buf.go @@ -0,0 +1,101 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +package asyncbuf + +import ( + "bytes" + "io" + "sync" +) + +// A Buffer is an io.Writer that distributes written data +// asynchronously to multiple concurrent readers. +// +// NewReader() can be called at any time. In all cases, every returned +// io.Reader reads all data written to the Buffer. +// +// Behavior is undefined if Write is called after Close or +// CloseWithError. +type Buffer interface { + io.WriteCloser + + // NewReader() returns an io.Reader that reads all data + // written to the Buffer. + NewReader() io.Reader + + // Close, but return the given error (instead of io.EOF) to + // all readers when they reach the end of the buffer. + // + // CloseWithError(nil) is equivalent to + // CloseWithError(io.EOF). + CloseWithError(error) error +} + +type buffer struct { + data *bytes.Buffer + cond sync.Cond + err error // nil if there might be more writes +} + +// NewBuffer creates a new Buffer using buf as its initial +// contents. The new Buffer takes ownership of buf, and the caller +// should not use buf after this call. 
+func NewBuffer(buf []byte) Buffer { + return &buffer{ + data: bytes.NewBuffer(buf), + cond: sync.Cond{L: &sync.Mutex{}}, + } +} + +func (b *buffer) Write(p []byte) (int, error) { + defer b.cond.Broadcast() + b.cond.L.Lock() + defer b.cond.L.Unlock() + if b.err != nil { + return 0, b.err + } + return b.data.Write(p) +} + +func (b *buffer) Close() error { + return b.CloseWithError(nil) +} + +func (b *buffer) CloseWithError(err error) error { + defer b.cond.Broadcast() + b.cond.L.Lock() + defer b.cond.L.Unlock() + if err == nil { + b.err = io.EOF + } else { + b.err = err + } + return nil +} + +func (b *buffer) NewReader() io.Reader { + return &reader{b: b} +} + +type reader struct { + b *buffer + read int // # bytes already read +} + +func (r *reader) Read(p []byte) (int, error) { + r.b.cond.L.Lock() + defer r.b.cond.L.Unlock() + for { + if r.b.data.Len() > r.read || len(p) == 0 { + n := copy(p, r.b.data.Bytes()[r.read:]) + r.read += n + return n, nil + } + if r.b.err != nil { + return 0, r.b.err + } + r.b.cond.Wait() + } +} diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go new file mode 100644 index 0000000000..845853bc21 --- /dev/null +++ b/sdk/go/asyncbuf/buf_test.go @@ -0,0 +1,159 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +package asyncbuf + +import ( + "crypto/md5" + "errors" + "io" + "io/ioutil" + "math/rand" + "testing" + "time" + + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&Suite{}) + +type Suite struct{} + +func (s *Suite) TestNoWrites(c *check.C) { + b := NewBuffer(nil) + r1 := b.NewReader() + r2 := b.NewReader() + b.Close() + s.checkReader(c, r1, []byte{}, nil, nil) + s.checkReader(c, r2, []byte{}, nil, nil) +} + +func (s *Suite) TestNoReaders(c *check.C) { + b := NewBuffer(nil) + n, err := b.Write([]byte("foobar")) + err2 := b.Close() + c.Check(n, check.Equals, 6) + c.Check(err, check.IsNil) + c.Check(err2, check.IsNil) +} + +func (s *Suite) TestWriteReadClose(c *check.C) { + done := make(chan bool, 2) + b := NewBuffer(nil) + n, err := b.Write([]byte("foobar")) + c.Check(n, check.Equals, 6) + c.Check(err, check.IsNil) + r1 := b.NewReader() + r2 := b.NewReader() + go s.checkReader(c, r1, []byte("foobar"), nil, done) + go s.checkReader(c, r2, []byte("foobar"), nil, done) + time.Sleep(time.Millisecond) + c.Check(len(done), check.Equals, 0) + b.Close() + <-done + <-done +} + +func (s *Suite) TestPrefillWriteCloseRead(c *check.C) { + done := make(chan bool, 2) + b := NewBuffer([]byte("baz")) + n, err := b.Write([]byte("waz")) + c.Check(n, check.Equals, 3) + c.Check(err, check.IsNil) + b.Close() + r1 := b.NewReader() + go s.checkReader(c, r1, []byte("bazwaz"), nil, done) + r2 := b.NewReader() + go s.checkReader(c, r2, []byte("bazwaz"), nil, done) + <-done + <-done +} + +func (s *Suite) TestWriteReadCloseRead(c *check.C) { + done := make(chan bool, 1) + b := NewBuffer(nil) + r1 := b.NewReader() + go s.checkReader(c, r1, []byte("bazwazqux"), nil, done) + + b.Write([]byte("bazwaz")) + + r2 := b.NewReader() + r2.Read(make([]byte, 3)) + + b.Write([]byte("qux")) + b.Close() + + s.checkReader(c, r2, []byte("wazqux"), nil, nil) + <-done +} + +func (s *Suite) TestCloseWithError(c *check.C) { + errFake := errors.New("it's not even a real error") + + done := make(chan bool, 1) + b := NewBuffer(nil) + r1 := b.NewReader() + go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done) + + b.Write([]byte("bazwaz")) + + r2 := b.NewReader() + r2.Read(make([]byte, 3)) + + b.Write([]byte("qux")) + 
b.CloseWithError(errFake) + + s.checkReader(c, r2, []byte("wazqux"), errFake, nil) + <-done +} + +// Write n*n bytes, n at a time; read them into n goroutines using +// varying buffer sizes; compare checksums. +func (s *Suite) TestManyReaders(c *check.C) { + const n = 256 + + b := NewBuffer(nil) + + expectSum := make(chan []byte) + go func() { + hash := md5.New() + buf := make([]byte, n) + for i := 0; i < n; i++ { + time.Sleep(10 * time.Nanosecond) + rand.Read(buf) + b.Write(buf) + hash.Write(buf) + } + expectSum <- hash.Sum(nil) + b.Close() + }() + + gotSum := make(chan []byte) + for i := 0; i < n; i++ { + go func(bufSize int) { + got := md5.New() + io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize)) + gotSum <- got.Sum(nil) + }(i + n/2) + } + + expect := <-expectSum + for i := 0; i < n; i++ { + c.Check(expect, check.DeepEquals, <-gotSum) + } +} + +func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) { + buf, err := ioutil.ReadAll(r) + c.Check(err, check.Equals, expectError) + c.Check(buf, check.DeepEquals, expectData) + if done != nil { + done <- true + } +} + +// Gocheck boilerplate +func Test(t *testing.T) { + check.TestingT(t) +} diff --git a/sdk/go/keepclient/hashcheck.go b/sdk/go/keepclient/hashcheck.go index 726b81362c..9295c14cc2 100644 --- a/sdk/go/keepclient/hashcheck.go +++ b/sdk/go/keepclient/hashcheck.go @@ -72,19 +72,16 @@ func (this HashCheckingReader) Close() (err error) { _, err = io.Copy(this.Hash, this.Reader) if closer, ok := this.Reader.(io.Closer); ok { - err2 := closer.Close() - if err2 != nil && err == nil { - return err2 + closeErr := closer.Close() + if err == nil { + err = closeErr } } if err != nil { return err } - - sum := this.Hash.Sum(nil) - if fmt.Sprintf("%x", sum) != this.Check { - err = BadChecksum + if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check { + return BadChecksum } - - return err + return nil } diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index cbfad8177d..37d651e31f 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -21,7 +21,7 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" + "git.curoverse.com/arvados.git/sdk/go/asyncbuf" ) // A Keep "block" is 64MB. @@ -156,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, bufsize = BLOCKSIZE } - t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash}) - defer t.Close() - - return kc.putReplicas(hash, t, dataBytes) + buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize)) + go func() { + _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash}) + buf.CloseWithError(err) + }() + return kc.putReplicas(hash, buf.NewReader, dataBytes) } // PutHB writes a block to Keep. The hash of the bytes is given in @@ -167,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, // // Return values are the same as for PutHR. func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) { - t := streamer.AsyncStreamFromSlice(buf) - defer t.Close() - return kc.putReplicas(hash, t, int64(len(buf))) + newReader := func() io.Reader { return bytes.NewBuffer(buf) } + return kc.putReplicas(hash, newReader, int64(len(buf))) } // PutB writes a block to Keep. It computes the hash itself. 
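
For reference, a minimal standalone sketch (not part of the patch) of the
fan-out pattern PutHR relies on above: one writer, any number of readers,
each reader replaying the full stream. Only the asyncbuf import path is
taken from this patch; package main and the sample data are illustrative.

    package main

    import (
        "fmt"
        "io"
        "io/ioutil"
        "sync"

        "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
    )

    func main() {
        buf := asyncbuf.NewBuffer(nil)

        // Writer: stream data in, then signal EOF to all readers. On
        // failure, buf.CloseWithError(err) would propagate err to every
        // reader instead of io.EOF.
        go func() {
            buf.Write([]byte("hello "))
            buf.Write([]byte("world"))
            buf.Close()
        }()

        // Each NewReader() starts at offset 0 and sees all data ever
        // written, no matter when the reader was created.
        var wg sync.WaitGroup
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func(r io.Reader) {
                defer wg.Done()
                data, _ := ioutil.ReadAll(r)
                fmt.Printf("read: %s\n", data)
            }(buf.NewReader())
        }
        wg.Wait()
    }
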
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index 3ce4e7425a..055141cbe8 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -5,6 +5,7 @@ package keepclient import ( + "bytes" "crypto/md5" "errors" "fmt" @@ -20,7 +21,6 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "git.curoverse.com/arvados.git/sdk/go/streamer" . "gopkg.in/check.v1" ) @@ -172,18 +172,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { make(chan string)} UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan uploadStatus) { - - tr := streamer.AsyncStreamFromReader(512, reader) - defer tr.Close() - - br1 := tr.MakeStreamReader() - - go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0) - - writer.Write([]byte("foo")) - writer.Close() + func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) { + go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0) <-st.handled diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 49ef543d87..37912506a2 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -17,7 +17,6 @@ import ( "strings" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" ) // Function used to emit debug messages. The easiest way to enable @@ -57,7 +56,7 @@ type uploadStatus struct { response string } -func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, +func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader, upload_status chan<- uploadStatus, expectedLength int64, requestID int32) { var req *http.Request @@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea if req, err = http.NewRequest("PUT", url, nil); err != nil { DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error()) upload_status <- uploadStatus{err, url, 0, 0, ""} - body.Close() return } req.ContentLength = expectedLength if expectedLength > 0 { - // Do() will close the body ReadCloser when it is done - // with it. - req.Body = body + req.Body = ioutil.NopCloser(body) } else { - // "For client requests, a value of 0 means unknown if Body is - // not nil." In this case we do want the body to be empty, so - // don't set req.Body. However, we still need to close the - // body ReadCloser. - body.Close() + // "For client requests, a value of 0 means unknown if + // Body is not nil." In this case we do want the body + // to be empty, so don't set req.Body. 
} req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken)) @@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea func (this *KeepClient) putReplicas( hash string, - tr *streamer.AsyncStream, + getReader func() io.Reader, expectedLength int64) (locator string, replicas int, err error) { // Generate an arbitrary ID to identify this specific @@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas( // Start some upload requests if next_server < len(sv) { DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server]) - go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID) + go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID) next_server += 1 active += 1 } else { diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go deleted file mode 100644 index 396e311038..0000000000 --- a/sdk/go/streamer/streamer.go +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: Apache-2.0 - -/* AsyncStream pulls data in from a io.Reader source (such as a file or network -socket) and fans out to any number of StreamReader sinks. - -Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at -any point in the lifetime of the AsyncStream, and each StreamReader will read -the contents of the buffer up to the "frontier" of the buffer, at which point -the StreamReader blocks until new data is read from the source. - -This is useful for minimizing readthrough latency as sinks can read and act on -data from the source without waiting for the source to be completely buffered. -It is also useful as a cache in situations where re-reading the original source -potentially is costly, since the buffer retains a copy of the source data. - -Usage: - -Begin reading into a buffer with maximum size 'buffersize' from 'source': - stream := AsyncStreamFromReader(buffersize, source) - -To create a new reader (this can be called multiple times, each reader starts -at the beginning of the buffer): - reader := tr.MakeStreamReader() - -Make sure to close the reader when you're done with it. 
- reader.Close() - -When you're done with the stream: - stream.Close() - -Alternately, if you already have a filled buffer and just want to read out from it: - stream := AsyncStreamFromSlice(buf) - - r := tr.MakeStreamReader() - -*/ - -package streamer - -import ( - "errors" - "io" -) - -var ErrAlreadyClosed = errors.New("cannot close a stream twice") - -type AsyncStream struct { - buffer []byte - requests chan sliceRequest - add_reader chan bool - subtract_reader chan bool - wait_zero_readers chan bool - closed bool -} - -// Reads from the buffer managed by the Transfer() -type StreamReader struct { - offset int - stream *AsyncStream - responses chan sliceResult -} - -func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream { - t := &AsyncStream{ - buffer: make([]byte, buffersize), - requests: make(chan sliceRequest), - add_reader: make(chan bool), - subtract_reader: make(chan bool), - wait_zero_readers: make(chan bool), - } - - go t.transfer(source) - go t.readersMonitor() - - return t -} - -func AsyncStreamFromSlice(buf []byte) *AsyncStream { - t := &AsyncStream{ - buffer: buf, - requests: make(chan sliceRequest), - add_reader: make(chan bool), - subtract_reader: make(chan bool), - wait_zero_readers: make(chan bool), - } - - go t.transfer(nil) - go t.readersMonitor() - - return t -} - -func (this *AsyncStream) MakeStreamReader() *StreamReader { - this.add_reader <- true - return &StreamReader{0, this, make(chan sliceResult)} -} - -// Reads from the buffer managed by the Transfer() -func (this *StreamReader) Read(p []byte) (n int, err error) { - this.stream.requests <- sliceRequest{this.offset, len(p), this.responses} - rr, valid := <-this.responses - if valid { - this.offset += len(rr.slice) - return copy(p, rr.slice), rr.err - } else { - return 0, io.ErrUnexpectedEOF - } -} - -func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) { - // Record starting offset in order to correctly report the number of bytes sent - starting_offset := this.offset - for { - this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses} - rr, valid := <-this.responses - if valid { - this.offset += len(rr.slice) - if rr.err != nil { - if rr.err == io.EOF { - // EOF is not an error. - return int64(this.offset - starting_offset), nil - } else { - return int64(this.offset - starting_offset), rr.err - } - } else { - dest.Write(rr.slice) - } - } else { - return int64(this.offset), io.ErrUnexpectedEOF - } - } -} - -// Close the responses channel -func (this *StreamReader) Close() error { - if this.stream == nil { - return ErrAlreadyClosed - } - this.stream.subtract_reader <- true - close(this.responses) - this.stream = nil - return nil -} - -func (this *AsyncStream) Close() error { - if this.closed { - return ErrAlreadyClosed - } - this.closed = true - this.wait_zero_readers <- true - close(this.requests) - close(this.add_reader) - close(this.subtract_reader) - close(this.wait_zero_readers) - return nil -} diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go deleted file mode 100644 index f8ddbf5a4c..0000000000 --- a/sdk/go/streamer/streamer_test.go +++ /dev/null @@ -1,381 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: Apache-2.0 - -package streamer - -import ( - . 
"gopkg.in/check.v1" - "io" - "testing" - "time" -) - -// Gocheck boilerplate -func Test(t *testing.T) { TestingT(t) } - -var _ = Suite(&StandaloneSuite{}) - -// Standalone tests -type StandaloneSuite struct{} - -func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { - ReadIntoBufferHelper(c, 225) - ReadIntoBufferHelper(c, 224) -} - -func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } -} - -func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 96) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 96; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 96) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } -} - -func ReadIntoBufferHelper(c *C, bufsize int) { - buffer := make([]byte, bufsize) - - reader, writer := io.Pipe() - slices := make(chan nextSlice) - - go readIntoBuffer(buffer, reader, slices) - - HelperWrite128andCheck(c, buffer, writer, slices) - HelperWrite96andCheck(c, buffer, writer, slices) - - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.EOF) -} - -func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { - buffer := make([]byte, 223) - reader, writer := io.Pipe() - slices := make(chan nextSlice) - - go readIntoBuffer(buffer, reader, slices) - - HelperWrite128andCheck(c, buffer, writer, slices) - - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - - // Write will deadlock because it can't write all the data, so - // spin it off to a goroutine - go writer.Write(out) - s1 := <-slices - - c.Check(len(s1.slice), Equals, 95) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 95; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 95) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - - writer.Close() - s1 = <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.ErrShortBuffer) -} - -func (s *StandaloneSuite) TestTransfer(c *C) { - reader, writer := io.Pipe() - - tr := AsyncStreamFromReader(512, reader) - - br1 := tr.MakeStreamReader() - out := make([]byte, 128) - - { - // Write some data, and read into a buffer shorter than - // available data - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - - writer.Write(out[:100]) - - in := make([]byte, 64) - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Write some more data, and read into buffer longer than - // available data - in := make([]byte, 64) - n, err := br1.Read(in) - c.Check(n, Equals, 36) - 
c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, out[64+i]) - } - - } - - { - // Test read before write - type Rd struct { - n int - err error - } - rd := make(chan Rd) - in := make([]byte, 64) - - go func() { - n, err := br1.Read(in) - rd <- Rd{n, err} - }() - - time.Sleep(100 * time.Millisecond) - writer.Write(out[100:]) - - got := <-rd - - c.Check(got.n, Equals, 28) - c.Check(got.err, Equals, nil) - - for i := 0; i < 28; i += 1 { - c.Check(in[i], Equals, out[100+i]) - } - } - - br2 := tr.MakeStreamReader() - { - // Test 'catch up' reader - in := make([]byte, 256) - n, err := br2.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Test closing the reader - writer.Close() - - in := make([]byte, 256) - n1, err1 := br1.Read(in) - n2, err2 := br2.Read(in) - c.Check(n1, Equals, 0) - c.Check(err1, Equals, io.EOF) - c.Check(n2, Equals, 0) - c.Check(err2, Equals, io.EOF) - } - - { - // Test 'catch up' reader after closing - br3 := tr.MakeStreamReader() - in := make([]byte, 256) - n, err := br3.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - - n, err = br3.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { - reader, writer := io.Pipe() - - tr := AsyncStreamFromReader(100, reader) - defer tr.Close() - - sr := tr.MakeStreamReader() - defer sr.Close() - - out := make([]byte, 101) - go writer.Write(out) - - n, err := sr.Read(out) - c.Check(n, Equals, 100) - c.Check(err, IsNil) - - n, err = sr.Read(out) - c.Check(n, Equals, 0) - c.Check(err, Equals, io.ErrShortBuffer) -} - -func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - tr := AsyncStreamFromSlice(buffer) - - br1 := tr.MakeStreamReader() - - in := make([]byte, 64) - { - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, buffer[i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, buffer[64+i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferIoCopy(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - tr := AsyncStreamFromSlice(buffer) - defer tr.Close() - - br1 := tr.MakeStreamReader() - defer br1.Close() - - reader, writer := io.Pipe() - - go func() { - p := make([]byte, 100) - n, err := reader.Read(p) - c.Check(n, Equals, 100) - c.Check(err, Equals, nil) - c.Check(p, DeepEquals, buffer) - }() - - io.Copy(writer, br1) -} - -func (s *StandaloneSuite) TestManyReaders(c *C) { - reader, writer := io.Pipe() - - tr := AsyncStreamFromReader(512, reader) - defer tr.Close() - - sr := tr.MakeStreamReader() - go func() { - time.Sleep(100 * time.Millisecond) - sr.Close() - }() - - for i := 0; i < 200; i += 1 { - go func() { - br1 := tr.MakeStreamReader() - defer br1.Close() - - p := make([]byte, 3) - n, err := br1.Read(p) - c.Check(n, Equals, 3) - c.Check(p[0:3], DeepEquals, []byte("foo")) - - n, err = br1.Read(p) - c.Check(n, Equals, 3) - c.Check(p[0:3], DeepEquals, []byte("bar")) 
- - n, err = br1.Read(p) - c.Check(n, Equals, 3) - c.Check(p[0:3], DeepEquals, []byte("baz")) - - n, err = br1.Read(p) - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - }() - } - - writer.Write([]byte("foo")) - writer.Write([]byte("bar")) - writer.Write([]byte("baz")) - writer.Close() -} - -func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) { - buffer := make([]byte, 100) - tr := AsyncStreamFromSlice(buffer) - sr := tr.MakeStreamReader() - c.Check(sr.Close(), IsNil) - c.Check(sr.Close(), Equals, ErrAlreadyClosed) - c.Check(tr.Close(), IsNil) - c.Check(tr.Close(), Equals, ErrAlreadyClosed) -} diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go deleted file mode 100644 index bea27f8f81..0000000000 --- a/sdk/go/streamer/transfer.go +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: Apache-2.0 - -/* Internal implementation of AsyncStream. -Outline of operation: - -The kernel is the transfer() goroutine. It manages concurrent reads and -appends to the "body" slice. "body" is a slice of "source_buffer" that -represents the segment of the buffer that is already filled in and available -for reading. - -To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read -from the io.Reader source directly into source_buffer. Each read goes into a -slice of buffer which spans the section immediately following the end of the -current "body". Each time a Read completes, a slice representing the the -section just filled in (or any read errors/EOF) is sent over the "slices" -channel back to the transfer() function. - -Meanwhile, the transfer() function selects() on two channels, the "requests" -channel and the "slices" channel. - -When a message is received on the "slices" channel, this means the a new -section of the buffer has data, or an error is signaled. Since the data has -been read directly into the source_buffer, it is able to simply increases the -size of the body slice to encompass the newly filled in section. Then any -pending reads are serviced with handleReadRequest (described below). - -When a message is received on the "requests" channel, it means a StreamReader -wants access to a slice of the buffer. This is passed to handleReadRequest(). - -The handleReadRequest() function takes a sliceRequest consisting of a buffer -offset, maximum size, and channel to send the response. If there was an error -reported from the source reader, it is returned. If the offset is less than -the size of the body, the request can proceed, and it sends a body slice -spanning the segment from offset to min(offset+maxsize, end of the body). If -source reader status is EOF (done filling the buffer) and the read request -offset is beyond end of the body, it responds with EOF. Otherwise, the read -request is for a slice beyond the current size of "body" but we expect the body -to expand as more data is added, so the request gets added to a wait list. - -The transfer() runs until the requests channel is closed by AsyncStream.Close() - -To track readers, streamer uses the readersMonitor() goroutine. This goroutine -chooses which channels to receive from based on the number of outstanding -readers. When a new reader is created, it sends a message on the add_reader -channel. If the number of readers is already at MAX_READERS, this blocks the -sender until an existing reader is closed. When a reader is closed, it sends a -message on the subtract_reader channel. 
Finally, when AsyncStream.Close() is -called, it sends a message on the wait_zero_readers channel, which will block -the sender unless there are zero readers and it is safe to shut down the -AsyncStream. -*/ - -package streamer - -import ( - "io" -) - -const MAX_READERS = 100 - -// A slice passed from readIntoBuffer() to transfer() -type nextSlice struct { - slice []byte - reader_error error -} - -// A read request to the Transfer() function -type sliceRequest struct { - offset int - maxsize int - result chan<- sliceResult -} - -// A read result from the Transfer() function -type sliceResult struct { - slice []byte - err error -} - -// Supports writing into a buffer -type bufferWriter struct { - buf []byte - ptr int -} - -// Copy p into this.buf, increment pointer and return number of bytes read. -func (this *bufferWriter) Write(p []byte) (n int, err error) { - n = copy(this.buf[this.ptr:], p) - this.ptr += n - return n, nil -} - -// Read repeatedly from the reader and write sequentially into the specified -// buffer, and report each read to channel 'c'. Completes when Reader 'r' -// reports on the error channel and closes channel 'c'. -func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) { - defer close(slices) - - if writeto, ok := r.(io.WriterTo); ok { - n, err := writeto.WriteTo(&bufferWriter{buffer, 0}) - if err != nil { - slices <- nextSlice{nil, err} - } else { - slices <- nextSlice{buffer[:n], nil} - slices <- nextSlice{nil, io.EOF} - } - return - } else { - // Initially entire buffer is available - ptr := buffer[:] - for { - var n int - var err error - if len(ptr) > 0 { - const readblock = 64 * 1024 - // Read 64KiB into the next part of the buffer - if len(ptr) > readblock { - n, err = r.Read(ptr[:readblock]) - } else { - n, err = r.Read(ptr) - } - } else { - // Ran out of buffer space, try reading one more byte - var b [1]byte - n, err = r.Read(b[:]) - - if n > 0 { - // Reader has more data but we have nowhere to - // put it, so we're stuffed - slices <- nextSlice{nil, io.ErrShortBuffer} - } else { - // Return some other error (hopefully EOF) - slices <- nextSlice{nil, err} - } - return - } - - // End on error (includes EOF) - if err != nil { - slices <- nextSlice{nil, err} - return - } - - if n > 0 { - // Make a slice with the contents of the read - slices <- nextSlice{ptr[:n], nil} - - // Adjust the scratch space slice - ptr = ptr[n:] - } - } - } -} - -// Handle a read request. Returns true if a response was sent, and false if -// the request should be queued. -func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool { - if (reader_status != nil) && (reader_status != io.EOF) { - req.result <- sliceResult{nil, reader_status} - return true - } else if req.offset < len(body) { - var end int - if req.offset+req.maxsize < len(body) { - end = req.offset + req.maxsize - } else { - end = len(body) - } - req.result <- sliceResult{body[req.offset:end], nil} - return true - } else if (reader_status == io.EOF) && (req.offset >= len(body)) { - req.result <- sliceResult{nil, io.EOF} - return true - } else { - return false - } -} - -// Mediates between reads and appends. -// If 'source_reader' is not nil, reads data from 'source_reader' and stores it -// in the provided buffer. Otherwise, use the contents of 'buffer' as is. -// Accepts read requests on the buffer on the 'requests' channel. Completes -// when 'requests' channel is closed. 
-func (this *AsyncStream) transfer(source_reader io.Reader) { - source_buffer := this.buffer - requests := this.requests - - // currently buffered data - var body []byte - - // for receiving slices from readIntoBuffer - var slices chan nextSlice = nil - - // indicates the status of the underlying reader - var reader_status error = nil - - if source_reader != nil { - // 'body' is the buffer slice representing the body content read so far - body = source_buffer[:0] - - // used to communicate slices of the buffer as they are - // readIntoBuffer will close 'slices' when it is done with it - slices = make(chan nextSlice) - - // Spin it off - go readIntoBuffer(source_buffer, source_reader, slices) - } else { - // use the whole buffer - body = source_buffer[:] - - // buffer is complete - reader_status = io.EOF - } - - pending_requests := make([]sliceRequest, 0) - - for { - select { - case req, valid := <-requests: - // Handle a buffer read request - if valid { - if !handleReadRequest(req, body, reader_status) { - pending_requests = append(pending_requests, req) - } - } else { - // closed 'requests' channel indicates we're done - return - } - - case bk, valid := <-slices: - // Got a new slice from the reader - if valid { - reader_status = bk.reader_error - - if bk.slice != nil { - // adjust body bounds now that another slice has been read - body = source_buffer[0 : len(body)+len(bk.slice)] - } - - // handle pending reads - n := 0 - for n < len(pending_requests) { - if handleReadRequest(pending_requests[n], body, reader_status) { - // move the element from the back of the slice to - // position 'n', then shorten the slice by one element - pending_requests[n] = pending_requests[len(pending_requests)-1] - pending_requests = pending_requests[0 : len(pending_requests)-1] - } else { - - // Request wasn't handled, so keep it in the request slice - n += 1 - } - } - } else { - if reader_status == nil { - // slices channel closed without signaling EOF - reader_status = io.ErrUnexpectedEOF - } - slices = nil - } - } - } -} - -func (this *AsyncStream) readersMonitor() { - var readers int = 0 - - for { - if readers == 0 { - select { - case _, ok := <-this.wait_zero_readers: - if ok { - // nothing, just implicitly unblock the sender - } else { - return - } - case _, ok := <-this.add_reader: - if ok { - readers += 1 - } else { - return - } - } - } else if readers > 0 && readers < MAX_READERS { - select { - case _, ok := <-this.add_reader: - if ok { - readers += 1 - } else { - return - } - - case _, ok := <-this.subtract_reader: - if ok { - readers -= 1 - } else { - return - } - } - } else if readers == MAX_READERS { - _, ok := <-this.subtract_reader - if ok { - readers -= 1 - } else { - return - } - } - } -} -- 2.39.5
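
For reference, a minimal standalone sketch (not part of the patch) of the
reader-factory signature putReplicas now takes: a func() io.Reader replaces
*streamer.AsyncStream, so every upload attempt gets its own reader
positioned at the start of the block. Only the asyncbuf import path and
the two factory shapes are taken from the patch; package main and the
sample data are illustrative.

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "io/ioutil"

        "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
    )

    func main() {
        data := []byte("block data")

        // Buffered case (as in PutHB): a fresh bytes.Buffer per call.
        newReader := func() io.Reader { return bytes.NewBuffer(data) }

        // Streaming case (as in PutHR): the method value buf.NewReader
        // already has type func() io.Reader, so it is passed directly.
        buf := asyncbuf.NewBuffer(nil)
        go func() {
            buf.Write(data)
            buf.Close()
        }()
        getReader := buf.NewReader

        // Each factory call yields an independent reader starting at
        // offset 0 -- one per upload attempt.
        b1, _ := ioutil.ReadAll(newReader())
        b2, _ := ioutil.ReadAll(getReader())
        b3, _ := ioutil.ReadAll(getReader())
        fmt.Println(string(b1), string(b2), string(b3))
    }
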