// A read request to the Transfer() function
type ReadRequest struct {
- offset int
- p []byte
- result chan<- ReadResult
+ offset int
+ maxsize int
+ result chan<- ReadResult
}
// A read result from the Transfer() function
type ReadResult struct {
- n int
- err error
+ slice []byte
+ err error
}
// Reads from the buffer managed by the Transfer()
// Reads from the buffer managed by the Transfer()
func (this BufferReader) Read(p []byte) (n int, err error) {
- this.requests <- ReadRequest{*this.offset, p, this.responses}
+ log.Printf("BufferReader Read %d", len(p))
+ this.requests <- ReadRequest{*this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
- *this.offset += rr.n
- return rr.n, rr.err
+ *this.offset += len(rr.slice)
+ return copy(p, rr.slice), rr.err
} else {
return 0, io.ErrUnexpectedEOF
}
}
+func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+ log.Printf("BufferReader WriteTo")
+ for {
+ this.requests <- ReadRequest{*this.offset, 64 * 1024, this.responses}
+ rr, valid := <-this.responses
+ if valid {
+ *this.offset += len(rr.slice)
+ if err != nil {
+ return int64(*this.offset), err
+ } else {
+ dest.Write(rr.slice)
+ }
+
+ } else {
+ return int64(*this.offset), io.ErrUnexpectedEOF
+ }
+ }
+}
+
// Close the responses channel
func (this BufferReader) Close() error {
close(this.responses)
// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
- log.Printf("HandleReadRequest %d %d %t", req.offset, len(body), complete)
+ log.Printf("HandleReadRequest offset: %d max: %d body: %d %t", req.offset, req.maxsize, len(body), complete)
if req.offset < len(body) {
- req.result <- ReadResult{copy(req.p, body[req.offset:]), nil}
+ var end int
+ if req.offset+req.maxsize < len(body) {
+ end = req.offset + req.maxsize
+ } else {
+ end = len(body)
+ }
+ req.result <- ReadResult{body[req.offset:end], nil}
return true
} else if complete && req.offset >= len(body) {
- req.result <- ReadResult{0, io.EOF}
+ req.result <- ReadResult{nil, io.EOF}
return true
} else {
return false
}
func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- this.c.Check(req.URL.Path, Equals, this.expectPath)
+ this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
body, err := ioutil.ReadAll(req.Body)
this.c.Check(err, Equals, nil)
this.handled <- true
}
-func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler,
+ io.ReadCloser, io.WriteCloser, chan UploadError)) {
+
st := StubHandler{
c,
- "/acbd18db4cc2f85cedef654fccc4a4d8",
+ "acbd18db4cc2f85cedef654fccc4a4d8",
"abc123",
"foo",
make(chan bool)}
server := http.Server{Handler: st}
- listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999})
+ listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
defer listener.Close()
- log.Printf("%s", listener.Addr().String())
+ url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
go server.Serve(listener)
kc, _ := MakeKeepClient()
reader, writer := io.Pipe()
upload_status := make(chan UploadError)
- go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+ f(kc, url, st, reader, writer, upload_status)
+}
- writer.Write([]byte("foo"))
- writer.Close()
+func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+ log.Printf("Started TestUploadToStubKeepServer")
- <-st.handled
- status := <-upload_status
- c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+ UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+ reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+ go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status)
+
+ writer.Write([]byte("foo"))
+ writer.Close()
+
+ <-st.handled
+ status := <-upload_status
+ c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+ })
+}
+
+func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+ log.Printf("Started TestUploadToStubKeepServerBufferReader")
+
+ UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+ reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+ // Buffer for reads from 'r'
+ buffer := make([]byte, 512)
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ // Reporting reader error states
+ reader_status := make(chan error)
+
+ go Transfer(buffer, reader, requests, reader_status)
+
+ br1 := MakeBufferReader(requests)
+
+ go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status)
+
+ writer.Write([]byte("foo"))
+ writer.Close()
+
+ <-reader_status
+ <-st.handled
+
+ status := <-upload_status
+ c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+
+ //c.Check(true, Equals, false)
+ })
}
type FailHandler struct {