2798: Updated keep client with buffer/streamer changes.
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 15:24:04 +0000 (11:24 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 15:24:04 +0000 (11:24 -0400)
sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go
sdk/go/src/arvados.org/keepclient/support.go
sdk/go/src/arvados.org/streamer/streamer.go

index dcf1f33bbac1fac4927e821806659a9d3572fff9..dc3ceedf454281d6d88f3469dadab2fe8b56f573 100644 (file)
@@ -2,7 +2,7 @@
 package keepclient
 
 import (
-       "arvados.org/buffer"
+       "arvados.org/streamer"
        "crypto/md5"
        "crypto/tls"
        "errors"
@@ -69,7 +69,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
                bufsize = BLOCKSIZE
        }
 
-       t := buffer.StartTransferFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
+       t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
        defer t.Close()
 
        return this.putReplicas(hash, t, expectedLength)
@@ -80,7 +80,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
 // replicas that were written and if there was an error.  Note this will return
 // InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
 func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
-       t := buffer.StartTransferFromSlice(buf)
+       t := streamer.AsyncStreamFromSlice(buf)
        defer t.Close()
 
        return this.putReplicas(hash, t, int64(len(buf)))
index 348b913fd584f20880378f2660018cf8db8e73ae..1d3bbeee308761be39b189ecfac044f6d009ff97 100644 (file)
@@ -1,7 +1,7 @@
 package keepclient
 
 import (
-       "arvados.org/buffer"
+       "arvados.org/streamer"
        "crypto/md5"
        "flag"
        "fmt"
@@ -141,6 +141,8 @@ func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+       log.Printf("TestUploadToStubKeepServer")
+
        st := StubPutHandler{
                c,
                "acbd18db4cc2f85cedef654fccc4a4d8",
@@ -161,9 +163,13 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
                        status := <-upload_status
                        c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
                })
+
+       log.Printf("TestUploadToStubKeepServer done")
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+       log.Printf("TestUploadToStubKeepServerBufferReader")
+
        st := StubPutHandler{
                c,
                "acbd18db4cc2f85cedef654fccc4a4d8",
@@ -175,22 +181,23 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                func(kc KeepClient, url string, reader io.ReadCloser,
                        writer io.WriteCloser, upload_status chan uploadStatus) {
 
-                       tr := buffer.StartTransferFromReader(512, reader)
+                       tr := streamer.AsyncStreamFromReader(512, reader)
                        defer tr.Close()
 
-                       br1 := tr.MakeBufferReader()
+                       br1 := tr.MakeStreamReader()
 
                        go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
 
                        writer.Write([]byte("foo"))
                        writer.Close()
 
-                       <-tr.Reader_status
                        <-st.handled
 
                        status := <-upload_status
                        c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
                })
+
+       log.Printf("TestUploadToStubKeepServerBufferReader done")
 }
 
 type FailHandler struct {
@@ -203,6 +210,8 @@ func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
+       log.Printf("TestFailedUploadToStubKeepServer")
+
        st := FailHandler{
                make(chan string)}
 
@@ -223,7 +232,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
                        c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
                        c.Check(status.StatusCode, Equals, 500)
                })
-
+       log.Printf("TestFailedUploadToStubKeepServer done")
 }
 
 type KeepServer struct {
@@ -279,6 +288,8 @@ func (s *StandaloneSuite) TestPutB(c *C) {
                (s1 == shuff[1] && s2 == shuff[0]),
                Equals,
                true)
+
+       log.Printf("TestPutB done")
 }
 
 func (s *StandaloneSuite) TestPutHR(c *C) {
@@ -327,6 +338,8 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
                (s1 == shuff[1] && s2 == shuff[0]),
                Equals,
                true)
+
+       log.Printf("TestPutHR done")
 }
 
 func (s *StandaloneSuite) TestPutWithFail(c *C) {
@@ -419,6 +432,8 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
        c.Check(err, Equals, InsufficientReplicasError)
        c.Check(replicas, Equals, 1)
        c.Check(<-st.handled, Equals, shuff[1])
+
+       log.Printf("TestPutWithTooManyFail done")
 }
 
 type StubGetHandler struct {
@@ -436,6 +451,7 @@ func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request
 }
 
 func (s *StandaloneSuite) TestGet(c *C) {
+       log.Printf("TestGet")
 
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
@@ -453,6 +469,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
        kc.Service_roots = []string{url}
 
        r, n, url2, err := kc.Get(hash)
+       defer r.Close()
        c.Check(err, Equals, nil)
        c.Check(n, Equals, int64(3))
        c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
@@ -460,6 +477,8 @@ func (s *StandaloneSuite) TestGet(c *C) {
        content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
        c.Check(content, DeepEquals, []byte("foo"))
+
+       log.Printf("TestGet done")
 }
 
 func (s *StandaloneSuite) TestGetFail(c *C) {
index 7ea8248b393d27794545d0bea55afe1ff49d45a4..e657a60a39e46d668253436b61c7a191504188cf 100644 (file)
@@ -2,7 +2,7 @@
 package keepclient
 
 import (
-       "arvados.org/buffer"
+       "arvados.org/streamer"
        "encoding/json"
        "errors"
        "fmt"
@@ -137,6 +137,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        var url = fmt.Sprintf("%s/%s", host, hash)
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
                upload_status <- uploadStatus{err, url, 0}
+               body.Close()
                return
        }
 
@@ -163,7 +164,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
 func (this KeepClient) putReplicas(
        hash string,
-       tr buffer.TransferBuffer,
+       tr *streamer.AsyncStream,
        expectedLength int64) (replicas int, err error) {
 
        // Calculate the ordering for uploading to servers
@@ -186,7 +187,7 @@ func (this KeepClient) putReplicas(
                for active < remaining_replicas {
                        // Start some upload requests
                        if next_server < len(sv) {
-                               go this.uploadToKeepServer(sv[next_server], hash, tr.MakeBufferReader(), upload_status, expectedLength)
+                               go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
                                next_server += 1
                                active += 1
                        } else {
@@ -195,26 +196,17 @@ func (this KeepClient) putReplicas(
                }
 
                // Now wait for something to happen.
-               select {
-               case status := <-tr.Reader_status:
-                       if status == io.EOF {
-                               // good news!
-                       } else {
-                               // bad news
-                               return (this.Want_replicas - remaining_replicas), status
-                       }
-               case status := <-upload_status:
-                       if status.StatusCode == 200 {
-                               // good news!
-                               remaining_replicas -= 1
-                       } else {
-                               // writing to keep server failed for some reason
-                               log.Printf("Keep server put to %v failed with '%v'",
-                                       status.Url, status.Err)
-                       }
-                       active -= 1
-                       log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+               status := <-upload_status
+               if status.StatusCode == 200 {
+                       // good news!
+                       remaining_replicas -= 1
+               } else {
+                       // writing to keep server failed for some reason
+                       log.Printf("Keep server put to %v failed with '%v'",
+                               status.Url, status.Err)
                }
+               active -= 1
+               log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
        }
 
        return (this.Want_replicas - remaining_replicas), nil
index 78ab027829b59c99b3e1d539dc16c482927f5231..2217dd3352eae69255b74b4faa5a74425efca0ee 100644 (file)
@@ -117,6 +117,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 func (this *StreamReader) Close() error {
        this.stream.subtract_reader <- true
        close(this.responses)
+       this.stream = nil
        return nil
 }