From: Peter Amstutz Date: Wed, 14 May 2014 02:36:19 +0000 (-0400) Subject: 2798: Read requests from Transfer() now return a slice. Added BufferReader X-Git-Tag: 1.1.0~2603^2~5^2~17 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/fb7f238945e33b07f1c80b0623315c1ecf86bca2 2798: Read requests from Transfer() now return a slice. Added BufferReader WriteTo() but for some reason http.Request Body isn't using it. --- diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index 93fcf4b3b9..ce675030f2 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -208,15 +208,15 @@ func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) { // A read request to the Transfer() function type ReadRequest struct { - offset int - p []byte - result chan<- ReadResult + offset int + maxsize int + result chan<- ReadResult } // A read result from the Transfer() function type ReadResult struct { - n int - err error + slice []byte + err error } // Reads from the buffer managed by the Transfer() @@ -232,16 +232,36 @@ func MakeBufferReader(requests chan<- ReadRequest) BufferReader { // Reads from the buffer managed by the Transfer() func (this BufferReader) Read(p []byte) (n int, err error) { - this.requests <- ReadRequest{*this.offset, p, this.responses} + log.Printf("BufferReader Read %d", len(p)) + this.requests <- ReadRequest{*this.offset, len(p), this.responses} rr, valid := <-this.responses if valid { - *this.offset += rr.n - return rr.n, rr.err + *this.offset += len(rr.slice) + return copy(p, rr.slice), rr.err } else { return 0, io.ErrUnexpectedEOF } } +func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) { + log.Printf("BufferReader WriteTo") + for { + this.requests <- ReadRequest{*this.offset, 64 * 1024, this.responses} + rr, valid := <-this.responses + if valid { + *this.offset += len(rr.slice) + if err != nil { + return int64(*this.offset), err + } else { + dest.Write(rr.slice) + } + + } else { + return int64(*this.offset), io.ErrUnexpectedEOF + } + } +} + // Close the responses channel func (this BufferReader) Close() error { close(this.responses) @@ -251,12 +271,18 @@ func (this BufferReader) Close() error { // Handle a read request. Returns true if a response was sent, and false if // the request should be queued. func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool { - log.Printf("HandleReadRequest %d %d %t", req.offset, len(body), complete) + log.Printf("HandleReadRequest offset: %d max: %d body: %d %t", req.offset, req.maxsize, len(body), complete) if req.offset < len(body) { - req.result <- ReadResult{copy(req.p, body[req.offset:]), nil} + var end int + if req.offset+req.maxsize < len(body) { + end = req.offset + req.maxsize + } else { + end = len(body) + } + req.result <- ReadResult{body[req.offset:end], nil} return true } else if complete && req.offset >= len(body) { - req.result <- ReadResult{0, io.EOF} + req.result <- ReadResult{nil, io.EOF} return true } else { return false diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go index fd4ba7ba49..c87b87e37c 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go @@ -366,7 +366,7 @@ type StubHandler struct { } func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - this.c.Check(req.URL.Path, Equals, this.expectPath) + this.c.Check(req.URL.Path, Equals, "/"+this.expectPath) this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken)) body, err := ioutil.ReadAll(req.Body) this.c.Check(err, Equals, nil) @@ -375,19 +375,21 @@ func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { this.handled <- true } -func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { +func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler, + io.ReadCloser, io.WriteCloser, chan UploadError)) { + st := StubHandler{ c, - "/acbd18db4cc2f85cedef654fccc4a4d8", + "acbd18db4cc2f85cedef654fccc4a4d8", "abc123", "foo", make(chan bool)} server := http.Server{Handler: st} - listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999}) + listener, _ := net.ListenTCP("tcp", &net.TCPAddr{}) defer listener.Close() - log.Printf("%s", listener.Addr().String()) + url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port) go server.Serve(listener) kc, _ := MakeKeepClient() @@ -396,14 +398,59 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { reader, writer := io.Pipe() upload_status := make(chan UploadError) - go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status) + f(kc, url, st, reader, writer, upload_status) +} - writer.Write([]byte("foo")) - writer.Close() +func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { + log.Printf("Started TestUploadToStubKeepServer") - <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"}) + UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler, + reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) { + + go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status) + + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + status := <-upload_status + c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)}) + }) +} + +func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { + log.Printf("Started TestUploadToStubKeepServerBufferReader") + + UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler, + reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) { + + // Buffer for reads from 'r' + buffer := make([]byte, 512) + + // Read requests on Transfer() buffer + requests := make(chan ReadRequest) + defer close(requests) + + // Reporting reader error states + reader_status := make(chan error) + + go Transfer(buffer, reader, requests, reader_status) + + br1 := MakeBufferReader(requests) + + go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status) + + writer.Write([]byte("foo")) + writer.Close() + + <-reader_status + <-st.handled + + status := <-upload_status + c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)}) + + //c.Check(true, Equals, false) + }) } type FailHandler struct {