From 86dffe03ec74387d14da9ceb17934bbdab1239b9 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 22 May 2014 11:17:32 -0400 Subject: [PATCH] 1885: Full-stack integration test (api+keep+keepproxy+keepclient) works! --- sdk/go/src/arvados.org/keepclient/support.go | 4 +- .../src/arvados.org/keepproxy/keepproxy.go | 18 ++++--- .../arvados.org/keepproxy/keepproxy_test.go | 51 ++++++++++++++----- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go index 89c5e4b50f..ef4a8e1133 100644 --- a/sdk/go/src/arvados.org/keepclient/support.go +++ b/sdk/go/src/arvados.org/keepclient/support.go @@ -186,6 +186,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read var resp *http.Response if resp, err = this.Client.Do(req); err != nil { upload_status <- uploadStatus{err, url, 0, 0} + body.Close() return } @@ -231,7 +232,6 @@ func (this KeepClient) putReplicas( next_server += 1 active += 1 } else { - fmt.Print(active) if active == 0 { return (this.Want_replicas - remaining_replicas), InsufficientReplicasError } else { @@ -251,7 +251,7 @@ func (this KeepClient) putReplicas( status.url, status.err) } active -= 1 - log.Printf("Upload status %v %v %v", status.statusCode, remaining_replicas, active) + log.Printf("Upload status code: %v remaining replicas: %v active: %v", status.statusCode, remaining_replicas, active) } return this.Want_replicas, nil diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go index 4213ce0935..8b8ff18a31 100644 --- a/services/keep/src/arvados.org/keepproxy/keepproxy.go +++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go @@ -240,25 +240,27 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques var reader io.ReadCloser var err error + var blocklen int64 if req.Method == "GET" { - reader, _, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp) + reader, blocklen, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp) + defer reader.Close() } else if req.Method == "HEAD" { - _, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp) + blocklen, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp) } + resp.Header().Set("Content-Length", fmt.Sprint(blocklen)) + switch err { case nil: - io.Copy(resp, reader) + if reader != nil { + io.Copy(resp, reader) + } case keepclient.BlockNotFound: http.Error(resp, "Not found", http.StatusNotFound) default: http.Error(resp, err.Error(), http.StatusBadGateway) } - - if reader != nil { - reader.Close() - } } func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { @@ -294,6 +296,8 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques // Now try to put the block through replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength) + log.Printf("Replicas stored: %v err: %v", replicas, err) + // Tell the client how many successful PUTs we accomplished resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas)) diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go index 256f7f4277..98712819a6 100644 --- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go +++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go @@ -2,6 +2,7 @@ package main import ( "arvados.org/keepclient" + "crypto/md5" "crypto/tls" "fmt" . "gopkg.in/check.v1" @@ -14,6 +15,7 @@ import ( "os/exec" "strings" "testing" + "time" ) // Gocheck boilerplate @@ -94,7 +96,7 @@ func SetupProxyService() { } } -func (s *ServerRequiredSuite) TestPutAndGet(c *C) { +func (s *ServerRequiredSuite) TestPutAskGet(c *C) { log.Print("TestPutAndGet start") os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true") @@ -111,22 +113,43 @@ func (s *ServerRequiredSuite) TestPutAndGet(c *C) { os.Args = []string{"keepproxy", "-listen=:29950"} go main() - log.Print("keepproxy main started") - - hash, rep, err2 := kc.PutB([]byte("foo")) - - log.Print("PutB") + time.Sleep(100 * time.Millisecond) - c.Check(rep, Equals, 2) - c.Check(err2, Equals, nil) + log.Print("keepproxy main started") - reader, blocklen, _, err3 := kc.Get(hash) - all, err := ioutil.ReadAll(reader) - c.Check(all, DeepEquals, []byte("foo")) - c.Check(blocklen, Equals, int64(3)) - c.Check(err3, Equals, nil) + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + // Uncomment this when actual keep server supports HEAD + /*{ + _, _, err := kc.Ask(hash) + c.Check(err, Equals, keepclient.BlockNotFound) + log.Print("Ask 1") + }*/ + + { + hash2, rep, err := kc.PutB([]byte("foo")) + c.Check(hash2, Equals, hash) + c.Check(rep, Equals, 2) + c.Check(err, Equals, nil) + log.Print("PutB") + } - log.Print("Get") + // Uncomment this when actual keep server supports HEAD + /*{ + blocklen, _, err := kc.Ask(hash) + c.Check(blocklen, Equals, int64(3)) + c.Check(err, Equals, nil) + log.Print("Ask 2") + }*/ + + { + reader, blocklen, _, err := kc.Get(hash) + all, err := ioutil.ReadAll(reader) + c.Check(all, DeepEquals, []byte("foo")) + c.Check(blocklen, Equals, int64(3)) + c.Check(err, Equals, nil) + log.Print("Get") + } // Close internal listener socket. listener.Close() -- 2.30.2