From: Peter Amstutz Date: Thu, 15 May 2014 20:47:14 +0000 (-0400) Subject: 2798: Completed move of Transfer() related code out to 'buffer' package. X-Git-Tag: 1.1.0~2603^2~8^2~10 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/d3b11ddc2506de37b8e6538be69237d6d2a60a4a 2798: Completed move of Transfer() related code out to 'buffer' package. --- diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index 2738cefa7c..829ab0efb0 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -1,6 +1,7 @@ package keepclient import ( + "arvados.org/buffer" "crypto/md5" "crypto/tls" "encoding/json" @@ -155,243 +156,6 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) { return pseq } -type ReaderSlice struct { - slice []byte - reader_error error -} - -// Read repeatedly from the reader into the specified buffer, and report each -// read to channel 'c'. Completes when Reader 'r' reports on the error channel -// and closes channel 'c'. -func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) { - defer close(slices) - - // Initially use entire buffer as scratch space - ptr := buffer[:] - for { - var n int - var err error - if len(ptr) > 0 { - // Read into the scratch space - n, err = r.Read(ptr) - } else { - // Ran out of scratch space, try reading one more byte - var b [1]byte - n, err = r.Read(b[:]) - - if n > 0 { - // Reader has more data but we have nowhere to - // put it, so we're stuffed - slices <- ReaderSlice{nil, io.ErrShortBuffer} - } else { - // Return some other error (hopefully EOF) - slices <- ReaderSlice{nil, err} - } - return - } - - // End on error (includes EOF) - if err != nil { - slices <- ReaderSlice{nil, err} - return - } - - if n > 0 { - // Make a slice with the contents of the read - slices <- ReaderSlice{ptr[:n], nil} - - // Adjust the scratch space slice - ptr = ptr[n:] - } - } -} - -// A read request to the Transfer() function -type ReadRequest struct { - offset int - maxsize int - result chan<- ReadResult -} - -// A read result from the Transfer() function -type ReadResult struct { - slice []byte - err error -} - -// Reads from the buffer managed by the Transfer() -type BufferReader struct { - offset *int - requests chan<- ReadRequest - responses chan ReadResult -} - -func MakeBufferReader(requests chan<- ReadRequest) BufferReader { - return BufferReader{new(int), requests, make(chan ReadResult)} -} - -// Reads from the buffer managed by the Transfer() -func (this BufferReader) Read(p []byte) (n int, err error) { - this.requests <- ReadRequest{*this.offset, len(p), this.responses} - rr, valid := <-this.responses - if valid { - *this.offset += len(rr.slice) - return copy(p, rr.slice), rr.err - } else { - return 0, io.ErrUnexpectedEOF - } -} - -func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) { - // Record starting offset in order to correctly report the number of bytes sent - starting_offset := *this.offset - for { - this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses} - rr, valid := <-this.responses - if valid { - log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err) - *this.offset += len(rr.slice) - if rr.err != nil { - if rr.err == io.EOF { - // EOF is not an error. 
- return int64(*this.offset - starting_offset), nil - } else { - return int64(*this.offset - starting_offset), rr.err - } - } else { - dest.Write(rr.slice) - } - } else { - return int64(*this.offset), io.ErrUnexpectedEOF - } - } -} - -// Close the responses channel -func (this BufferReader) Close() error { - close(this.responses) - return nil -} - -// Handle a read request. Returns true if a response was sent, and false if -// the request should be queued. -func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool { - log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body)) - if req.offset < len(body) { - var end int - if req.offset+req.maxsize < len(body) { - end = req.offset + req.maxsize - } else { - end = len(body) - } - req.result <- ReadResult{body[req.offset:end], nil} - return true - } else if complete && req.offset >= len(body) { - req.result <- ReadResult{nil, io.EOF} - return true - } else { - return false - } -} - -// If 'source_reader' is not nil, reads data from 'source_reader' and stores it -// in the provided buffer. Otherwise, use the contents of 'buffer' as is. -// Accepts read requests on the buffer on the 'requests' channel. Completes -// when 'requests' channel is closed. -func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) { - // currently buffered data - var body []byte - - // for receiving slices from ReadIntoBuffer - var slices chan ReaderSlice = nil - - // indicates whether the buffered data is complete - var complete bool = false - - if source_reader != nil { - // 'body' is the buffer slice representing the body content read so far - body = source_buffer[:0] - - // used to communicate slices of the buffer as they are - // ReadIntoBuffer will close 'slices' when it is done with it - slices = make(chan ReaderSlice) - - // Spin it off - go ReadIntoBuffer(source_buffer, source_reader, slices) - } else { - // use the whole buffer - body = source_buffer[:] - - // buffer is complete - complete = true - } - - pending_requests := make([]ReadRequest, 0) - - for { - select { - case req, valid := <-requests: - // Handle a buffer read request - if valid { - if !HandleReadRequest(req, body, complete) { - pending_requests = append(pending_requests, req) - } - } else { - // closed 'requests' channel indicates we're done - return - } - - case bk, valid := <-slices: - // Got a new slice from the reader - if valid { - if bk.reader_error != nil { - reader_error <- bk.reader_error - if bk.reader_error == io.EOF { - // EOF indicates the reader is done - // sending, so our buffer is complete. 
- complete = true - } else { - // some other reader error - return - } - } - - if bk.slice != nil { - // adjust body bounds now that another slice has been read - body = source_buffer[0 : len(body)+len(bk.slice)] - } - - // handle pending reads - n := 0 - for n < len(pending_requests) { - if HandleReadRequest(pending_requests[n], body, complete) { - - // move the element from the - // back of the slice to - // position 'n', then shorten - // the slice by one element - pending_requests[n] = pending_requests[len(pending_requests)-1] - pending_requests = pending_requests[0 : len(pending_requests)-1] - } else { - - // Request wasn't handled, so keep it in the request slice - n += 1 - } - } - } else { - if complete { - // no more reads - slices = nil - } else { - // reader channel closed without signaling EOF - reader_error <- io.ErrUnexpectedEOF - return - } - } - } - } -} - type UploadStatus struct { Err error Url string @@ -434,7 +198,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read func (this KeepClient) putReplicas( hash string, - requests chan ReadRequest, + requests chan buffer.ReadRequest, reader_status chan error, expectedLength int64) (replicas int, err error) { @@ -458,7 +222,7 @@ func (this KeepClient) putReplicas( for active < remaining_replicas { // Start some upload requests if next_server < len(sv) { - go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status, expectedLength) + go this.uploadToKeepServer(sv[next_server], hash, buffer.MakeBufferReader(requests), upload_status, expectedLength) next_server += 1 active += 1 } else { @@ -497,18 +261,18 @@ var OversizeBlockError = errors.New("Block too big") func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) { // Buffer for reads from 'r' - var buffer []byte + var buf []byte if expectedLength > 0 { if expectedLength > BLOCKSIZE { return 0, OversizeBlockError } - buffer = make([]byte, expectedLength) + buf = make([]byte, expectedLength) } else { - buffer = make([]byte, BLOCKSIZE) + buf = make([]byte, BLOCKSIZE) } // Read requests on Transfer() buffer - requests := make(chan ReadRequest) + requests := make(chan buffer.ReadRequest) defer close(requests) // Reporting reader error states @@ -516,20 +280,20 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re defer close(reader_status) // Start the transfer goroutine - go Transfer(buffer, r, requests, reader_status) + go buffer.Transfer(buf, r, requests, reader_status) return this.putReplicas(hash, requests, reader_status, expectedLength) } -func (this KeepClient) PutHB(hash string, buffer []byte) (replicas int, err error) { +func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) { // Read requests on Transfer() buffer - requests := make(chan ReadRequest) + requests := make(chan buffer.ReadRequest) defer close(requests) // Start the transfer goroutine - go Transfer(buffer, nil, requests, nil) + go buffer.Transfer(buf, nil, requests, nil) - return this.putReplicas(hash, requests, nil, int64(len(buffer))) + return this.putReplicas(hash, requests, nil, int64(len(buf))) } func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) { diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go index 3d38c60007..5f189fc8aa 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go +++ 
b/sdk/go/src/arvados.org/keepclient/keepclient_test.go @@ -1,6 +1,7 @@ package keepclient import ( + "arvados.org/buffer" "crypto/md5" "flag" "fmt" @@ -15,7 +16,6 @@ import ( "sort" "strings" "testing" - "time" ) // Gocheck boilerplate @@ -89,354 +89,6 @@ func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) { c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle) } -func ReadIntoBufferHelper(c *C, bufsize int) { - buffer := make([]byte, bufsize) - - reader, writer := io.Pipe() - slices := make(chan ReaderSlice) - - go ReadIntoBuffer(buffer, reader, slices) - - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 96) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 96; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 96) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { - ReadIntoBufferHelper(c, 512) - ReadIntoBufferHelper(c, 225) - ReadIntoBufferHelper(c, 224) -} - -func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { - buffer := make([]byte, 223) - reader, writer := io.Pipe() - slices := make(chan ReaderSlice) - - go ReadIntoBuffer(buffer, reader, slices) - - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - - // Write will deadlock because it can't write all the data, so - // spin it off to a goroutine - go writer.Write(out) - s1 := <-slices - - c.Check(len(s1.slice), Equals, 95) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 95; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 95) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.ErrShortBuffer) - } - -} - -func (s *StandaloneSuite) TestTransfer(c *C) { - reader, writer := io.Pipe() - - // Buffer for reads from 'r' - buffer := make([]byte, 512) - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - // Reporting reader error states - reader_status := make(chan error) - - go Transfer(buffer, 
reader, requests, reader_status) - - br1 := MakeBufferReader(requests) - out := make([]byte, 128) - - { - // Write some data, and read into a buffer shorter than - // available data - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - - writer.Write(out[:100]) - - in := make([]byte, 64) - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Write some more data, and read into buffer longer than - // available data - in := make([]byte, 64) - n, err := br1.Read(in) - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, out[64+i]) - } - - } - - { - // Test read before write - type Rd struct { - n int - err error - } - rd := make(chan Rd) - in := make([]byte, 64) - - go func() { - n, err := br1.Read(in) - rd <- Rd{n, err} - }() - - time.Sleep(100 * time.Millisecond) - writer.Write(out[100:]) - - got := <-rd - - c.Check(got.n, Equals, 28) - c.Check(got.err, Equals, nil) - - for i := 0; i < 28; i += 1 { - c.Check(in[i], Equals, out[100+i]) - } - } - - br2 := MakeBufferReader(requests) - { - // Test 'catch up' reader - in := make([]byte, 256) - n, err := br2.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Test closing the reader - writer.Close() - status := <-reader_status - c.Check(status, Equals, io.EOF) - - in := make([]byte, 256) - n1, err1 := br1.Read(in) - n2, err2 := br2.Read(in) - c.Check(n1, Equals, 0) - c.Check(err1, Equals, io.EOF) - c.Check(n2, Equals, 0) - c.Check(err2, Equals, io.EOF) - } - - { - // Test 'catch up' reader after closing - br3 := MakeBufferReader(requests) - in := make([]byte, 256) - n, err := br3.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - - n, err = br3.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { - reader, writer := io.Pipe() - - // Buffer for reads from 'r' - buffer := make([]byte, 100) - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - // Reporting reader error states - reader_status := make(chan error) - - go Transfer(buffer, reader, requests, reader_status) - - out := make([]byte, 101) - go writer.Write(out) - - status := <-reader_status - c.Check(status, Equals, io.ErrShortBuffer) -} - -func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - go Transfer(buffer, nil, requests, nil) - - br1 := MakeBufferReader(requests) - - in := make([]byte, 64) - { - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, buffer[i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, buffer[64+i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferIoCopy(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - // 
Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - go Transfer(buffer, nil, requests, nil) - - br1 := MakeBufferReader(requests) - - reader, writer := io.Pipe() - - go func() { - p := make([]byte, 100) - n, err := reader.Read(p) - c.Check(n, Equals, 100) - c.Check(err, Equals, nil) - c.Check(p, DeepEquals, buffer) - }() - - io.Copy(writer, br1) -} - type StubPutHandler struct { c *C expectPath string @@ -521,18 +173,18 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { writer io.WriteCloser, upload_status chan UploadStatus) { // Buffer for reads from 'r' - buffer := make([]byte, 512) + buf := make([]byte, 512) // Read requests on Transfer() buffer - requests := make(chan ReadRequest) + requests := make(chan buffer.ReadRequest) defer close(requests) // Reporting reader error states reader_status := make(chan error) - go Transfer(buffer, reader, requests, reader_status) + go buffer.Transfer(buf, reader, requests, reader_status) - br1 := MakeBufferReader(requests) + br1 := buffer.MakeBufferReader(requests) go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
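
Editor's note: the sketch below is a caller-side illustration of the API this commit moves into the new package, assuming "arvados.org/buffer" exports Transfer, ReadRequest and MakeBufferReader with the same behavior as the code removed from keepclient.go above (the updated call sites in PutHR/PutHB and in the tests use exactly those names). The source string, buffer size and variable names are illustrative only and are not part of the commit.

package main

// Minimal usage sketch: one Transfer() goroutine owns the scratch buffer and
// answers read requests as data arrives from the source reader; any number of
// BufferReaders attached to the same 'requests' channel replay the buffered
// data independently (this is how putReplicas feeds several concurrent
// uploads from a single pass over the input).

import (
	"bytes"
	"io"
	"io/ioutil"
	"log"
	"strings"

	"arvados.org/buffer"
)

func main() {
	src := strings.NewReader("hello, keep")

	// Scratch space sized for the expected payload (keepclient uses
	// expectedLength or BLOCKSIZE here).
	buf := make([]byte, 64)

	// Read requests served by the Transfer() goroutine; closing this
	// channel is what shuts Transfer() down.
	requests := make(chan buffer.ReadRequest)
	defer close(requests)

	// Channel on which Transfer() reports the source reader's terminal
	// state (io.EOF on clean completion).
	readerStatus := make(chan error)
	defer close(readerStatus)

	// Start the transfer goroutine: it fills 'buf' from 'src' and serves
	// read requests against whatever has been buffered so far.
	go buffer.Transfer(buf, src, requests, readerStatus)

	// The status channel must be drained concurrently, because Transfer()
	// blocks while reporting it; this mirrors how putReplicas selects on
	// reader_status alongside the upload results.
	go func() {
		for err := range readerStatus {
			log.Printf("source reader finished: %v", err)
		}
	}()

	// Two independent readers over the same buffered data.
	r1 := buffer.MakeBufferReader(requests)
	r2 := buffer.MakeBufferReader(requests)

	b1, err := ioutil.ReadAll(r1)
	if err != nil {
		log.Fatal(err)
	}

	// BufferReader also implements io.WriterTo, so io.Copy avoids an
	// intermediate buffer.
	var b2 bytes.Buffer
	if _, err := io.Copy(&b2, r2); err != nil {
		log.Fatal(err)
	}

	log.Printf("r1=%q r2=%q", b1, b2.String())
}

The design point worth noting is that closing 'requests' is the caller's only shutdown signal, and the reader-status channel is a blocking report rather than a buffered one, so callers that pass a non-nil reader (as PutHR does) need to consume it; callers that hand Transfer() a pre-filled buffer (as PutHB does) can pass nil for both the reader and the status channel.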