4 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
5 "git.curoverse.com/arvados.git/sdk/go/streamer"
20 // Gocheck boilerplate
// Test is the `go test` entry point for this package's gocheck suites.
// Its body is not visible in this excerpt — presumably it hands control to
// gocheck (e.g. TestingT(t)); confirm against the full file.
21 func Test(t *testing.T) {
25 // Gocheck boilerplate
// Register both suites defined in this file with gocheck.
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
// no_server is the boolean -no-server command-line flag (default false).
// NOTE(review): the help text names 'ServerRequireSuite' while the type is
// ServerRequiredSuite; it is runtime text, so it is left untouched here.
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
// StandaloneSuite holds the tests in this file that do not use the external
// test servers; where they need a server they start in-process stubs.
35 type StandaloneSuite struct{}
// pythonDir returns the path "<cwd>/../../python/tests".
// The line that assigns cwd is not visible in this excerpt — presumably it is
// the process working directory; confirm against the full file.
37 func pythonDir() string {
39 return fmt.Sprintf("%s/../../python/tests", cwd)
// SetUpSuite prepares the external servers for ServerRequiredSuite by running
// `python run_test_server.py start` and then `python run_test_server.py
// start_keep`. Each command's stderr is copied to os.Stderr; a failure to
// create the stderr pipe aborts via log.Fatalf and a failing command panics.
// The condition guarding the c.Skip call is not visible in this excerpt —
// presumably the -no-server flag; confirm against the full file.
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44 c.Skip("Skipping tests that require server")
// First invocation: "start".
49 cmd := exec.Command("python", "run_test_server.py", "start")
50 stderr, err := cmd.StderrPipe()
52 log.Fatalf("Setting up stderr pipe: %s", err)
// NOTE(review): os/exec documents that Run should not be combined with
// StderrPipe, because Wait may close the pipe before the copy goroutine has
// drained it; output from the script can be truncated.
54 go io.Copy(os.Stderr, stderr)
55 if err := cmd.Run(); err != nil {
56 panic(fmt.Sprintf("'python run_test_server.py start' returned error %s", err))
// Second invocation: "start_keep", handled the same way as the first.
60 cmd := exec.Command("python", "run_test_server.py", "start_keep")
61 stderr, err := cmd.StderrPipe()
63 log.Fatalf("Setting up stderr pipe: %s", err)
65 go io.Copy(os.Stderr, stderr)
66 if err := cmd.Run(); err != nil {
67 panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
// TearDownSuite runs `python run_test_server.py stop_keep` followed by
// `python run_test_server.py stop`. The errors returned by Run are discarded,
// so a failed shutdown goes unreported.
72 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
74 exec.Command("python", "run_test_server.py", "stop_keep").Run()
75 exec.Command("python", "run_test_server.py", "stop").Run()
// TestMakeKeepClient points the Arvados client at localhost:3001 through
// environment variables, builds a KeepClient from it, and checks that exactly
// two service roots are reported, on ports 25107 and 25108 in that order.
78 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
// os.Setenv changes the process environment, so these values remain set for
// any code that runs afterwards in the same process.
79 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
80 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
81 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
83 arv, err := arvadosclient.MakeArvadosClient()
84 c.Assert(err, Equals, nil)
86 kc, err := MakeKeepClient(&arv)
// Assert (not Check) stops the test here on failure, before kc is used.
88 c.Assert(err, Equals, nil)
89 c.Check(len(kc.ServiceRoots()), Equals, 2)
90 c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:25107")
91 c.Check(kc.ServiceRoots()[1], Equals, "http://localhost:25108")
// TestShuffleServiceRoots installs 17 service roots (ports 25107-25123) and
// checks that shuffledServiceRoots returns a fixed, expected ordering for the
// hashes labelled "foo" and "bar" below. The declaration of kc is not visible
// in this excerpt.
94 func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
96 kc.SetServiceRoots([]string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"})
98 // "foo" acbd18db4cc2f85cedef654fccc4a4d8
99 foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
100 c.Check(kc.shuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
102 // "bar" 37b51d194a7513e45b56f6524f2d51f2
103 bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
104 c.Check(kc.shuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
// StubPutHandler is an http.Handler used as a fake Keep server for uploads.
// Only the expectApiToken field is visible in this excerpt; ServeHTTP also
// reads c, expectPath, expectBody and handled, which are declared on lines
// not shown.
107 type StubPutHandler struct {
110 expectApiToken string
// ServeHTTP verifies the request path, the "OAuth2 <token>" Authorization
// header and the request body against the expected values, replies with
// status 200, and then sends "http://<request host>" on the handled channel.
// NOTE(review): the send on handled blocks once the channel's buffer is full,
// so callers must size or drain it to match the number of requests.
115 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
116 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
117 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
118 body, err := ioutil.ReadAll(req.Body)
119 this.c.Check(err, Equals, nil)
120 this.c.Check(body, DeepEquals, []byte(this.expectBody))
121 resp.WriteHeader(200)
122 this.handled <- fmt.Sprintf("http://%s", req.Host)
// RunBogusKeepServer listens on the given TCP port and serves st on it from a
// background goroutine. It returns the listener (which the caller is expected
// to close) and the URL "http://localhost:<port>". It panics if the port
// cannot be opened. The declaration of err and the condition guarding the
// panic are on lines not visible in this excerpt.
125 func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
127 listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
129 panic(fmt.Sprintf("Could not listen on tcp port %v", port))
132 url = fmt.Sprintf("http://localhost:%d", port)
// The error returned by http.Serve (e.g. when the listener is closed) is
// discarded.
134 go http.Serve(listener, st)
// UploadToStubHelper starts a stub server for st on port 2990, builds a
// KeepClient whose Arvados client uses the API token "abc123", creates an
// io.Pipe and an unbuffered uploadStatus channel, and passes all of them to f.
// The listener is closed when the helper returns.
// NOTE(review): the errors from MakeArvadosClient and MakeKeepClient are
// discarded, so a misconfigured environment surfaces only as later failures.
138 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
139 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
141 listener, url := RunBogusKeepServer(st, 2990)
142 defer listener.Close()
144 arv, _ := arvadosclient.MakeArvadosClient()
145 arv.ApiToken = "abc123"
147 kc, _ := MakeKeepClient(&arv)
149 reader, writer := io.Pipe()
150 upload_status := make(chan uploadStatus)
152 f(kc, url, reader, writer, upload_status)
// TestUploadToStubKeepServer uploads the three bytes "foo" to a single
// StubPutHandler through uploadToKeepServer, feeding the data via the pipe
// writer, and checks the uploadStatus reported on the channel. Most of the
// StubPutHandler literal is on lines not visible in this excerpt.
155 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
156 log.Printf("TestUploadToStubKeepServer")
158 st := StubPutHandler{
160 "acbd18db4cc2f85cedef654fccc4a4d8",
165 UploadToStubHelper(c, st,
166 func(kc KeepClient, url string, reader io.ReadCloser,
167 writer io.WriteCloser, upload_status chan uploadStatus) {
// The upload runs concurrently so that the pipe write below has a reader.
169 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
171 writer.Write([]byte("foo"))
175 status := <-upload_status
// Expected status: nil error, URL "<url>/<expectPath>", HTTP 200; the meaning
// of the trailing 1 and "" fields is defined by uploadStatus, outside this
// excerpt.
176 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
179 log.Printf("TestUploadToStubKeepServer done")
// TestUploadToStubKeepServerBufferReader performs the same "foo" upload as
// TestUploadToStubKeepServer, but reads the pipe through a
// streamer.AsyncStreamFromReader with a 512-byte buffer and hands
// uploadToKeepServer a stream reader obtained from it.
182 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
183 log.Printf("TestUploadToStubKeepServerBufferReader")
185 st := StubPutHandler{
187 "acbd18db4cc2f85cedef654fccc4a4d8",
192 UploadToStubHelper(c, st,
193 func(kc KeepClient, url string, reader io.ReadCloser,
194 writer io.WriteCloser, upload_status chan uploadStatus) {
196 tr := streamer.AsyncStreamFromReader(512, reader)
199 br1 := tr.MakeStreamReader()
// 3 is the length of "foo".
201 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
203 writer.Write([]byte("foo"))
208 status := <-upload_status
209 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
212 log.Printf("TestUploadToStubKeepServerBufferReader done")
// FailHandler is an http.Handler that fails every request. Its fields are on
// lines not visible in this excerpt; ServeHTTP uses a handled channel.
215 type FailHandler struct {
// ServeHTTP replies with status 500 and then sends "http://<request host>"
// on the handled channel.
219 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
220 resp.WriteHeader(500)
221 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" to a stub server through
// uploadToKeepServer and checks that the reported status carries the request
// URL and HTTP status 500. The construction of st is on lines not visible in
// this excerpt — presumably a FailHandler; confirm against the full file.
224 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
225 log.Printf("TestFailedUploadToStubKeepServer")
// MD5 of "foo" (see the matching comment in TestShuffleServiceRoots).
230 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
232 UploadToStubHelper(c, st,
233 func(kc KeepClient, url string, reader io.ReadCloser,
234 writer io.WriteCloser, upload_status chan uploadStatus) {
236 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
238 writer.Write([]byte("foo"))
243 status := <-upload_status
244 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
245 c.Check(status.statusCode, Equals, 500)
247 log.Printf("TestFailedUploadToStubKeepServer done")
// KeepServer describes one stub server started by RunSomeFakeKeepServers.
// Only the listener field is visible in this excerpt; callers also read a url
// field, declared on a line not shown.
250 type KeepServer struct {
251 listener net.Listener
// RunSomeFakeKeepServers starts n stub servers, all backed by the same
// handler st, on consecutive ports port, port+1, ..., port+n-1, and returns
// their listeners and URLs in ks. The return statement is not visible in this
// excerpt; ks is a named result.
255 func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
256 ks = make([]KeepServer, n)
258 for i := 0; i < n; i += 1 {
259 boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
260 ks[i] = KeepServer{boguslistener, bogusurl}
// TestPutB stores "foo" with PutB against five stub servers on ports
// 2990-2994 and checks that the two servers identified by s1 and s2 are the
// first two entries of the shuffled service roots for the block's MD5, in
// either order. The declarations of s1 and s2 are not visible in this excerpt
// — presumably the two values received from st.handled; confirm.
266 func (s *StandaloneSuite) TestPutB(c *C) {
267 log.Printf("TestPutB")
269 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
271 st := StubPutHandler{
// Buffer of 2 on the handled channel (last field of the literal).
276 make(chan string, 2)}
// NOTE(review): both errors are discarded here.
278 arv, _ := arvadosclient.MakeArvadosClient()
279 kc, _ := MakeKeepClient(&arv)
// The token is set after MakeKeepClient; kc was given &arv, so presumably it
// observes the change — confirm against KeepClient's definition.
282 arv.ApiToken = "abc123"
283 service_roots := make([]string, 5)
285 ks := RunSomeFakeKeepServers(st, 5, 2990)
287 for i := 0; i < len(ks); i += 1 {
288 service_roots[i] = ks[i].url
// Deferred closes run when the test function returns, not per iteration.
289 defer ks[i].listener.Close()
292 kc.SetServiceRoots(service_roots)
// The results of PutB are not inspected in this test.
294 kc.PutB([]byte("foo"))
296 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
300 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
301 (s1 == shuff[1] && s2 == shuff[0]),
305 log.Printf("TestPutB done")
// TestPutHR stores "foo" with PutHR, supplying the data through an io.Pipe
// and a declared length of 3, against five stub servers on ports 2990-2994.
// It then checks that s1 and s2 are the first two shuffled service roots for
// the hash, in either order. The declarations of s1 and s2, and the code
// surrounding the pipe write, are on lines not visible in this excerpt.
308 func (s *StandaloneSuite) TestPutHR(c *C) {
309 log.Printf("TestPutHR")
311 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
313 st := StubPutHandler{
318 make(chan string, 2)}
// NOTE(review): both errors are discarded here.
320 arv, _ := arvadosclient.MakeArvadosClient()
321 kc, _ := MakeKeepClient(&arv)
324 arv.ApiToken = "abc123"
325 service_roots := make([]string, 5)
327 ks := RunSomeFakeKeepServers(st, 5, 2990)
329 for i := 0; i < len(ks); i += 1 {
330 service_roots[i] = ks[i].url
331 defer ks[i].listener.Close()
334 kc.SetServiceRoots(service_roots)
336 reader, writer := io.Pipe()
// A write to an io.Pipe blocks until it is read; presumably this runs in a
// goroutine declared on a line not shown — confirm against the full file.
339 writer.Write([]byte("foo"))
343 kc.PutHR(hash, reader, 3)
345 shuff := kc.shuffledServiceRoots(hash)
351 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
352 (s1 == shuff[1] && s2 == shuff[0]),
356 log.Printf("TestPutHR done")
// TestPutWithFail stores "foo" with PutB against four StubPutHandler servers
// (ports 2990-2993) plus one server backed by fh (port 2995). It expects no
// error, an empty returned hash string, 2 replicas, and successful uploads
// reported by shuff[1] and shuff[2] in that order. The construction of fh is
// only partly visible — presumably a FailHandler occupying shuff[0]; confirm
// against the full file.
359 func (s *StandaloneSuite) TestPutWithFail(c *C) {
360 log.Printf("TestPutWithFail")
362 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
364 st := StubPutHandler{
369 make(chan string, 2)}
372 make(chan string, 1)}
// NOTE(review): this err is not checked on any line visible here before it is
// reassigned by the PutB call below.
374 arv, err := arvadosclient.MakeArvadosClient()
375 kc, _ := MakeKeepClient(&arv)
378 arv.ApiToken = "abc123"
379 service_roots := make([]string, 5)
381 ks1 := RunSomeFakeKeepServers(st, 4, 2990)
382 ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
// Roots from ks1 fill indexes 0-3, the root from ks2 fills index 4.
384 for i, k := range ks1 {
385 service_roots[i] = k.url
386 defer k.listener.Close()
388 for i, k := range ks2 {
389 service_roots[len(ks1)+i] = k.url
390 defer k.listener.Close()
393 kc.SetServiceRoots(service_roots)
395 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
397 phash, replicas, err := kc.PutB([]byte("foo"))
401 c.Check(err, Equals, nil)
402 c.Check(phash, Equals, "")
403 c.Check(replicas, Equals, 2)
404 c.Check(<-st.handled, Equals, shuff[1])
405 c.Check(<-st.handled, Equals, shuff[2])
// TestPutWithTooManyFail stores "foo" with PutB against one StubPutHandler
// server (port 2990) and four servers backed by fh (ports 2991-2994). It
// expects InsufficientReplicasError, a replica count of 1, and the single
// successful upload to be reported by shuff[1]. The construction of fh is
// only partly visible — presumably a FailHandler; confirm.
408 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
409 log.Printf("TestPutWithTooManyFail")
411 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
413 st := StubPutHandler{
418 make(chan string, 1)}
421 make(chan string, 4)}
// NOTE(review): this err is not checked on any line visible here before it is
// reassigned by the PutB call below.
423 arv, err := arvadosclient.MakeArvadosClient()
424 kc, _ := MakeKeepClient(&arv)
427 arv.ApiToken = "abc123"
428 service_roots := make([]string, 5)
430 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
431 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
// The root from ks1 fills index 0, the roots from ks2 fill indexes 1-4.
433 for i, k := range ks1 {
434 service_roots[i] = k.url
435 defer k.listener.Close()
437 for i, k := range ks2 {
438 service_roots[len(ks1)+i] = k.url
439 defer k.listener.Close()
442 kc.SetServiceRoots(service_roots)
444 shuff := kc.shuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
446 _, replicas, err := kc.PutB([]byte("foo"))
448 c.Check(err, Equals, InsufficientReplicasError)
449 c.Check(replicas, Equals, 1)
450 c.Check(<-st.handled, Equals, shuff[1])
452 log.Printf("TestPutWithTooManyFail done")
// StubGetHandler is an http.Handler used as a fake Keep server for reads.
// Only the expectApiToken field is visible in this excerpt; ServeHTTP also
// reads c, expectPath and returnBody, which are declared on lines not shown.
455 type StubGetHandler struct {
458 expectApiToken string
// ServeHTTP verifies the request path and the "OAuth2 <token>" Authorization
// header, sets Content-Length to the length of returnBody, and writes
// returnBody as the response body.
462 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
463 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
464 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
465 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
466 resp.Write(this.returnBody)
// TestGet fetches the block for MD5("foo") from a single StubGetHandler
// server on port 2990 and checks that Get reports no error, a size of 3, the
// URL "<server url>/<hash>", and that the returned reader yields "foo". The
// fields of the StubGetHandler literal are on lines not visible here.
469 func (s *StandaloneSuite) TestGet(c *C) {
470 log.Printf("TestGet")
472 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
474 st := StubGetHandler{
480 listener, url := RunBogusKeepServer(st, 2990)
481 defer listener.Close()
// NOTE(review): this err is reassigned by Get below without being checked on
// any line visible here.
483 arv, err := arvadosclient.MakeArvadosClient()
484 kc, _ := MakeKeepClient(&arv)
485 arv.ApiToken = "abc123"
486 kc.SetServiceRoots([]string{url})
488 r, n, url2, err := kc.Get(hash)
490 c.Check(err, Equals, nil)
491 c.Check(n, Equals, int64(3))
492 c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
494 content, err2 := ioutil.ReadAll(r)
495 c.Check(err2, Equals, nil)
496 c.Check(content, DeepEquals, []byte("foo"))
498 log.Printf("TestGet done")
// TestGetFail requests a block from a single FailHandler server (which
// answers 500) and checks that Get returns BlockNotFound, size 0, an empty
// URL and a nil reader.
501 func (s *StandaloneSuite) TestGetFail(c *C) {
502 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
504 st := FailHandler{make(chan string, 1)}
506 listener, url := RunBogusKeepServer(st, 2990)
507 defer listener.Close()
// NOTE(review): this err is reassigned by Get below without being checked.
509 arv, err := arvadosclient.MakeArvadosClient()
510 kc, _ := MakeKeepClient(&arv)
511 arv.ApiToken = "abc123"
512 kc.SetServiceRoots([]string{url})
514 r, n, url2, err := kc.Get(hash)
515 c.Check(err, Equals, BlockNotFound)
516 c.Check(n, Equals, int64(0))
517 c.Check(url2, Equals, "")
518 c.Check(r, Equals, nil)
// BarHandler is an http.Handler that answers every request with the body
// "bar", regardless of what was asked for. Its fields are on lines not
// visible in this excerpt; ServeHTTP uses a handled channel.
521 type BarHandler struct {
// ServeHTTP writes "bar" as the response body and then sends
// "http://<request host>" on the handled channel.
525 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
526 resp.Write([]byte("bar"))
527 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestChecksum reads from a BarHandler server, which always returns "bar".
// Requesting MD5("bar") and reading the result is expected to succeed;
// requesting MD5("foo") and reading the result is expected to end with
// BadChecksum. Both reads expect a reported size of 3.
530 func (s *StandaloneSuite) TestChecksum(c *C) {
531 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
532 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
534 st := BarHandler{make(chan string, 1)}
536 listener, url := RunBogusKeepServer(st, 2990)
537 defer listener.Close()
539 arv, err := arvadosclient.MakeArvadosClient()
540 kc, _ := MakeKeepClient(&arv)
541 arv.ApiToken = "abc123"
542 kc.SetServiceRoots([]string{url})
// NOTE(review): the error returned by Get is overwritten by ReadAll's error on
// the next line before it is checked, so only the read error is asserted.
544 r, n, _, err := kc.Get(barhash)
545 _, err = ioutil.ReadAll(r)
546 c.Check(n, Equals, int64(3))
547 c.Check(err, Equals, nil)
// Same pattern for the mismatching hash: the asserted error is ReadAll's.
551 r, n, _, err = kc.Get(foohash)
552 _, err = ioutil.ReadAll(r)
553 c.Check(n, Equals, int64(3))
554 c.Check(err, Equals, BadChecksum)
// TestGetWithFailures fetches MD5("foo") with one StubGetHandler server
// (port 2990) and four servers backed by fh (ports 2991-2994) configured as
// service roots. It expects Get to succeed with size 3, to report the URL of
// the StubGetHandler server, and the reader to yield "foo". The construction
// of fh is only partly visible — presumably a FailHandler; confirm.
559 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
561 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
564 make(chan string, 1)}
566 st := StubGetHandler{
// NOTE(review): this err is reassigned by Get below without being checked on
// any line visible here.
572 arv, err := arvadosclient.MakeArvadosClient()
573 kc, _ := MakeKeepClient(&arv)
574 arv.ApiToken = "abc123"
575 service_roots := make([]string, 5)
577 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
578 ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
// The root from ks1 fills index 0, the roots from ks2 fill indexes 1-4.
580 for i, k := range ks1 {
581 service_roots[i] = k.url
582 defer k.listener.Close()
584 for i, k := range ks2 {
585 service_roots[len(ks1)+i] = k.url
586 defer k.listener.Close()
589 kc.SetServiceRoots(service_roots)
591 r, n, url2, err := kc.Get(hash)
593 c.Check(err, Equals, nil)
594 c.Check(n, Equals, int64(3))
595 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
597 content, err2 := ioutil.ReadAll(r)
598 c.Check(err2, Equals, nil)
599 c.Check(content, DeepEquals, []byte("foo"))
// TestPutGetHead runs against the external test servers. In order, it checks
// that Ask reports BlockNotFound for MD5("foo"), that PutB("foo") returns
// "<hash>+3" with 2 replicas, that Get returns "foo" with size 3 from
// http://localhost:25108, and that Ask then reports size 3 at the same URL.
// The repeated := declarations of n/url2/err imply the steps sit in separate
// scopes whose braces are on lines not visible in this excerpt.
602 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
603 os.Setenv("ARVADOS_API_HOST", "localhost:3001")
604 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
605 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
// NOTE(review): the error from MakeArvadosClient is overwritten by
// MakeKeepClient on the next line, so only the latter is asserted.
607 arv, err := arvadosclient.MakeArvadosClient()
608 kc, err := MakeKeepClient(&arv)
609 c.Assert(err, Equals, nil)
611 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
// Step 1: the block is expected to be absent before it is stored.
614 n, _, err := kc.Ask(hash)
615 c.Check(err, Equals, BlockNotFound)
616 c.Check(n, Equals, int64(0))
// Step 2: store the block; the returned locator carries the size suffix "+3".
619 hash2, replicas, err := kc.PutB([]byte("foo"))
620 c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
621 c.Check(replicas, Equals, 2)
622 c.Check(err, Equals, nil)
// Step 3: read the block back.
625 r, n, url2, err := kc.Get(hash)
626 c.Check(err, Equals, nil)
627 c.Check(n, Equals, int64(3))
628 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
630 content, err2 := ioutil.ReadAll(r)
631 c.Check(err2, Equals, nil)
632 c.Check(content, DeepEquals, []byte("foo"))
// Step 4: Ask now finds the block.
635 n, url2, err := kc.Ask(hash)
636 c.Check(err, Equals, nil)
637 c.Check(n, Equals, int64(3))
638 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
// StubProxyHandler is an http.Handler standing in for a Keep proxy. Its
// fields are on lines not visible in this excerpt; ServeHTTP uses a handled
// channel.
642 type StubProxyHandler struct {
// ServeHTTP sets the response header X-Keep-Replicas-Stored to "2" and then
// sends "http://<request host>" on the handled channel. It does not write a
// status code or body explicitly.
646 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
647 resp.Header().Set("X-Keep-Replicas-Stored", "2")
648 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestPutProxy stores "foo" with PutB through a single StubProxyHandler
// server on port 2990 with kc.Using_proxy enabled, and expects no error and a
// replica count of 2 (the value the stub reports in X-Keep-Replicas-Stored).
// Lines between MakeKeepClient and Using_proxy are not visible in this
// excerpt and may configure kc further.
651 func (s *StandaloneSuite) TestPutProxy(c *C) {
652 log.Printf("TestPutProxy")
654 st := StubProxyHandler{make(chan string, 1)}
// NOTE(review): this err is reassigned by PutB below without being checked on
// any line visible here.
656 arv, err := arvadosclient.MakeArvadosClient()
657 kc, _ := MakeKeepClient(&arv)
660 kc.Using_proxy = true
661 arv.ApiToken = "abc123"
662 service_roots := make([]string, 1)
664 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
666 for i, k := range ks1 {
667 service_roots[i] = k.url
668 defer k.listener.Close()
671 kc.SetServiceRoots(service_roots)
673 _, replicas, err := kc.PutB([]byte("foo"))
676 c.Check(err, Equals, nil)
677 c.Check(replicas, Equals, 2)
679 log.Printf("TestPutProxy done")
// TestPutProxyInsufficientReplicas uses the same StubProxyHandler setup as
// TestPutProxy but expects PutB to return InsufficientReplicasError while
// still reporting 2 replicas. What makes 2 replicas insufficient is set on
// lines not visible in this excerpt — presumably a higher wanted-replica
// count on kc; confirm against the full file.
// NOTE(review): both log messages below say "TestPutProxy", which looks
// copy-pasted from the previous test; they are runtime strings and are left
// unchanged here.
682 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
683 log.Printf("TestPutProxy")
685 st := StubProxyHandler{make(chan string, 1)}
// NOTE(review): this err is reassigned by PutB below without being checked on
// any line visible here.
687 arv, err := arvadosclient.MakeArvadosClient()
688 kc, _ := MakeKeepClient(&arv)
691 kc.Using_proxy = true
692 arv.ApiToken = "abc123"
693 service_roots := make([]string, 1)
695 ks1 := RunSomeFakeKeepServers(st, 1, 2990)
697 for i, k := range ks1 {
698 service_roots[i] = k.url
699 defer k.listener.Close()
701 kc.SetServiceRoots(service_roots)
703 _, replicas, err := kc.PutB([]byte("foo"))
706 c.Check(err, Equals, InsufficientReplicasError)
707 c.Check(replicas, Equals, 2)
709 log.Printf("TestPutProxy done")
// TestMakeLocator parses the locator string
// "91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678" with MakeLocator and
// checks the resulting fields: Hash is the leading 32-character hex string,
// Size is 3, Signature is "abcde" (the text between "+A" and "@"), and
// Timestamp is "12345678" (the text after "@").
712 func (s *StandaloneSuite) TestMakeLocator(c *C) {
713 l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
715 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
716 c.Check(l.Size, Equals, 3)
717 c.Check(l.Signature, Equals, "abcde")
718 c.Check(l.Timestamp, Equals, "12345678")