X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/48ffdd5ac196771381c8dc9ab47cfad5f1929720..ce5813a53eda0f1d1512f6c080e6c4ed95179089:/sdk/go/src/arvados.org/keepclient/keepclient_test.go diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go index 756868cd1c..8eedadd64b 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go @@ -1,6 +1,7 @@ package keepclient import ( + "arvados.org/streamer" "crypto/md5" "flag" "fmt" @@ -12,14 +13,14 @@ import ( "net/http" "os" "os/exec" - "sort" "strings" "testing" - "time" ) // Gocheck boilerplate -func Test(t *testing.T) { TestingT(t) } +func Test(t *testing.T) { + TestingT(t) +} // Gocheck boilerplate var _ = Suite(&ServerRequiredSuite{}) @@ -43,8 +44,12 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) { c.Skip("Skipping tests that require server") } else { os.Chdir(pythonDir()) - exec.Command("python", "run_test_server.py", "start").Run() - exec.Command("python", "run_test_server.py", "start_keep").Run() + if err := exec.Command("python", "run_test_server.py", "start").Run(); err != nil { + panic("'python run_test_server.py start' returned error") + } + if err := exec.Command("python", "run_test_server.py", "start_keep").Run(); err != nil { + panic("'python run_test_server.py start_keep' returned error") + } } } @@ -60,381 +65,35 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { os.Setenv("ARVADOS_API_HOST_INSECURE", "") kc, err := MakeKeepClient() - c.Assert(kc.ApiServer, Equals, "localhost:3001") - c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") - c.Assert(kc.ApiInsecure, Equals, false) + c.Check(kc.ApiServer, Equals, "localhost:3001") + c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") + c.Check(kc.ApiInsecure, Equals, false) os.Setenv("ARVADOS_API_HOST_INSECURE", "true") kc, err = MakeKeepClient() - c.Assert(kc.ApiServer, Equals, "localhost:3001") - c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") - c.Assert(kc.ApiInsecure, Equals, true) + c.Check(kc.ApiServer, Equals, "localhost:3001") + c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") + c.Check(kc.ApiInsecure, Equals, true) + c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true) c.Assert(err, Equals, nil) - c.Assert(len(kc.Service_roots), Equals, 2) - c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107") - c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108") + c.Check(len(kc.ServiceRoots()), Equals, 2) + c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107") + c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108") } func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) { - kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}} + kc := KeepClient{} + kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}) // "foo" acbd18db4cc2f85cedef654fccc4a4d8 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"} - c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle) + c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle) // "bar" 37b51d194a7513e45b56f6524f2d51f2 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"} - c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle) -} - -func ReadIntoBufferHelper(c *C, bufsize int) { - buffer := make([]byte, bufsize) - - reader, writer := io.Pipe() - slices := make(chan ReaderSlice) - - go ReadIntoBuffer(buffer, reader, slices) - - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 96) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 96; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 96) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { - ReadIntoBufferHelper(c, 512) - ReadIntoBufferHelper(c, 225) - ReadIntoBufferHelper(c, 224) -} - -func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { - buffer := make([]byte, 223) - reader, writer := io.Pipe() - slices := make(chan ReaderSlice) - - go ReadIntoBuffer(buffer, reader, slices) - - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - - // Write will deadlock because it can't write all the data, so - // spin it off to a goroutine - go writer.Write(out) - s1 := <-slices - - c.Check(len(s1.slice), Equals, 95) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 95; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 95) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.ErrShortBuffer) - } - -} - -func (s *StandaloneSuite) TestTransfer(c *C) { - reader, writer := io.Pipe() - - // Buffer for reads from 'r' - buffer := make([]byte, 512) - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - // Reporting reader error states - reader_status := make(chan error) - - go Transfer(buffer, reader, requests, reader_status) - - br1 := MakeBufferReader(requests) - out := make([]byte, 128) - - { - // Write some data, and read into a buffer shorter than - // available data - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - - writer.Write(out[:100]) - - in := make([]byte, 64) - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Write some more data, and read into buffer longer than - // available data - in := make([]byte, 64) - n, err := br1.Read(in) - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, out[64+i]) - } - - } - - { - // Test read before write - type Rd struct { - n int - err error - } - rd := make(chan Rd) - in := make([]byte, 64) - - go func() { - n, err := br1.Read(in) - rd <- Rd{n, err} - }() - - time.Sleep(100 * time.Millisecond) - writer.Write(out[100:]) - - got := <-rd - - c.Check(got.n, Equals, 28) - c.Check(got.err, Equals, nil) - - for i := 0; i < 28; i += 1 { - c.Check(in[i], Equals, out[100+i]) - } - } - - br2 := MakeBufferReader(requests) - { - // Test 'catch up' reader - in := make([]byte, 256) - n, err := br2.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } - - { - // Test closing the reader - writer.Close() - status := <-reader_status - c.Check(status, Equals, io.EOF) - - in := make([]byte, 256) - n1, err1 := br1.Read(in) - n2, err2 := br2.Read(in) - c.Check(n1, Equals, 0) - c.Check(err1, Equals, io.EOF) - c.Check(n2, Equals, 0) - c.Check(err2, Equals, io.EOF) - } - - { - // Test 'catch up' reader after closing - br3 := MakeBufferReader(requests) - in := make([]byte, 256) - n, err := br3.Read(in) - - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) - - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } - - n, err = br3.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { - reader, writer := io.Pipe() - - // Buffer for reads from 'r' - buffer := make([]byte, 100) - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - // Reporting reader error states - reader_status := make(chan error) - - go Transfer(buffer, reader, requests, reader_status) - - out := make([]byte, 101) - go writer.Write(out) - - status := <-reader_status - c.Check(status, Equals, io.ErrShortBuffer) -} - -func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - go Transfer(buffer, nil, requests, nil) - - br1 := MakeBufferReader(requests) - - in := make([]byte, 64) - { - n, err := br1.Read(in) - - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) - - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, buffer[i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) - - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, buffer[64+i]) - } - } - { - n, err := br1.Read(in) - - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) - } -} - -func (s *StandaloneSuite) TestTransferIoCopy(c *C) { - // Buffer for reads from 'r' - buffer := make([]byte, 100) - for i := 0; i < 100; i += 1 { - buffer[i] = byte(i) - } - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - go Transfer(buffer, nil, requests, nil) - - br1 := MakeBufferReader(requests) - - reader, writer := io.Pipe() - - go func() { - p := make([]byte, 100) - n, err := reader.Read(p) - c.Check(n, Equals, 100) - c.Check(err, Equals, nil) - c.Check(p, DeepEquals, buffer) - }() - - io.Copy(writer, br1) + c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle) } type StubPutHandler struct { @@ -456,22 +115,20 @@ func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request } func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) { - server := http.Server{Handler: st} - var err error listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port}) if err != nil { panic(fmt.Sprintf("Could not listen on tcp port %v", port)) } - url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port) + url = fmt.Sprintf("http://localhost:%d", port) - go server.Serve(listener) + go http.Serve(listener, st) return listener, url } -func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, - io.ReadCloser, io.WriteCloser, chan UploadStatus)) { +func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string, + io.ReadCloser, io.WriteCloser, chan uploadStatus)) { listener, url := RunBogusKeepServer(st, 2990) defer listener.Close() @@ -480,12 +137,14 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, kc.ApiToken = "abc123" reader, writer := io.Pipe() - upload_status := make(chan UploadStatus) + upload_status := make(chan uploadStatus) f(kc, url, reader, writer, upload_status) } func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { + log.Printf("TestUploadToStubKeepServer") + st := StubPutHandler{ c, "acbd18db4cc2f85cedef654fccc4a4d8", @@ -494,8 +153,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { make(chan string)} UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan UploadStatus) { + func(kc KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan uploadStatus) { go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo"))) @@ -504,11 +163,15 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { <-st.handled status := <-upload_status - c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) }) + + log.Printf("TestUploadToStubKeepServer done") } func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { + log.Printf("TestUploadToStubKeepServerBufferReader") + st := StubPutHandler{ c, "acbd18db4cc2f85cedef654fccc4a4d8", @@ -517,36 +180,26 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { make(chan string)} UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan UploadStatus) { - - // Buffer for reads from 'r' - buffer := make([]byte, 512) - - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) - - // Reporting reader error states - reader_status := make(chan error) + func(kc KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan uploadStatus) { - go Transfer(buffer, reader, requests, reader_status) + tr := streamer.AsyncStreamFromReader(512, reader) + defer tr.Close() - br1 := MakeBufferReader(requests) + br1 := tr.MakeStreamReader() go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3) writer.Write([]byte("foo")) writer.Close() - <-reader_status <-st.handled status := <-upload_status - c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) - - //c.Check(true, Equals, false) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""}) }) + + log.Printf("TestUploadToStubKeepServerBufferReader done") } type FailHandler struct { @@ -554,19 +207,21 @@ type FailHandler struct { } func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - resp.WriteHeader(400) + resp.WriteHeader(500) this.handled <- fmt.Sprintf("http://%s", req.Host) } func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { + log.Printf("TestFailedUploadToStubKeepServer") + st := FailHandler{ make(chan string)} hash := "acbd18db4cc2f85cedef654fccc4a4d8" UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, - writer io.WriteCloser, upload_status chan UploadStatus) { + func(kc KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan uploadStatus) { go kc.uploadToKeepServer(url, hash, reader, upload_status, 3) @@ -576,10 +231,10 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { <-st.handled status := <-upload_status - c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash)) - c.Check(status.StatusCode, Equals, 400) + c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash)) + c.Check(status.statusCode, Equals, 500) }) - + log.Printf("TestFailedUploadToStubKeepServer done") } type KeepServer struct { @@ -614,23 +269,29 @@ func (s *StandaloneSuite) TestPutB(c *C) { kc.Want_replicas = 2 kc.ApiToken = "abc123" - kc.Service_roots = make([]string, 5) + service_roots := make([]string, 5) ks := RunSomeFakeKeepServers(st, 5, 2990) for i := 0; i < len(ks); i += 1 { - kc.Service_roots[i] = ks[i].url + service_roots[i] = ks[i].url defer ks[i].listener.Close() } - sort.Strings(kc.Service_roots) + kc.SetServiceRoots(service_roots) kc.PutB([]byte("foo")) - shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) - c.Check(<-st.handled, Equals, shuff[0]) - c.Check(<-st.handled, Equals, shuff[1]) + s1 := <-st.handled + s2 := <-st.handled + c.Check((s1 == shuff[0] && s2 == shuff[1]) || + (s1 == shuff[1] && s2 == shuff[0]), + Equals, + true) + + log.Printf("TestPutB done") } func (s *StandaloneSuite) TestPutHR(c *C) { @@ -649,16 +310,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) { kc.Want_replicas = 2 kc.ApiToken = "abc123" - kc.Service_roots = make([]string, 5) + service_roots := make([]string, 5) ks := RunSomeFakeKeepServers(st, 5, 2990) for i := 0; i < len(ks); i += 1 { - kc.Service_roots[i] = ks[i].url + service_roots[i] = ks[i].url defer ks[i].listener.Close() } - sort.Strings(kc.Service_roots) + kc.SetServiceRoots(service_roots) reader, writer := io.Pipe() @@ -669,7 +330,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) { kc.PutHR(hash, reader, 3) - shuff := kc.ShuffledServiceRoots(hash) + shuff := kc.shuffledServiceRoots(hash) log.Print(shuff) s1 := <-st.handled @@ -679,6 +340,8 @@ func (s *StandaloneSuite) TestPutHR(c *C) { (s1 == shuff[1] && s2 == shuff[0]), Equals, true) + + log.Printf("TestPutHR done") } func (s *StandaloneSuite) TestPutWithFail(c *C) { @@ -700,30 +363,30 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) { kc.Want_replicas = 2 kc.ApiToken = "abc123" - kc.Service_roots = make([]string, 5) + service_roots := make([]string, 5) ks1 := RunSomeFakeKeepServers(st, 4, 2990) ks2 := RunSomeFakeKeepServers(fh, 1, 2995) for i, k := range ks1 { - kc.Service_roots[i] = k.url + service_roots[i] = k.url defer k.listener.Close() } for i, k := range ks2 { - kc.Service_roots[len(ks1)+i] = k.url + service_roots[len(ks1)+i] = k.url defer k.listener.Close() } - sort.Strings(kc.Service_roots) + kc.SetServiceRoots(service_roots) - shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) phash, replicas, err := kc.PutB([]byte("foo")) <-fh.handled c.Check(err, Equals, nil) - c.Check(phash, Equals, hash) + c.Check(phash, Equals, "") c.Check(replicas, Equals, 2) c.Check(<-st.handled, Equals, shuff[1]) c.Check(<-st.handled, Equals, shuff[2]) @@ -748,29 +411,31 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { kc.Want_replicas = 2 kc.ApiToken = "abc123" - kc.Service_roots = make([]string, 5) + service_roots := make([]string, 5) ks1 := RunSomeFakeKeepServers(st, 1, 2990) ks2 := RunSomeFakeKeepServers(fh, 4, 2991) for i, k := range ks1 { - kc.Service_roots[i] = k.url + service_roots[i] = k.url defer k.listener.Close() } for i, k := range ks2 { - kc.Service_roots[len(ks1)+i] = k.url + service_roots[len(ks1)+i] = k.url defer k.listener.Close() } - sort.Strings(kc.Service_roots) + kc.SetServiceRoots(service_roots) - shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) _, replicas, err := kc.PutB([]byte("foo")) c.Check(err, Equals, InsufficientReplicasError) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, shuff[1]) + + log.Printf("TestPutWithTooManyFail done") } type StubGetHandler struct { @@ -788,6 +453,7 @@ func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request } func (s *StandaloneSuite) TestGet(c *C) { + log.Printf("TestGet") hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) @@ -802,9 +468,10 @@ func (s *StandaloneSuite) TestGet(c *C) { kc, _ := MakeKeepClient() kc.ApiToken = "abc123" - kc.Service_roots = []string{url} + kc.SetServiceRoots([]string{url}) r, n, url2, err := kc.Get(hash) + defer r.Close() c.Check(err, Equals, nil) c.Check(n, Equals, int64(3)) c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash)) @@ -812,6 +479,8 @@ func (s *StandaloneSuite) TestGet(c *C) { content, err2 := ioutil.ReadAll(r) c.Check(err2, Equals, nil) c.Check(content, DeepEquals, []byte("foo")) + + log.Printf("TestGet done") } func (s *StandaloneSuite) TestGetFail(c *C) { @@ -824,7 +493,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) { kc, _ := MakeKeepClient() kc.ApiToken = "abc123" - kc.Service_roots = []string{url} + kc.SetServiceRoots([]string{url}) r, n, url2, err := kc.Get(hash) c.Check(err, Equals, BlockNotFound) @@ -833,6 +502,43 @@ func (s *StandaloneSuite) TestGetFail(c *C) { c.Check(r, Equals, nil) } +type BarHandler struct { + handled chan string +} + +func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + resp.Write([]byte("bar")) + this.handled <- fmt.Sprintf("http://%s", req.Host) +} + +func (s *StandaloneSuite) TestChecksum(c *C) { + foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar"))) + + st := BarHandler{make(chan string, 1)} + + listener, url := RunBogusKeepServer(st, 2990) + defer listener.Close() + + kc, _ := MakeKeepClient() + kc.ApiToken = "abc123" + kc.SetServiceRoots([]string{url}) + + r, n, _, err := kc.Get(barhash) + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, nil) + + <-st.handled + + r, n, _, err = kc.Get(foohash) + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, BadChecksum) + + <-st.handled +} + func (s *StandaloneSuite) TestGetWithFailures(c *C) { hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) @@ -848,21 +554,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { kc, _ := MakeKeepClient() kc.ApiToken = "abc123" - kc.Service_roots = make([]string, 5) + service_roots := make([]string, 5) ks1 := RunSomeFakeKeepServers(st, 1, 2990) ks2 := RunSomeFakeKeepServers(fh, 4, 2991) for i, k := range ks1 { - kc.Service_roots[i] = k.url + service_roots[i] = k.url defer k.listener.Close() } for i, k := range ks2 { - kc.Service_roots[len(ks1)+i] = k.url + service_roots[len(ks1)+i] = k.url defer k.listener.Close() } - sort.Strings(kc.Service_roots) + kc.SetServiceRoots(service_roots) r, n, url2, err := kc.Get(hash) <-fh.handled @@ -883,11 +589,19 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { kc, err := MakeKeepClient() c.Assert(err, Equals, nil) - hash, replicas, err := kc.PutB([]byte("foo")) - c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo")))) - c.Check(replicas, Equals, 2) - c.Check(err, Equals, nil) + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + { + n, _, err := kc.Ask(hash) + c.Check(err, Equals, BlockNotFound) + c.Check(n, Equals, int64(0)) + } + { + hash2, replicas, err := kc.PutB([]byte("foo")) + c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3)) + c.Check(replicas, Equals, 2) + c.Check(err, Equals, nil) + } { r, n, url2, err := kc.Get(hash) c.Check(err, Equals, nil) @@ -898,7 +612,6 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { c.Check(err2, Equals, nil) c.Check(content, DeepEquals, []byte("foo")) } - { n, url2, err := kc.Ask(hash) c.Check(err, Equals, nil) @@ -906,3 +619,80 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash)) } } + +type StubProxyHandler struct { + handled chan string +} + +func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + resp.Header().Set("X-Keep-Replicas-Stored", "2") + this.handled <- fmt.Sprintf("http://%s", req.Host) +} + +func (s *StandaloneSuite) TestPutProxy(c *C) { + log.Printf("TestPutProxy") + + st := StubProxyHandler{make(chan string, 1)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.Using_proxy = true + kc.ApiToken = "abc123" + service_roots := make([]string, 1) + + ks1 := RunSomeFakeKeepServers(st, 1, 2990) + + for i, k := range ks1 { + service_roots[i] = k.url + defer k.listener.Close() + } + + kc.SetServiceRoots(service_roots) + + _, replicas, err := kc.PutB([]byte("foo")) + <-st.handled + + c.Check(err, Equals, nil) + c.Check(replicas, Equals, 2) + + log.Printf("TestPutProxy done") +} + +func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { + log.Printf("TestPutProxy") + + st := StubProxyHandler{make(chan string, 1)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 3 + kc.Using_proxy = true + kc.ApiToken = "abc123" + service_roots := make([]string, 1) + + ks1 := RunSomeFakeKeepServers(st, 1, 2990) + + for i, k := range ks1 { + service_roots[i] = k.url + defer k.listener.Close() + } + kc.SetServiceRoots(service_roots) + + _, replicas, err := kc.PutB([]byte("foo")) + <-st.handled + + c.Check(err, Equals, InsufficientReplicasError) + c.Check(replicas, Equals, 2) + + log.Printf("TestPutProxy done") +} + +func (s *StandaloneSuite) TestMakeLocator(c *C) { + l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678") + + c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce") + c.Check(l.Size, Equals, 3) + c.Check(l.Signature, Equals, "abcde") + c.Check(l.Timestamp, Equals, "12345678") +}