From 4ec57745d2106e955fea4442c9eccb2fce7246c4 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 15 May 2014 16:36:18 -0400
Subject: [PATCH] Moved non-keep-specific buffering code into a separate
 package.

---
 sdk/go/src/arvados.org/buffer/buffer.go      | 243 +++++++++++++
 sdk/go/src/arvados.org/buffer/buffer_test.go | 364 +++++++++++++++++++
 2 files changed, 607 insertions(+)
 create mode 100644 sdk/go/src/arvados.org/buffer/buffer.go
 create mode 100644 sdk/go/src/arvados.org/buffer/buffer_test.go

diff --git a/sdk/go/src/arvados.org/buffer/buffer.go b/sdk/go/src/arvados.org/buffer/buffer.go
new file mode 100644
index 0000000000..6af1dd0fa7
--- /dev/null
+++ b/sdk/go/src/arvados.org/buffer/buffer.go
@@ -0,0 +1,243 @@
+package buffer
+
+import (
+	"io"
+	"log"
+)
+
+type ReaderSlice struct {
+	slice        []byte
+	reader_error error
+}
+
+// Read repeatedly from the reader into the specified buffer, and report each
+// read on channel 'slices'.  Completes when the reader returns an error
+// (including EOF), which is reported on 'slices' before the channel is closed.
+func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
+	defer close(slices)
+
+	// Initially use entire buffer as scratch space
+	ptr := buffer[:]
+	for {
+		var n int
+		var err error
+		if len(ptr) > 0 {
+			// Read into the scratch space
+			n, err = r.Read(ptr)
+		} else {
+			// Ran out of scratch space, try reading one more byte
+			var b [1]byte
+			n, err = r.Read(b[:])
+
+			if n > 0 {
+				// Reader has more data but we have nowhere to
+				// put it, so we're stuffed
+				slices <- ReaderSlice{nil, io.ErrShortBuffer}
+			} else {
+				// Return some other error (hopefully EOF)
+				slices <- ReaderSlice{nil, err}
+			}
+			return
+		}
+
+		// End on error (includes EOF)
+		if err != nil {
+			slices <- ReaderSlice{nil, err}
+			return
+		}
+
+		if n > 0 {
+			// Make a slice with the contents of the read
+			slices <- ReaderSlice{ptr[:n], nil}
+
+			// Adjust the scratch space slice
+			ptr = ptr[n:]
+		}
+	}
+}
+
+// A read request to the Transfer() function
+type ReadRequest struct {
+	offset  int
+	maxsize int
+	result  chan<- ReadResult
+}
+
+// A read result from the Transfer() function
+type ReadResult struct {
+	slice []byte
+	err   error
+}
+
+// Reads from the buffer managed by the Transfer()
+type BufferReader struct {
+	offset    *int
+	requests  chan<- ReadRequest
+	responses chan ReadResult
+}
+
+func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
+	return BufferReader{new(int), requests, make(chan ReadResult)}
+}
+
+// Reads from the buffer managed by the Transfer()
+func (this BufferReader) Read(p []byte) (n int, err error) {
+	this.requests <- ReadRequest{*this.offset, len(p), this.responses}
+	rr, valid := <-this.responses
+	if valid {
+		*this.offset += len(rr.slice)
+		return copy(p, rr.slice), rr.err
+	} else {
+		return 0, io.ErrUnexpectedEOF
+	}
+}
+
+func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+	// Record starting offset in order to correctly report the number of bytes sent
+	starting_offset := *this.offset
+	for {
+		this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
+		rr, valid := <-this.responses
+		if valid {
+			log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
+			*this.offset += len(rr.slice)
+			if rr.err != nil {
+				if rr.err == io.EOF {
+					// EOF is not an error.
+					return int64(*this.offset - starting_offset), nil
+				} else {
+					return int64(*this.offset - starting_offset), rr.err
+				}
+			} else {
+				dest.Write(rr.slice)
+			}
+		} else {
+			return int64(*this.offset - starting_offset), io.ErrUnexpectedEOF
+		}
+	}
+}
+
+// Close the responses channel
+func (this BufferReader) Close() error {
+	close(this.responses)
+	return nil
+}
+
+// Handle a read request.  Returns true if a response was sent, and false if
+// the request should be queued.
+func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
+	log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
+	if req.offset < len(body) {
+		var end int
+		if req.offset+req.maxsize < len(body) {
+			end = req.offset + req.maxsize
+		} else {
+			end = len(body)
+		}
+		req.result <- ReadResult{body[req.offset:end], nil}
+		return true
+	} else if complete && req.offset >= len(body) {
+		req.result <- ReadResult{nil, io.EOF}
+		return true
+	} else {
+		return false
+	}
+}
+
+// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
+// in the provided buffer.  Otherwise, use the contents of 'source_buffer' as is.
+// Accepts read requests on the buffer on the 'requests' channel.  Completes
+// when 'requests' channel is closed.
+func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
+	// currently buffered data
+	var body []byte
+
+	// for receiving slices from ReadIntoBuffer
+	var slices chan ReaderSlice = nil
+
+	// indicates whether the buffered data is complete
+	var complete bool = false
+
+	if source_reader != nil {
+		// 'body' is the buffer slice representing the body content read so far
+		body = source_buffer[:0]
+
+		// used to communicate slices of the buffer as they are read;
+		// ReadIntoBuffer will close 'slices' when it is done with it
+		slices = make(chan ReaderSlice)
+
+		// Spin it off
+		go ReadIntoBuffer(source_buffer, source_reader, slices)
+	} else {
+		// use the whole buffer
+		body = source_buffer[:]
+
+		// buffer is complete
+		complete = true
+	}
+
+	pending_requests := make([]ReadRequest, 0)
+
+	for {
+		select {
+		case req, valid := <-requests:
+			// Handle a buffer read request
+			if valid {
+				if !HandleReadRequest(req, body, complete) {
+					pending_requests = append(pending_requests, req)
+				}
+			} else {
+				// closed 'requests' channel indicates we're done
+				return
+			}
+
+		case bk, valid := <-slices:
+			// Got a new slice from the reader
+			if valid {
+				if bk.reader_error != nil {
+					reader_error <- bk.reader_error
+					if bk.reader_error == io.EOF {
+						// EOF indicates the reader is done
+						// sending, so our buffer is complete.
+						complete = true
+					} else {
+						// some other reader error
+						return
+					}
+				}
+
+				if bk.slice != nil {
+					// adjust body bounds now that another slice has been read
+					body = source_buffer[0 : len(body)+len(bk.slice)]
+				}
+
+				// handle pending reads
+				n := 0
+				for n < len(pending_requests) {
+					if HandleReadRequest(pending_requests[n], body, complete) {
+
+						// move the element from the
+						// back of the slice to
+						// position 'n', then shorten
+						// the slice by one element
+						pending_requests[n] = pending_requests[len(pending_requests)-1]
+						pending_requests = pending_requests[0 : len(pending_requests)-1]
+					} else {
+
+						// Request wasn't handled, so keep it in the request slice
+						n += 1
+					}
+				}
+			} else {
+				if complete {
+					// no more reads
+					slices = nil
+				} else {
+					// reader channel closed without signaling EOF
+					reader_error <- io.ErrUnexpectedEOF
+					return
+				}
+			}
+		}
+	}
+}
diff --git a/sdk/go/src/arvados.org/buffer/buffer_test.go b/sdk/go/src/arvados.org/buffer/buffer_test.go
new file mode 100644
index 0000000000..28e1e66dfc
--- /dev/null
+++ b/sdk/go/src/arvados.org/buffer/buffer_test.go
@@ -0,0 +1,364 @@
+package buffer
+
+import (
+	. "gopkg.in/check.v1"
+	"io"
+	"testing"
+	"time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) { TestingT(t) }
+
+var _ = Suite(&StandaloneSuite{})
+
+// Standalone tests
+type StandaloneSuite struct{}
+
+func ReadIntoBufferHelper(c *C, bufsize int) {
+	buffer := make([]byte, bufsize)
+
+	reader, writer := io.Pipe()
+	slices := make(chan ReaderSlice)
+
+	go ReadIntoBuffer(buffer, reader, slices)
+
+	{
+		out := make([]byte, 128)
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 128)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 128; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		out := make([]byte, 96)
+		for i := 0; i < 96; i += 1 {
+			out[i] = byte(i / 2)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 96)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 96; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i/2))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else if i < (128 + 96) {
+				c.Check(buffer[i], Equals, byte((i-128)/2))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		writer.Close()
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 0)
+		c.Check(s1.reader_error, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+	ReadIntoBufferHelper(c, 512)
+	ReadIntoBufferHelper(c, 225)
+	ReadIntoBufferHelper(c, 224)
+}
+
+func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
+	buffer := make([]byte, 223)
+	reader, writer := io.Pipe()
+	slices := make(chan ReaderSlice)
+
+	go ReadIntoBuffer(buffer, reader, slices)
+
+	{
+		out := make([]byte, 128)
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 128)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 128; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		out := make([]byte, 96)
+		for i := 0; i < 96; i += 1 {
+			out[i] = byte(i / 2)
+		}
+
+		// Write will deadlock because it can't write all the data, so
+		// spin it off to a goroutine
+		go writer.Write(out)
+		s1 := <-slices
+
+		c.Check(len(s1.slice), Equals, 95)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 95; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i/2))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else if i < (128 + 95) {
+				c.Check(buffer[i], Equals, byte((i-128)/2))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		writer.Close()
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 0)
+		c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+	}
+
+}
+
+func (s *StandaloneSuite) TestTransfer(c *C) {
+	reader, writer := io.Pipe()
+
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 512)
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Reporting reader error states
+	reader_status := make(chan error)
+
+	go Transfer(buffer, reader, requests, reader_status)
+
+	br1 := MakeBufferReader(requests)
+	out := make([]byte, 128)
+
+	{
+		// Write some data, and read into a buffer shorter than
+		// available data
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+
+		writer.Write(out[:100])
+
+		in := make([]byte, 64)
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 64)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 64; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+	}
+
+	{
+		// Write some more data, and read into buffer longer than
+		// available data
+		in := make([]byte, 64)
+		n, err := br1.Read(in)
+		c.Check(n, Equals, 36)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 36; i += 1 {
+			c.Check(in[i], Equals, out[64+i])
+		}
+
+	}
+
+	{
+		// Test read before write
+		type Rd struct {
+			n   int
+			err error
+		}
+		rd := make(chan Rd)
+		in := make([]byte, 64)
+
+		go func() {
+			n, err := br1.Read(in)
+			rd <- Rd{n, err}
+		}()
+
+		time.Sleep(100 * time.Millisecond)
+		writer.Write(out[100:])
+
+		got := <-rd
+
+		c.Check(got.n, Equals, 28)
+		c.Check(got.err, Equals, nil)
+
+		for i := 0; i < 28; i += 1 {
+			c.Check(in[i], Equals, out[100+i])
+		}
+	}
+
+	br2 := MakeBufferReader(requests)
+	{
+		// Test 'catch up' reader
+		in := make([]byte, 256)
+		n, err := br2.Read(in)
+
+		c.Check(n, Equals, 128)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 128; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+	}
+
+	{
+		// Test closing the reader
+		writer.Close()
+		status := <-reader_status
+		c.Check(status, Equals, io.EOF)
+
+		in := make([]byte, 256)
+		n1, err1 := br1.Read(in)
+		n2, err2 := br2.Read(in)
+		c.Check(n1, Equals, 0)
+		c.Check(err1, Equals, io.EOF)
+		c.Check(n2, Equals, 0)
+		c.Check(err2, Equals, io.EOF)
+	}
+
+	{
+		// Test 'catch up' reader after closing
+		br3 := MakeBufferReader(requests)
+		in := make([]byte, 256)
+		n, err := br3.Read(in)
+
+		c.Check(n, Equals, 128)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 128; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+
+		n, err = br3.Read(in)
+
+		c.Check(n, Equals, 0)
+		c.Check(err, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
+	reader, writer := io.Pipe()
+
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Reporting reader error states
+	reader_status := make(chan error)
+
+	go Transfer(buffer, reader, requests, reader_status)
+
+	out := make([]byte, 101)
+	go writer.Write(out)
+
+	status := <-reader_status
+	c.Check(status, Equals, io.ErrShortBuffer)
+}
+
+func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+	for i := 0; i < 100; i += 1 {
+		buffer[i] = byte(i)
+	}
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	go Transfer(buffer, nil, requests, nil)
+
+	br1 := MakeBufferReader(requests)
+
+	in := make([]byte, 64)
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 64)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 64; i += 1 {
+			c.Check(in[i], Equals, buffer[i])
+		}
+	}
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 36)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 36; i += 1 {
+			c.Check(in[i], Equals, buffer[64+i])
+		}
+	}
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 0)
+		c.Check(err, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+	for i := 0; i < 100; i += 1 {
+		buffer[i] = byte(i)
+	}
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	go Transfer(buffer, nil, requests, nil)
+
+	br1 := MakeBufferReader(requests)
+
+	reader, writer := io.Pipe()
+
+	go func() {
+		p := make([]byte, 100)
+		n, err := reader.Read(p)
+		c.Check(n, Equals, 100)
+		c.Check(err, Equals, nil)
+		c.Check(p, DeepEquals, buffer)
+	}()
+
+	io.Copy(writer, br1)
+}
-- 
2.39.5
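For reference, a minimal sketch of how the new package is intended to be driven, distilled from the tests above: one Transfer() goroutine owns the buffer and answers read requests, while any number of BufferReaders read (or later replay) the buffered data. The import path arvados.org/buffer assumes sdk/go/src is on GOPATH as in this tree; the buffered readerErr channel is this sketch's choice so Transfer can report EOF without a dedicated listener.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"

	"arvados.org/buffer"
)

func main() {
	// Source data to stream through a single shared buffer.
	src := strings.NewReader("hello, buffered world")

	// One fixed-size buffer holds everything read from src; it must be at
	// least as large as the source or Transfer reports io.ErrShortBuffer.
	buf := make([]byte, 64)
	requests := make(chan buffer.ReadRequest)
	readerErr := make(chan error, 1) // buffered so Transfer can report EOF without a waiting listener

	// Transfer owns the buffer: it fills it from src and serves read requests.
	go buffer.Transfer(buf, src, requests, readerErr)

	// Each BufferReader keeps its own offset, so a reader created later
	// replays the buffered data from the beginning (a "catch up" reader).
	r1 := buffer.MakeBufferReader(requests)
	r2 := buffer.MakeBufferReader(requests)

	var out1, out2 bytes.Buffer
	io.Copy(&out1, r1) // drains r1 until Transfer reports EOF
	io.Copy(&out2, r2) // r2 re-reads the same bytes from offset 0

	if err := <-readerErr; err != io.EOF {
		fmt.Println("reader error:", err)
	}
	fmt.Println(out1.String())
	fmt.Println(out2.String())

	// Closing the request channel tells the Transfer goroutine to exit.
	close(requests)
}
```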