7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/streamer"
20 // Gocheck boilerplate
21 func Test(t *testing.T) {
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
35 type StandaloneSuite struct{}
37 func pythonDir() string {
39 return fmt.Sprintf("%s/../../python/tests", cwd)
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44 c.Skip("Skipping tests that require server")
49 cmd := exec.Command("python", "run_test_server.py", "start")
50 stderr, err := cmd.StderrPipe()
52 log.Fatalf("Setting up stderr pipe: %s", err)
54 go io.Copy(os.Stderr, stderr)
55 if err := cmd.Run(); err != nil {
56 panic(fmt.Sprintf("'python run_test_server.py start' returned error %s", err))
60 cmd := exec.Command("python", "run_test_server.py", "start_keep")
61 stderr, err := cmd.StderrPipe()
63 log.Fatalf("Setting up stderr pipe: %s", err)
65 go io.Copy(os.Stderr, stderr)
66 if err := cmd.Run(); err != nil {
67 panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
72 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
74 exec.Command("python", "run_test_server.py", "stop_keep").Run()
75 exec.Command("python", "run_test_server.py", "stop").Run()
78 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
79 os.Setenv("ARVADOS_API_HOST", "localhost:3000")
80 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
81 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
83 arv, err := arvadosclient.MakeArvadosClient()
84 c.Assert(err, Equals, nil)
86 kc, err := MakeKeepClient(&arv)
88 c.Assert(err, Equals, nil)
89 c.Check(len(kc.ServiceRoots()), Equals, 2)
90 for _, root := range kc.ServiceRoots() {
91 c.Check(root, Matches, "http://localhost:2510[\\d]")
95 type StubPutHandler struct {
103 func (this StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
105 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
106 body, err := ioutil.ReadAll(req.Body)
107 this.c.Check(err, Equals, nil)
108 this.c.Check(body, DeepEquals, []byte(this.expectBody))
109 resp.WriteHeader(200)
110 this.handled <- fmt.Sprintf("http://%s", req.Host)
113 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
115 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
117 panic(fmt.Sprintf("Could not listen on any port"))
119 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
120 go http.Serve(ks.listener, st)
124 func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
125 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
127 ks := RunFakeKeepServer(st)
128 defer ks.listener.Close()
130 arv, _ := arvadosclient.MakeArvadosClient()
131 arv.ApiToken = "abc123"
133 kc, _ := MakeKeepClient(&arv)
135 reader, writer := io.Pipe()
136 upload_status := make(chan uploadStatus)
138 f(kc, ks.url, reader, writer, upload_status)
141 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
142 log.Printf("TestUploadToStubKeepServer")
144 st := StubPutHandler{
146 "acbd18db4cc2f85cedef654fccc4a4d8",
151 UploadToStubHelper(c, st,
152 func(kc KeepClient, url string, reader io.ReadCloser,
153 writer io.WriteCloser, upload_status chan uploadStatus) {
155 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
157 writer.Write([]byte("foo"))
161 status := <-upload_status
162 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
165 log.Printf("TestUploadToStubKeepServer done")
168 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
169 log.Printf("TestUploadToStubKeepServerBufferReader")
171 st := StubPutHandler{
173 "acbd18db4cc2f85cedef654fccc4a4d8",
178 UploadToStubHelper(c, st,
179 func(kc KeepClient, url string, reader io.ReadCloser,
180 writer io.WriteCloser, upload_status chan uploadStatus) {
182 tr := streamer.AsyncStreamFromReader(512, reader)
185 br1 := tr.MakeStreamReader()
187 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
189 writer.Write([]byte("foo"))
194 status := <-upload_status
195 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
198 log.Printf("TestUploadToStubKeepServerBufferReader done")
201 type FailHandler struct {
205 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
206 resp.WriteHeader(500)
207 this.handled <- fmt.Sprintf("http://%s", req.Host)
210 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
211 log.Printf("TestFailedUploadToStubKeepServer")
216 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
218 UploadToStubHelper(c, st,
219 func(kc KeepClient, url string, reader io.ReadCloser,
220 writer io.WriteCloser, upload_status chan uploadStatus) {
222 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
224 writer.Write([]byte("foo"))
229 status := <-upload_status
230 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
231 c.Check(status.statusCode, Equals, 500)
233 log.Printf("TestFailedUploadToStubKeepServer done")
236 type KeepServer struct {
237 listener net.Listener
241 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
242 ks = make([]KeepServer, n)
244 for i := 0; i < n; i += 1 {
245 ks[i] = RunFakeKeepServer(st)
251 func (s *StandaloneSuite) TestPutB(c *C) {
252 log.Printf("TestPutB")
254 hash := Md5String("foo")
256 st := StubPutHandler{
261 make(chan string, 5)}
263 arv, _ := arvadosclient.MakeArvadosClient()
264 kc, _ := MakeKeepClient(&arv)
267 arv.ApiToken = "abc123"
268 service_roots := make(map[string]string)
270 ks := RunSomeFakeKeepServers(st, 5)
272 for i, k := range ks {
273 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
274 defer k.listener.Close()
277 kc.SetServiceRoots(service_roots)
279 kc.PutB([]byte("foo"))
281 shuff := NewRootSorter(
282 kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
286 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
287 (s1 == shuff[1] && s2 == shuff[0]),
291 log.Printf("TestPutB done")
294 func (s *StandaloneSuite) TestPutHR(c *C) {
295 log.Printf("TestPutHR")
297 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
299 st := StubPutHandler{
304 make(chan string, 5)}
306 arv, _ := arvadosclient.MakeArvadosClient()
307 kc, _ := MakeKeepClient(&arv)
310 arv.ApiToken = "abc123"
311 service_roots := make(map[string]string)
313 ks := RunSomeFakeKeepServers(st, 5)
315 for i, k := range ks {
316 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
317 defer k.listener.Close()
320 kc.SetServiceRoots(service_roots)
322 reader, writer := io.Pipe()
325 writer.Write([]byte("foo"))
329 kc.PutHR(hash, reader, 3)
331 shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
337 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
338 (s1 == shuff[1] && s2 == shuff[0]),
342 log.Printf("TestPutHR done")
345 func (s *StandaloneSuite) TestPutWithFail(c *C) {
346 log.Printf("TestPutWithFail")
348 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
350 st := StubPutHandler{
355 make(chan string, 4)}
358 make(chan string, 1)}
360 arv, err := arvadosclient.MakeArvadosClient()
361 kc, _ := MakeKeepClient(&arv)
364 arv.ApiToken = "abc123"
365 service_roots := make(map[string]string)
367 ks1 := RunSomeFakeKeepServers(st, 4)
368 ks2 := RunSomeFakeKeepServers(fh, 1)
370 for i, k := range ks1 {
371 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
372 defer k.listener.Close()
374 for i, k := range ks2 {
375 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
376 defer k.listener.Close()
379 kc.SetServiceRoots(service_roots)
381 shuff := NewRootSorter(
382 kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
384 phash, replicas, err := kc.PutB([]byte("foo"))
388 c.Check(err, Equals, nil)
389 c.Check(phash, Equals, "")
390 c.Check(replicas, Equals, 2)
395 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
396 (s1 == shuff[2] && s2 == shuff[1]),
401 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
402 log.Printf("TestPutWithTooManyFail")
404 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
406 st := StubPutHandler{
411 make(chan string, 1)}
414 make(chan string, 4)}
416 arv, err := arvadosclient.MakeArvadosClient()
417 kc, _ := MakeKeepClient(&arv)
420 arv.ApiToken = "abc123"
421 service_roots := make(map[string]string)
423 ks1 := RunSomeFakeKeepServers(st, 1)
424 ks2 := RunSomeFakeKeepServers(fh, 4)
426 for i, k := range ks1 {
427 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
428 defer k.listener.Close()
430 for i, k := range ks2 {
431 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
432 defer k.listener.Close()
435 kc.SetServiceRoots(service_roots)
437 _, replicas, err := kc.PutB([]byte("foo"))
439 c.Check(err, Equals, InsufficientReplicasError)
440 c.Check(replicas, Equals, 1)
441 c.Check(<-st.handled, Equals, ks1[0].url)
443 log.Printf("TestPutWithTooManyFail done")
446 type StubGetHandler struct {
449 expectApiToken string
453 func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
454 this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
455 this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
456 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(this.returnBody)))
457 resp.Write(this.returnBody)
460 func (s *StandaloneSuite) TestGet(c *C) {
461 log.Printf("TestGet")
463 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
465 st := StubGetHandler{
471 ks := RunFakeKeepServer(st)
472 defer ks.listener.Close()
474 arv, err := arvadosclient.MakeArvadosClient()
475 kc, _ := MakeKeepClient(&arv)
476 arv.ApiToken = "abc123"
477 kc.SetServiceRoots(map[string]string{"x": ks.url})
479 r, n, url2, err := kc.Get(hash)
481 c.Check(err, Equals, nil)
482 c.Check(n, Equals, int64(3))
483 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
485 content, err2 := ioutil.ReadAll(r)
486 c.Check(err2, Equals, nil)
487 c.Check(content, DeepEquals, []byte("foo"))
489 log.Printf("TestGet done")
492 func (s *StandaloneSuite) TestGetFail(c *C) {
493 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
495 st := FailHandler{make(chan string, 1)}
497 ks := RunFakeKeepServer(st)
498 defer ks.listener.Close()
500 arv, err := arvadosclient.MakeArvadosClient()
501 kc, _ := MakeKeepClient(&arv)
502 arv.ApiToken = "abc123"
503 kc.SetServiceRoots(map[string]string{"x": ks.url})
505 r, n, url2, err := kc.Get(hash)
506 c.Check(err, Equals, BlockNotFound)
507 c.Check(n, Equals, int64(0))
508 c.Check(url2, Equals, "")
509 c.Check(r, Equals, nil)
512 type BarHandler struct {
516 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
517 resp.Write([]byte("bar"))
518 this.handled <- fmt.Sprintf("http://%s", req.Host)
521 func (s *StandaloneSuite) TestChecksum(c *C) {
522 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
523 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
525 st := BarHandler{make(chan string, 1)}
527 ks := RunFakeKeepServer(st)
528 defer ks.listener.Close()
530 arv, err := arvadosclient.MakeArvadosClient()
531 kc, _ := MakeKeepClient(&arv)
532 arv.ApiToken = "abc123"
533 kc.SetServiceRoots(map[string]string{"x": ks.url})
535 r, n, _, err := kc.Get(barhash)
536 _, err = ioutil.ReadAll(r)
537 c.Check(n, Equals, int64(3))
538 c.Check(err, Equals, nil)
542 r, n, _, err = kc.Get(foohash)
543 _, err = ioutil.ReadAll(r)
544 c.Check(n, Equals, int64(3))
545 c.Check(err, Equals, BadChecksum)
550 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
551 content := []byte("waz")
552 hash := fmt.Sprintf("%x", md5.Sum(content))
555 make(chan string, 4)}
557 st := StubGetHandler{
563 arv, err := arvadosclient.MakeArvadosClient()
564 kc, _ := MakeKeepClient(&arv)
565 arv.ApiToken = "abc123"
566 service_roots := make(map[string]string)
568 ks1 := RunSomeFakeKeepServers(st, 1)
569 ks2 := RunSomeFakeKeepServers(fh, 4)
571 for i, k := range ks1 {
572 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
573 defer k.listener.Close()
575 for i, k := range ks2 {
576 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
577 defer k.listener.Close()
580 kc.SetServiceRoots(service_roots)
582 // This test works only if one of the failing services is
583 // attempted before the succeeding service. Otherwise,
584 // <-fh.handled below will just hang! (Probe order depends on
585 // the choice of block content "waz" and the UUIDs of the fake
586 // servers, so we just tried different strings until we found
587 // an example that passes this Assert.)
588 c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
590 r, n, url2, err := kc.Get(hash)
593 c.Check(err, Equals, nil)
594 c.Check(n, Equals, int64(3))
595 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
597 read_content, err2 := ioutil.ReadAll(r)
598 c.Check(err2, Equals, nil)
599 c.Check(read_content, DeepEquals, content)
602 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
603 os.Setenv("ARVADOS_API_HOST", "localhost:3000")
604 os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
605 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
606 content := []byte("TestPutGetHead")
608 arv, err := arvadosclient.MakeArvadosClient()
609 kc, err := MakeKeepClient(&arv)
610 c.Assert(err, Equals, nil)
612 hash := fmt.Sprintf("%x", md5.Sum(content))
615 n, _, err := kc.Ask(hash)
616 c.Check(err, Equals, BlockNotFound)
617 c.Check(n, Equals, int64(0))
620 hash2, replicas, err := kc.PutB(content)
621 c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
622 c.Check(replicas, Equals, 2)
623 c.Check(err, Equals, nil)
626 r, n, url2, err := kc.Get(hash)
627 c.Check(err, Equals, nil)
628 c.Check(n, Equals, int64(len(content)))
629 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
631 read_content, err2 := ioutil.ReadAll(r)
632 c.Check(err2, Equals, nil)
633 c.Check(read_content, DeepEquals, content)
636 n, url2, err := kc.Ask(hash)
637 c.Check(err, Equals, nil)
638 c.Check(n, Equals, int64(len(content)))
639 c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
643 type StubProxyHandler struct {
647 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
648 resp.Header().Set("X-Keep-Replicas-Stored", "2")
649 this.handled <- fmt.Sprintf("http://%s", req.Host)
652 func (s *StandaloneSuite) TestPutProxy(c *C) {
653 log.Printf("TestPutProxy")
655 st := StubProxyHandler{make(chan string, 1)}
657 arv, err := arvadosclient.MakeArvadosClient()
658 kc, _ := MakeKeepClient(&arv)
661 kc.Using_proxy = true
662 arv.ApiToken = "abc123"
663 service_roots := make(map[string]string)
665 ks1 := RunSomeFakeKeepServers(st, 1)
667 for i, k := range ks1 {
668 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
669 defer k.listener.Close()
672 kc.SetServiceRoots(service_roots)
674 _, replicas, err := kc.PutB([]byte("foo"))
677 c.Check(err, Equals, nil)
678 c.Check(replicas, Equals, 2)
680 log.Printf("TestPutProxy done")
683 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
684 log.Printf("TestPutProxy")
686 st := StubProxyHandler{make(chan string, 1)}
688 arv, err := arvadosclient.MakeArvadosClient()
689 kc, _ := MakeKeepClient(&arv)
692 kc.Using_proxy = true
693 arv.ApiToken = "abc123"
694 service_roots := make(map[string]string)
696 ks1 := RunSomeFakeKeepServers(st, 1)
698 for i, k := range ks1 {
699 service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
700 defer k.listener.Close()
702 kc.SetServiceRoots(service_roots)
704 _, replicas, err := kc.PutB([]byte("foo"))
707 c.Check(err, Equals, InsufficientReplicasError)
708 c.Check(replicas, Equals, 2)
710 log.Printf("TestPutProxy done")
713 func (s *StandaloneSuite) TestMakeLocator(c *C) {
714 l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
716 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
717 c.Check(l.Size, Equals, 3)
718 c.Check(l.Signature, Equals, "abcde")
719 c.Check(l.Timestamp, Equals, "12345678")