X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fb7f238945e33b07f1c80b0623315c1ecf86bca2..2b19cf9f9522dd0e8774031a54ce695e73fb72fe:/sdk/go/src/arvados.org/keepclient/keepclient_test.go diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go index c87b87e37c..f8144f1a86 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go @@ -1,6 +1,7 @@ package keepclient import ( + "crypto/md5" "flag" "fmt" . "gopkg.in/check.v1" @@ -11,6 +12,7 @@ import ( "net/http" "os" "os/exec" + "sort" "testing" "time" ) @@ -357,12 +359,84 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { c.Check(status, Equals, io.ErrShortBuffer) } +func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { + // Buffer for reads from 'r' + buffer := make([]byte, 100) + for i := 0; i < 100; i += 1 { + buffer[i] = byte(i) + } + + // Read requests on Transfer() buffer + requests := make(chan ReadRequest) + defer close(requests) + + go Transfer(buffer, nil, requests, nil) + + br1 := MakeBufferReader(requests) + + in := make([]byte, 64) + { + n, err := br1.Read(in) + + c.Check(n, Equals, 64) + c.Check(err, Equals, nil) + + for i := 0; i < 64; i += 1 { + c.Check(in[i], Equals, buffer[i]) + } + } + { + n, err := br1.Read(in) + + c.Check(n, Equals, 36) + c.Check(err, Equals, nil) + + for i := 0; i < 36; i += 1 { + c.Check(in[i], Equals, buffer[64+i]) + } + } + { + n, err := br1.Read(in) + + c.Check(n, Equals, 0) + c.Check(err, Equals, io.EOF) + } +} + +func (s *StandaloneSuite) TestTransferIoCopy(c *C) { + // Buffer for reads from 'r' + buffer := make([]byte, 100) + for i := 0; i < 100; i += 1 { + buffer[i] = byte(i) + } + + // Read requests on Transfer() buffer + requests := make(chan ReadRequest) + defer close(requests) + + go Transfer(buffer, nil, requests, nil) + + br1 := MakeBufferReader(requests) + + reader, writer := io.Pipe() + + go func() { + p := make([]byte, 100) + n, err := reader.Read(p) + c.Check(n, Equals, 100) + c.Check(err, Equals, nil) + c.Check(p, DeepEquals, buffer) + }() + + io.Copy(writer, br1) +} + type StubHandler struct { c *C expectPath string expectApiToken string expectBody string - handled chan bool + handled chan string } func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { @@ -372,132 +446,314 @@ func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { this.c.Check(err, Equals, nil) this.c.Check(body, DeepEquals, []byte(this.expectBody)) resp.WriteHeader(200) - this.handled <- true + this.handled <- fmt.Sprintf("http://%s", req.Host) } -func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler, - io.ReadCloser, io.WriteCloser, chan UploadError)) { - - st := StubHandler{ - c, - "acbd18db4cc2f85cedef654fccc4a4d8", - "abc123", - "foo", - make(chan bool)} +func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) { server := http.Server{Handler: st} - listener, _ := net.ListenTCP("tcp", &net.TCPAddr{}) - defer listener.Close() + var err error + listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port}) + if err != nil { + panic(fmt.Sprintf("Could not listen on tcp port %v", port)) + } - url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port) + url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port) go server.Serve(listener) + return listener, url +} + +func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, + io.ReadCloser, io.WriteCloser, chan UploadStatus)) { + + listener, url := RunBogusKeepServer(st, 2990) + defer listener.Close() + kc, _ := MakeKeepClient() kc.ApiToken = "abc123" reader, writer := io.Pipe() - upload_status := make(chan UploadError) + upload_status := make(chan UploadStatus) - f(kc, url, st, reader, writer, upload_status) + f(kc, url, reader, writer, upload_status) } func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { - log.Printf("Started TestUploadToStubKeepServer") + st := StubHandler{ + c, + "acbd18db4cc2f85cedef654fccc4a4d8", + "abc123", + "foo", + make(chan string)} - UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler, - reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) { + UploadToStubHelper(c, st, + func(kc *KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan UploadStatus) { - go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status) + go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo"))) - writer.Write([]byte("foo")) - writer.Close() + writer.Write([]byte("foo")) + writer.Close() - <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)}) - }) + <-st.handled + status := <-upload_status + c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) + }) } func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { - log.Printf("Started TestUploadToStubKeepServerBufferReader") + st := StubHandler{ + c, + "acbd18db4cc2f85cedef654fccc4a4d8", + "abc123", + "foo", + make(chan string)} - UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler, - reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) { + UploadToStubHelper(c, st, + func(kc *KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan UploadStatus) { - // Buffer for reads from 'r' - buffer := make([]byte, 512) + // Buffer for reads from 'r' + buffer := make([]byte, 512) - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) + // Read requests on Transfer() buffer + requests := make(chan ReadRequest) + defer close(requests) - // Reporting reader error states - reader_status := make(chan error) + // Reporting reader error states + reader_status := make(chan error) - go Transfer(buffer, reader, requests, reader_status) + go Transfer(buffer, reader, requests, reader_status) - br1 := MakeBufferReader(requests) + br1 := MakeBufferReader(requests) - go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status) + go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3) - writer.Write([]byte("foo")) - writer.Close() + writer.Write([]byte("foo")) + writer.Close() - <-reader_status - <-st.handled + <-reader_status + <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)}) + status := <-upload_status + c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) - //c.Check(true, Equals, false) - }) + //c.Check(true, Equals, false) + }) } type FailHandler struct { - handled chan bool + handled chan string } func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { resp.WriteHeader(400) - this.handled <- true + this.handled <- fmt.Sprintf("http://%s", req.Host) } -/*func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { - log.Printf("blup") +func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { + st := FailHandler{ + make(chan string)} - c.Check(true, Equals, false) + hash := "acbd18db4cc2f85cedef654fccc4a4d8" - log.Printf("blug") + UploadToStubHelper(c, st, + func(kc *KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan UploadStatus) { - st := FailHandler{make(chan bool)} - server := http.Server{Handler: st} + go kc.uploadToKeepServer(url, hash, reader, upload_status, 3) - listener, _ := net.ListenTCP("tcp", &net.TCPAddr{}) - defer listener.Close() + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + + status := <-upload_status + c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash)) + c.Check(status.StatusCode, Equals, 400) + }) + +} + +type KeepServer struct { + listener net.Listener + url string +} + +func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) { + ks = make([]KeepServer, n) + + for i := 0; i < n; i += 1 { + boguslistener, bogusurl := RunBogusKeepServer(st, port+i) + ks[i] = KeepServer{boguslistener, bogusurl} + } + + return ks +} + +func (s *StandaloneSuite) TestPutB(c *C) { + log.Printf("TestPutB") + + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := StubHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 2)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) + + ks := RunSomeFakeKeepServers(st, 5, 2990) + + for i := 0; i < len(ks); i += 1 { + kc.Service_roots[i] = ks[i].url + defer ks[i].listener.Close() + } + + sort.Strings(kc.Service_roots) + + kc.PutB([]byte("foo")) + + shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + + c.Check(<-st.handled, Equals, shuff[0]) + c.Check(<-st.handled, Equals, shuff[1]) +} + +func (s *StandaloneSuite) TestPutHR(c *C) { + log.Printf("TestPutHR") + + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := StubHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 2)} - go server.Serve(listener) kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) + + ks := RunSomeFakeKeepServers(st, 5, 2990) + + for i := 0; i < len(ks); i += 1 { + kc.Service_roots[i] = ks[i].url + defer ks[i].listener.Close() + } + + sort.Strings(kc.Service_roots) reader, writer := io.Pipe() - upload_status := make(chan UploadError) - go kc.uploadToKeepServer(fmt.Sprintf("http://localhost:%s", listener.Addr().String()), "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status) + go func() { + writer.Write([]byte("foo")) + writer.Close() + }() - log.Printf("Writing 1") + kc.PutHR(hash, reader, 3) - writer.Write([]byte("foo")) + shuff := kc.ShuffledServiceRoots(hash) - log.Printf("Writing 2") + c.Check(<-st.handled, Equals, shuff[0]) + c.Check(<-st.handled, Equals, shuff[1]) +} - writer.Close() +func (s *StandaloneSuite) TestPutWithFail(c *C) { + log.Printf("TestPutWithFail") - log.Printf("Writing 3") + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - <-st.handled + st := StubHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 2)} + + fh := FailHandler{ + make(chan string, 1)} - log.Printf("Handled?!") + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) + + ks1 := RunSomeFakeKeepServers(st, 4, 2990) + ks2 := RunSomeFakeKeepServers(fh, 1, 2995) + + for i, k := range ks1 { + kc.Service_roots[i] = k.url + defer k.listener.Close() + } + for i, k := range ks2 { + kc.Service_roots[len(ks1)+i] = k.url + defer k.listener.Close() + } + + sort.Strings(kc.Service_roots) + + shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + + err := kc.PutB([]byte("foo")) + + <-fh.handled + + c.Check(err, Equals, nil) + c.Check(<-st.handled, Equals, shuff[1]) + c.Check(<-st.handled, Equals, shuff[2]) +} - status := <-upload_status - c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"}) -}*/ +func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { + log.Printf("TestPutWithTooManyFail") + + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := StubHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 1)} + + fh := FailHandler{ + make(chan string, 4)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) + + ks1 := RunSomeFakeKeepServers(st, 1, 2990) + ks2 := RunSomeFakeKeepServers(fh, 4, 2991) + + for i, k := range ks1 { + kc.Service_roots[i] = k.url + defer k.listener.Close() + } + for i, k := range ks2 { + kc.Service_roots[len(ks1)+i] = k.url + defer k.listener.Close() + } + + sort.Strings(kc.Service_roots) + + shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + + err := kc.PutB([]byte("foo")) + + c.Check(err, Equals, InsufficientReplicasError) + c.Check(<-st.handled, Equals, shuff[1]) +}