X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6..c6a6693dc36615effca5e3363b81199362007c59:/sdk/go/src/arvados.org/keepclient/keepclient_test.go diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go index fd4ba7ba49..1d3bbeee30 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go @@ -1,6 +1,8 @@ package keepclient import ( + "arvados.org/streamer" + "crypto/md5" "flag" "fmt" . "gopkg.in/check.v1" @@ -11,12 +13,15 @@ import ( "net/http" "os" "os/exec" + "sort" + "strings" "testing" - "time" ) // Gocheck boilerplate -func Test(t *testing.T) { TestingT(t) } +func Test(t *testing.T) { + TestingT(t) +} // Gocheck boilerplate var _ = Suite(&ServerRequiredSuite{}) @@ -30,43 +35,49 @@ type ServerRequiredSuite struct{} // Standalone tests type StandaloneSuite struct{} +func pythonDir() string { + gopath := os.Getenv("GOPATH") + return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0]) +} + func (s *ServerRequiredSuite) SetUpSuite(c *C) { if *no_server { c.Skip("Skipping tests that require server") } else { - os.Chdir(os.ExpandEnv("$GOPATH../python")) + os.Chdir(pythonDir()) exec.Command("python", "run_test_server.py", "start").Run() exec.Command("python", "run_test_server.py", "start_keep").Run() } } func (s *ServerRequiredSuite) TearDownSuite(c *C) { - os.Chdir(os.ExpandEnv("$GOPATH../python")) + os.Chdir(pythonDir()) exec.Command("python", "run_test_server.py", "stop_keep").Run() exec.Command("python", "run_test_server.py", "stop").Run() } -func (s *ServerRequiredSuite) TestInit(c *C) { +func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) { os.Setenv("ARVADOS_API_HOST", "localhost:3001") os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") os.Setenv("ARVADOS_API_HOST_INSECURE", "") kc, err := MakeKeepClient() - c.Assert(kc.ApiServer, Equals, "localhost:3001") - c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") - c.Assert(kc.ApiInsecure, Equals, false) + c.Check(kc.ApiServer, Equals, "localhost:3001") + c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") + c.Check(kc.ApiInsecure, Equals, false) os.Setenv("ARVADOS_API_HOST_INSECURE", "true") kc, err = MakeKeepClient() - c.Assert(kc.ApiServer, Equals, "localhost:3001") - c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") - c.Assert(kc.ApiInsecure, Equals, true) + c.Check(kc.ApiServer, Equals, "localhost:3001") + c.Check(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") + c.Check(kc.ApiInsecure, Equals, true) + c.Check(kc.Client.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, Equals, true) c.Assert(err, Equals, nil) - c.Assert(len(kc.Service_roots), Equals, 2) - c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107") - c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108") + c.Check(len(kc.Service_roots), Equals, 2) + c.Check(kc.Service_roots[0], Equals, "http://localhost:25107") + c.Check(kc.Service_roots[1], Equals, "http://localhost:25108") } func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) { @@ -74,383 +85,528 @@ func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) { // "foo" acbd18db4cc2f85cedef654fccc4a4d8 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"} - c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle) + c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle) // "bar" 37b51d194a7513e45b56f6524f2d51f2 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"} - c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle) + c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle) } -func ReadIntoBufferHelper(c *C, bufsize int) { - buffer := make([]byte, bufsize) +type StubPutHandler struct { + c *C + expectPath string + expectApiToken string + expectBody string + handled chan string +} - reader, writer := io.Pipe() - slices := make(chan ReaderSlice) +func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + this.c.Check(req.URL.Path, Equals, "/"+this.expectPath) + this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken)) + body, err := ioutil.ReadAll(req.Body) + this.c.Check(err, Equals, nil) + this.c.Check(body, DeepEquals, []byte(this.expectBody)) + resp.WriteHeader(200) + this.handled <- fmt.Sprintf("http://%s", req.Host) +} - go ReadIntoBuffer(buffer, reader, slices) +func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) { + server := http.Server{Handler: st} - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } + var err error + listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port}) + if err != nil { + panic(fmt.Sprintf("Could not listen on tcp port %v", port)) } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 96) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 96; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 96) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.EOF) - } -} -func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { - ReadIntoBufferHelper(c, 512) - ReadIntoBufferHelper(c, 225) - ReadIntoBufferHelper(c, 224) + url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port) + + go server.Serve(listener) + return listener, url } -func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { - buffer := make([]byte, 223) +func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string, + io.ReadCloser, io.WriteCloser, chan uploadStatus)) { + + listener, url := RunBogusKeepServer(st, 2990) + defer listener.Close() + + kc, _ := MakeKeepClient() + kc.ApiToken = "abc123" + reader, writer := io.Pipe() - slices := make(chan ReaderSlice) + upload_status := make(chan uploadStatus) - go ReadIntoBuffer(buffer, reader, slices) + f(kc, url, reader, writer, upload_status) +} - { - out := make([]byte, 128) - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } - writer.Write(out) - s1 := <-slices - c.Check(len(s1.slice), Equals, 128) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 128; i += 1 { - c.Check(s1.slice[i], Equals, byte(i)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - out := make([]byte, 96) - for i := 0; i < 96; i += 1 { - out[i] = byte(i / 2) - } - - // Write will deadlock because it can't write all the data, so - // spin it off to a goroutine - go writer.Write(out) - s1 := <-slices - - c.Check(len(s1.slice), Equals, 95) - c.Check(s1.reader_error, Equals, nil) - for i := 0; i < 95; i += 1 { - c.Check(s1.slice[i], Equals, byte(i/2)) - } - for i := 0; i < len(buffer); i += 1 { - if i < 128 { - c.Check(buffer[i], Equals, byte(i)) - } else if i < (128 + 95) { - c.Check(buffer[i], Equals, byte((i-128)/2)) - } else { - c.Check(buffer[i], Equals, byte(0)) - } - } - } - { - writer.Close() - s1 := <-slices - c.Check(len(s1.slice), Equals, 0) - c.Check(s1.reader_error, Equals, io.ErrShortBuffer) - } +func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { + log.Printf("TestUploadToStubKeepServer") + + st := StubPutHandler{ + c, + "acbd18db4cc2f85cedef654fccc4a4d8", + "abc123", + "foo", + make(chan string)} + + UploadToStubHelper(c, st, + func(kc KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan uploadStatus) { + + go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo"))) + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + status := <-upload_status + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) + }) + + log.Printf("TestUploadToStubKeepServer done") } -func (s *StandaloneSuite) TestTransfer(c *C) { - reader, writer := io.Pipe() +func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { + log.Printf("TestUploadToStubKeepServerBufferReader") - // Buffer for reads from 'r' - buffer := make([]byte, 512) + st := StubPutHandler{ + c, + "acbd18db4cc2f85cedef654fccc4a4d8", + "abc123", + "foo", + make(chan string)} - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) + UploadToStubHelper(c, st, + func(kc KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan uploadStatus) { - // Reporting reader error states - reader_status := make(chan error) + tr := streamer.AsyncStreamFromReader(512, reader) + defer tr.Close() - go Transfer(buffer, reader, requests, reader_status) + br1 := tr.MakeStreamReader() - br1 := MakeBufferReader(requests) - out := make([]byte, 128) + go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3) - { - // Write some data, and read into a buffer shorter than - // available data - for i := 0; i < 128; i += 1 { - out[i] = byte(i) - } + writer.Write([]byte("foo")) + writer.Close() - writer.Write(out[:100]) + <-st.handled - in := make([]byte, 64) - n, err := br1.Read(in) + status := <-upload_status + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) + }) - c.Check(n, Equals, 64) - c.Check(err, Equals, nil) + log.Printf("TestUploadToStubKeepServerBufferReader done") +} - for i := 0; i < 64; i += 1 { - c.Check(in[i], Equals, out[i]) - } - } +type FailHandler struct { + handled chan string +} - { - // Write some more data, and read into buffer longer than - // available data - in := make([]byte, 64) - n, err := br1.Read(in) - c.Check(n, Equals, 36) - c.Check(err, Equals, nil) +func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + resp.WriteHeader(500) + this.handled <- fmt.Sprintf("http://%s", req.Host) +} + +func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { + log.Printf("TestFailedUploadToStubKeepServer") + + st := FailHandler{ + make(chan string)} + + hash := "acbd18db4cc2f85cedef654fccc4a4d8" - for i := 0; i < 36; i += 1 { - c.Check(in[i], Equals, out[64+i]) - } + UploadToStubHelper(c, st, + func(kc KeepClient, url string, reader io.ReadCloser, + writer io.WriteCloser, upload_status chan uploadStatus) { + go kc.uploadToKeepServer(url, hash, reader, upload_status, 3) + + writer.Write([]byte("foo")) + writer.Close() + + <-st.handled + + status := <-upload_status + c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash)) + c.Check(status.StatusCode, Equals, 500) + }) + log.Printf("TestFailedUploadToStubKeepServer done") +} + +type KeepServer struct { + listener net.Listener + url string +} + +func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) { + ks = make([]KeepServer, n) + + for i := 0; i < n; i += 1 { + boguslistener, bogusurl := RunBogusKeepServer(st, port+i) + ks[i] = KeepServer{boguslistener, bogusurl} } - { - // Test read before write - type Rd struct { - n int - err error - } - rd := make(chan Rd) - in := make([]byte, 64) - - go func() { - n, err := br1.Read(in) - rd <- Rd{n, err} - }() - - time.Sleep(100 * time.Millisecond) - writer.Write(out[100:]) - - got := <-rd - - c.Check(got.n, Equals, 28) - c.Check(got.err, Equals, nil) - - for i := 0; i < 28; i += 1 { - c.Check(in[i], Equals, out[100+i]) - } + return ks +} + +func (s *StandaloneSuite) TestPutB(c *C) { + log.Printf("TestPutB") + + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := StubPutHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 2)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) + + ks := RunSomeFakeKeepServers(st, 5, 2990) + + for i := 0; i < len(ks); i += 1 { + kc.Service_roots[i] = ks[i].url + defer ks[i].listener.Close() } - br2 := MakeBufferReader(requests) - { - // Test 'catch up' reader - in := make([]byte, 256) - n, err := br2.Read(in) + sort.Strings(kc.Service_roots) - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) + kc.PutB([]byte("foo")) + + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + + s1 := <-st.handled + s2 := <-st.handled + c.Check((s1 == shuff[0] && s2 == shuff[1]) || + (s1 == shuff[1] && s2 == shuff[0]), + Equals, + true) + + log.Printf("TestPutB done") +} + +func (s *StandaloneSuite) TestPutHR(c *C) { + log.Printf("TestPutHR") + + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := StubPutHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 2)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } + ks := RunSomeFakeKeepServers(st, 5, 2990) + + for i := 0; i < len(ks); i += 1 { + kc.Service_roots[i] = ks[i].url + defer ks[i].listener.Close() } - { - // Test closing the reader + sort.Strings(kc.Service_roots) + + reader, writer := io.Pipe() + + go func() { + writer.Write([]byte("foo")) writer.Close() - status := <-reader_status - c.Check(status, Equals, io.EOF) - - in := make([]byte, 256) - n1, err1 := br1.Read(in) - n2, err2 := br2.Read(in) - c.Check(n1, Equals, 0) - c.Check(err1, Equals, io.EOF) - c.Check(n2, Equals, 0) - c.Check(err2, Equals, io.EOF) - } + }() - { - // Test 'catch up' reader after closing - br3 := MakeBufferReader(requests) - in := make([]byte, 256) - n, err := br3.Read(in) + kc.PutHR(hash, reader, 3) - c.Check(n, Equals, 128) - c.Check(err, Equals, nil) + shuff := kc.shuffledServiceRoots(hash) + log.Print(shuff) + + s1 := <-st.handled + s2 := <-st.handled + + c.Check((s1 == shuff[0] && s2 == shuff[1]) || + (s1 == shuff[1] && s2 == shuff[0]), + Equals, + true) + + log.Printf("TestPutHR done") +} + +func (s *StandaloneSuite) TestPutWithFail(c *C) { + log.Printf("TestPutWithFail") - for i := 0; i < 128; i += 1 { - c.Check(in[i], Equals, out[i]) - } + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - n, err = br3.Read(in) + st := StubPutHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 2)} + + fh := FailHandler{ + make(chan string, 1)} - c.Check(n, Equals, 0) - c.Check(err, Equals, io.EOF) + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) + + ks1 := RunSomeFakeKeepServers(st, 4, 2990) + ks2 := RunSomeFakeKeepServers(fh, 1, 2995) + + for i, k := range ks1 { + kc.Service_roots[i] = k.url + defer k.listener.Close() + } + for i, k := range ks2 { + kc.Service_roots[len(ks1)+i] = k.url + defer k.listener.Close() } + + sort.Strings(kc.Service_roots) + + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + + phash, replicas, err := kc.PutB([]byte("foo")) + + <-fh.handled + + c.Check(err, Equals, nil) + c.Check(phash, Equals, hash) + c.Check(replicas, Equals, 2) + c.Check(<-st.handled, Equals, shuff[1]) + c.Check(<-st.handled, Equals, shuff[2]) } -func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { - reader, writer := io.Pipe() +func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { + log.Printf("TestPutWithTooManyFail") - // Buffer for reads from 'r' - buffer := make([]byte, 100) + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) - // Read requests on Transfer() buffer - requests := make(chan ReadRequest) - defer close(requests) + st := StubPutHandler{ + c, + hash, + "abc123", + "foo", + make(chan string, 1)} - // Reporting reader error states - reader_status := make(chan error) + fh := FailHandler{ + make(chan string, 4)} - go Transfer(buffer, reader, requests, reader_status) + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) + + ks1 := RunSomeFakeKeepServers(st, 1, 2990) + ks2 := RunSomeFakeKeepServers(fh, 4, 2991) + + for i, k := range ks1 { + kc.Service_roots[i] = k.url + defer k.listener.Close() + } + for i, k := range ks2 { + kc.Service_roots[len(ks1)+i] = k.url + defer k.listener.Close() + } - out := make([]byte, 101) - go writer.Write(out) + sort.Strings(kc.Service_roots) - status := <-reader_status - c.Check(status, Equals, io.ErrShortBuffer) + shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + + _, replicas, err := kc.PutB([]byte("foo")) + + c.Check(err, Equals, InsufficientReplicasError) + c.Check(replicas, Equals, 1) + c.Check(<-st.handled, Equals, shuff[1]) + + log.Printf("TestPutWithTooManyFail done") } -type StubHandler struct { +type StubGetHandler struct { c *C expectPath string expectApiToken string - expectBody string - handled chan bool + returnBody []byte } -func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - this.c.Check(req.URL.Path, Equals, this.expectPath) +func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + this.c.Check(req.URL.Path, Equals, "/"+this.expectPath) this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken)) - body, err := ioutil.ReadAll(req.Body) - this.c.Check(err, Equals, nil) - this.c.Check(body, DeepEquals, []byte(this.expectBody)) - resp.WriteHeader(200) - this.handled <- true + resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody))) + resp.Write(this.returnBody) } -func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { - st := StubHandler{ +func (s *StandaloneSuite) TestGet(c *C) { + log.Printf("TestGet") + + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := StubGetHandler{ c, - "/acbd18db4cc2f85cedef654fccc4a4d8", + hash, "abc123", - "foo", - make(chan bool)} - server := http.Server{Handler: st} + []byte("foo")} - listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999}) + listener, url := RunBogusKeepServer(st, 2990) defer listener.Close() - log.Printf("%s", listener.Addr().String()) - - go server.Serve(listener) kc, _ := MakeKeepClient() kc.ApiToken = "abc123" + kc.Service_roots = []string{url} - reader, writer := io.Pipe() - upload_status := make(chan UploadError) + r, n, url2, err := kc.Get(hash) + defer r.Close() + c.Check(err, Equals, nil) + c.Check(n, Equals, int64(3)) + c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash)) - go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status) + content, err2 := ioutil.ReadAll(r) + c.Check(err2, Equals, nil) + c.Check(content, DeepEquals, []byte("foo")) - writer.Write([]byte("foo")) - writer.Close() + log.Printf("TestGet done") +} - <-st.handled - status := <-upload_status - c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"}) +func (s *StandaloneSuite) TestGetFail(c *C) { + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + st := FailHandler{make(chan string, 1)} + + listener, url := RunBogusKeepServer(st, 2990) + defer listener.Close() + + kc, _ := MakeKeepClient() + kc.ApiToken = "abc123" + kc.Service_roots = []string{url} + + r, n, url2, err := kc.Get(hash) + c.Check(err, Equals, BlockNotFound) + c.Check(n, Equals, int64(0)) + c.Check(url2, Equals, "") + c.Check(r, Equals, nil) } -type FailHandler struct { - handled chan bool +type BarHandler struct { + handled chan string } -func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - resp.WriteHeader(400) - this.handled <- true +func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + resp.Write([]byte("bar")) + this.handled <- fmt.Sprintf("http://%s", req.Host) } -/*func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { - log.Printf("blup") +func (s *StandaloneSuite) TestChecksum(c *C) { + foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar"))) - c.Check(true, Equals, false) + st := BarHandler{make(chan string, 1)} - log.Printf("blug") + listener, url := RunBogusKeepServer(st, 2990) + defer listener.Close() - st := FailHandler{make(chan bool)} - server := http.Server{Handler: st} + kc, _ := MakeKeepClient() + kc.ApiToken = "abc123" + kc.Service_roots = []string{url} - listener, _ := net.ListenTCP("tcp", &net.TCPAddr{}) - defer listener.Close() + r, n, _, err := kc.Get(barhash) + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, nil) + + <-st.handled + + r, n, _, err = kc.Get(foohash) + _, err = ioutil.ReadAll(r) + c.Check(n, Equals, int64(3)) + c.Check(err, Equals, BadChecksum) + + <-st.handled +} + +func (s *StandaloneSuite) TestGetWithFailures(c *C) { + + hash := fmt.Sprintf("%x", md5.Sum([]byte("foo"))) + + fh := FailHandler{ + make(chan string, 1)} + + st := StubGetHandler{ + c, + hash, + "abc123", + []byte("foo")} - go server.Serve(listener) kc, _ := MakeKeepClient() kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 5) - reader, writer := io.Pipe() - upload_status := make(chan UploadError) + ks1 := RunSomeFakeKeepServers(st, 1, 2990) + ks2 := RunSomeFakeKeepServers(fh, 4, 2991) - go kc.uploadToKeepServer(fmt.Sprintf("http://localhost:%s", listener.Addr().String()), "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status) + for i, k := range ks1 { + kc.Service_roots[i] = k.url + defer k.listener.Close() + } + for i, k := range ks2 { + kc.Service_roots[len(ks1)+i] = k.url + defer k.listener.Close() + } - log.Printf("Writing 1") + sort.Strings(kc.Service_roots) - writer.Write([]byte("foo")) + r, n, url2, err := kc.Get(hash) + <-fh.handled + c.Check(err, Equals, nil) + c.Check(n, Equals, int64(3)) + c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash)) - log.Printf("Writing 2") + content, err2 := ioutil.ReadAll(r) + c.Check(err2, Equals, nil) + c.Check(content, DeepEquals, []byte("foo")) +} - writer.Close() +func (s *ServerRequiredSuite) TestPutGetHead(c *C) { + os.Setenv("ARVADOS_API_HOST", "localhost:3001") + os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") + os.Setenv("ARVADOS_API_HOST_INSECURE", "true") + + kc, err := MakeKeepClient() + c.Assert(err, Equals, nil) - log.Printf("Writing 3") + hash, replicas, err := kc.PutB([]byte("foo")) + c.Check(hash, Equals, fmt.Sprintf("%x", md5.Sum([]byte("foo")))) + c.Check(replicas, Equals, 2) + c.Check(err, Equals, nil) - <-st.handled + { + r, n, url2, err := kc.Get(hash) + c.Check(err, Equals, nil) + c.Check(n, Equals, int64(3)) + c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash)) - log.Printf("Handled?!") + content, err2 := ioutil.ReadAll(r) + c.Check(err2, Equals, nil) + c.Check(content, DeepEquals, []byte("foo")) + } - status := <-upload_status - c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"}) -}*/ + { + n, url2, err := kc.Ask(hash) + c.Check(err, Equals, nil) + c.Check(n, Equals, int64(3)) + c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash)) + } +}