2798: Read requests from Transfer() now return a slice. Added BufferReader
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 14 May 2014 02:36:19 +0000 (22:36 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 14 May 2014 02:36:19 +0000 (22:36 -0400)
WriteTo() but for some reason http.Request Body isn't using it.

sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go

index 93fcf4b3b95542cd6a337b7f9fd98673cf8cdc0c..ce675030f2e936775bed6c7b73d32665da9c9a4b 100644 (file)
@@ -208,15 +208,15 @@ func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
 
 // A read request to the Transfer() function
 type ReadRequest struct {
-       offset int
-       p      []byte
-       result chan<- ReadResult
+       offset  int
+       maxsize int
+       result  chan<- ReadResult
 }
 
 // A read result from the Transfer() function
 type ReadResult struct {
-       n   int
-       err error
+       slice []byte
+       err   error
 }
 
 // Reads from the buffer managed by the Transfer()
@@ -232,16 +232,36 @@ func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
 
 // Reads from the buffer managed by the Transfer()
 func (this BufferReader) Read(p []byte) (n int, err error) {
-       this.requests <- ReadRequest{*this.offset, p, this.responses}
+       log.Printf("BufferReader Read %d", len(p))
+       this.requests <- ReadRequest{*this.offset, len(p), this.responses}
        rr, valid := <-this.responses
        if valid {
-               *this.offset += rr.n
-               return rr.n, rr.err
+               *this.offset += len(rr.slice)
+               return copy(p, rr.slice), rr.err
        } else {
                return 0, io.ErrUnexpectedEOF
        }
 }
 
+func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+       log.Printf("BufferReader WriteTo")
+       for {
+               this.requests <- ReadRequest{*this.offset, 64 * 1024, this.responses}
+               rr, valid := <-this.responses
+               if valid {
+                       *this.offset += len(rr.slice)
+                       if err != nil {
+                               return int64(*this.offset), err
+                       } else {
+                               dest.Write(rr.slice)
+                       }
+
+               } else {
+                       return int64(*this.offset), io.ErrUnexpectedEOF
+               }
+       }
+}
+
 // Close the responses channel
 func (this BufferReader) Close() error {
        close(this.responses)
@@ -251,12 +271,18 @@ func (this BufferReader) Close() error {
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
 func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
-       log.Printf("HandleReadRequest %d %d %t", req.offset, len(body), complete)
+       log.Printf("HandleReadRequest offset: %d  max: %d body: %d %t", req.offset, req.maxsize, len(body), complete)
        if req.offset < len(body) {
-               req.result <- ReadResult{copy(req.p, body[req.offset:]), nil}
+               var end int
+               if req.offset+req.maxsize < len(body) {
+                       end = req.offset + req.maxsize
+               } else {
+                       end = len(body)
+               }
+               req.result <- ReadResult{body[req.offset:end], nil}
                return true
        } else if complete && req.offset >= len(body) {
-               req.result <- ReadResult{0, io.EOF}
+               req.result <- ReadResult{nil, io.EOF}
                return true
        } else {
                return false
index fd4ba7ba4922cb87bafcfeb8df10060b2c8b0298..c87b87e37c138610c52205038a4462cf987781ca 100644 (file)
@@ -366,7 +366,7 @@ type StubHandler struct {
 }
 
 func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-       this.c.Check(req.URL.Path, Equals, this.expectPath)
+       this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
        this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
        body, err := ioutil.ReadAll(req.Body)
        this.c.Check(err, Equals, nil)
@@ -375,19 +375,21 @@ func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        this.handled <- true
 }
 
-func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler,
+       io.ReadCloser, io.WriteCloser, chan UploadError)) {
+
        st := StubHandler{
                c,
-               "/acbd18db4cc2f85cedef654fccc4a4d8",
+               "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
                make(chan bool)}
        server := http.Server{Handler: st}
 
-       listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999})
+       listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
        defer listener.Close()
 
-       log.Printf("%s", listener.Addr().String())
+       url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
 
        go server.Serve(listener)
        kc, _ := MakeKeepClient()
@@ -396,14 +398,59 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
        reader, writer := io.Pipe()
        upload_status := make(chan UploadError)
 
-       go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+       f(kc, url, st, reader, writer, upload_status)
+}
 
-       writer.Write([]byte("foo"))
-       writer.Close()
+func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+       log.Printf("Started TestUploadToStubKeepServer")
 
-       <-st.handled
-       status := <-upload_status
-       c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+       UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+               reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+               go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status)
+
+               writer.Write([]byte("foo"))
+               writer.Close()
+
+               <-st.handled
+               status := <-upload_status
+               c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+       })
+}
+
+func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+       log.Printf("Started TestUploadToStubKeepServerBufferReader")
+
+       UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+               reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+               // Buffer for reads from 'r'
+               buffer := make([]byte, 512)
+
+               // Read requests on Transfer() buffer
+               requests := make(chan ReadRequest)
+               defer close(requests)
+
+               // Reporting reader error states
+               reader_status := make(chan error)
+
+               go Transfer(buffer, reader, requests, reader_status)
+
+               br1 := MakeBufferReader(requests)
+
+               go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status)
+
+               writer.Write([]byte("foo"))
+               writer.Close()
+
+               <-reader_status
+               <-st.handled
+
+               status := <-upload_status
+               c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+
+               //c.Check(true, Equals, false)
+       })
 }
 
 type FailHandler struct {