7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9 "git.curoverse.com/arvados.git/sdk/go/streamer"
20 // Gocheck boilerplate
21 func Test(t *testing.T) {
25 // Gocheck boilerplate
26 var _ = Suite(&ServerRequiredSuite{})
27 var _ = Suite(&StandaloneSuite{})
29 var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
31 // Tests that require the Keep server running
32 type ServerRequiredSuite struct{}
35 type StandaloneSuite struct{}
37 func pythonDir() string {
39 return fmt.Sprintf("%s/../../python/tests", cwd)
42 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
44 c.Skip("Skipping tests that require server")
47 arvadostest.StartAPI()
48 arvadostest.StartKeep()
51 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
55 arvadostest.StopKeep()
59 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
60 arv, err := arvadosclient.MakeArvadosClient()
61 c.Assert(err, Equals, nil)
63 kc, err := MakeKeepClient(&arv)
65 c.Assert(err, Equals, nil)
66 c.Check(len(kc.LocalRoots()), Equals, 2)
67 for _, root := range kc.LocalRoots() {
68 c.Check(root, Matches, "http://localhost:\\d+")
72 type StubPutHandler struct {
80 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
81 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
82 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
83 body, err := ioutil.ReadAll(req.Body)
84 sph.c.Check(err, Equals, nil)
85 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
87 sph.handled <- fmt.Sprintf("http://%s", req.Host)
90 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
92 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
94 panic(fmt.Sprintf("Could not listen on any port"))
96 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
97 go http.Serve(ks.listener, st)
101 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
102 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
104 ks := RunFakeKeepServer(st)
105 defer ks.listener.Close()
107 arv, _ := arvadosclient.MakeArvadosClient()
108 arv.ApiToken = "abc123"
110 kc, _ := MakeKeepClient(&arv)
112 reader, writer := io.Pipe()
113 upload_status := make(chan uploadStatus)
115 f(kc, ks.url, reader, writer, upload_status)
118 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
119 log.Printf("TestUploadToStubKeepServer")
121 st := StubPutHandler{
123 "acbd18db4cc2f85cedef654fccc4a4d8",
128 UploadToStubHelper(c, st,
129 func(kc *KeepClient, url string, reader io.ReadCloser,
130 writer io.WriteCloser, upload_status chan uploadStatus) {
132 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
134 writer.Write([]byte("foo"))
138 status := <-upload_status
139 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
142 log.Printf("TestUploadToStubKeepServer done")
145 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
146 log.Printf("TestUploadToStubKeepServerBufferReader")
148 st := StubPutHandler{
150 "acbd18db4cc2f85cedef654fccc4a4d8",
155 UploadToStubHelper(c, st,
156 func(kc *KeepClient, url string, reader io.ReadCloser,
157 writer io.WriteCloser, upload_status chan uploadStatus) {
159 tr := streamer.AsyncStreamFromReader(512, reader)
162 br1 := tr.MakeStreamReader()
164 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
166 writer.Write([]byte("foo"))
171 status := <-upload_status
172 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
175 log.Printf("TestUploadToStubKeepServerBufferReader done")
178 type FailHandler struct {
182 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
183 resp.WriteHeader(500)
184 fh.handled <- fmt.Sprintf("http://%s", req.Host)
187 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
188 log.Printf("TestFailedUploadToStubKeepServer")
193 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
195 UploadToStubHelper(c, st,
196 func(kc *KeepClient, url string, reader io.ReadCloser,
197 writer io.WriteCloser, upload_status chan uploadStatus) {
199 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
201 writer.Write([]byte("foo"))
206 status := <-upload_status
207 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
208 c.Check(status.statusCode, Equals, 500)
210 log.Printf("TestFailedUploadToStubKeepServer done")
213 type KeepServer struct {
214 listener net.Listener
218 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
219 ks = make([]KeepServer, n)
221 for i := 0; i < n; i += 1 {
222 ks[i] = RunFakeKeepServer(st)
228 func (s *StandaloneSuite) TestPutB(c *C) {
229 log.Printf("TestPutB")
231 hash := Md5String("foo")
233 st := StubPutHandler{
238 make(chan string, 5)}
240 arv, _ := arvadosclient.MakeArvadosClient()
241 kc, _ := MakeKeepClient(&arv)
244 arv.ApiToken = "abc123"
245 localRoots := make(map[string]string)
247 ks := RunSomeFakeKeepServers(st, 5)
249 for i, k := range ks {
250 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
251 defer k.listener.Close()
254 kc.SetServiceRoots(localRoots, nil)
256 kc.PutB([]byte("foo"))
258 shuff := NewRootSorter(
259 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
263 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
264 (s1 == shuff[1] && s2 == shuff[0]),
268 log.Printf("TestPutB done")
271 func (s *StandaloneSuite) TestPutHR(c *C) {
272 log.Printf("TestPutHR")
274 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
276 st := StubPutHandler{
281 make(chan string, 5)}
283 arv, _ := arvadosclient.MakeArvadosClient()
284 kc, _ := MakeKeepClient(&arv)
287 arv.ApiToken = "abc123"
288 localRoots := make(map[string]string)
290 ks := RunSomeFakeKeepServers(st, 5)
292 for i, k := range ks {
293 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
294 defer k.listener.Close()
297 kc.SetServiceRoots(localRoots, nil)
299 reader, writer := io.Pipe()
302 writer.Write([]byte("foo"))
306 kc.PutHR(hash, reader, 3)
308 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
314 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
315 (s1 == shuff[1] && s2 == shuff[0]),
319 log.Printf("TestPutHR done")
322 func (s *StandaloneSuite) TestPutWithFail(c *C) {
323 log.Printf("TestPutWithFail")
325 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
327 st := StubPutHandler{
332 make(chan string, 4)}
335 make(chan string, 1)}
337 arv, err := arvadosclient.MakeArvadosClient()
338 kc, _ := MakeKeepClient(&arv)
341 arv.ApiToken = "abc123"
342 localRoots := make(map[string]string)
344 ks1 := RunSomeFakeKeepServers(st, 4)
345 ks2 := RunSomeFakeKeepServers(fh, 1)
347 for i, k := range ks1 {
348 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
349 defer k.listener.Close()
351 for i, k := range ks2 {
352 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
353 defer k.listener.Close()
356 kc.SetServiceRoots(localRoots, nil)
358 shuff := NewRootSorter(
359 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
361 phash, replicas, err := kc.PutB([]byte("foo"))
365 c.Check(err, Equals, nil)
366 c.Check(phash, Equals, "")
367 c.Check(replicas, Equals, 2)
372 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
373 (s1 == shuff[2] && s2 == shuff[1]),
378 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
379 log.Printf("TestPutWithTooManyFail")
381 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
383 st := StubPutHandler{
388 make(chan string, 1)}
391 make(chan string, 4)}
393 arv, err := arvadosclient.MakeArvadosClient()
394 kc, _ := MakeKeepClient(&arv)
397 arv.ApiToken = "abc123"
398 localRoots := make(map[string]string)
400 ks1 := RunSomeFakeKeepServers(st, 1)
401 ks2 := RunSomeFakeKeepServers(fh, 4)
403 for i, k := range ks1 {
404 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
405 defer k.listener.Close()
407 for i, k := range ks2 {
408 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
409 defer k.listener.Close()
412 kc.SetServiceRoots(localRoots, nil)
414 _, replicas, err := kc.PutB([]byte("foo"))
416 c.Check(err, Equals, InsufficientReplicasError)
417 c.Check(replicas, Equals, 1)
418 c.Check(<-st.handled, Equals, ks1[0].url)
420 log.Printf("TestPutWithTooManyFail done")
423 type StubGetHandler struct {
426 expectApiToken string
431 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
432 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
433 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
434 resp.WriteHeader(sgh.httpStatus)
435 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
439 func (s *StandaloneSuite) TestGet(c *C) {
440 log.Printf("TestGet")
442 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
444 st := StubGetHandler{
451 ks := RunFakeKeepServer(st)
452 defer ks.listener.Close()
454 arv, err := arvadosclient.MakeArvadosClient()
455 kc, _ := MakeKeepClient(&arv)
456 arv.ApiToken = "abc123"
457 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
459 r, n, url2, err := kc.Get(hash)
461 c.Check(err, Equals, nil)
462 c.Check(n, Equals, int64(3))
463 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
465 content, err2 := ioutil.ReadAll(r)
466 c.Check(err2, Equals, nil)
467 c.Check(content, DeepEquals, []byte("foo"))
469 log.Printf("TestGet done")
472 func (s *StandaloneSuite) TestGetFail(c *C) {
473 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
475 st := FailHandler{make(chan string, 1)}
477 ks := RunFakeKeepServer(st)
478 defer ks.listener.Close()
480 arv, err := arvadosclient.MakeArvadosClient()
481 kc, _ := MakeKeepClient(&arv)
482 arv.ApiToken = "abc123"
483 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
485 r, n, url2, err := kc.Get(hash)
486 c.Check(err, Equals, BlockNotFound)
487 c.Check(n, Equals, int64(0))
488 c.Check(url2, Equals, "")
489 c.Check(r, Equals, nil)
492 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
493 uuid := "zzzzz-bi6l4-123451234512345"
494 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
496 // This one shouldn't be used:
497 ks0 := RunFakeKeepServer(StubGetHandler{
503 defer ks0.listener.Close()
504 // This one should be used:
505 ks := RunFakeKeepServer(StubGetHandler{
511 defer ks.listener.Close()
513 arv, err := arvadosclient.MakeArvadosClient()
514 kc, _ := MakeKeepClient(&arv)
515 arv.ApiToken = "abc123"
517 map[string]string{"x": ks0.url},
518 map[string]string{uuid: ks.url})
520 r, n, uri, err := kc.Get(hash+"+K@"+uuid)
522 c.Check(err, Equals, nil)
523 c.Check(n, Equals, int64(3))
524 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
526 content, err := ioutil.ReadAll(r)
527 c.Check(err, Equals, nil)
528 c.Check(content, DeepEquals, []byte("foo"))
531 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
532 uuid := "zzzzz-bi6l4-123451234512345"
533 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
535 ksLocal := RunFakeKeepServer(StubGetHandler{
541 defer ksLocal.listener.Close()
542 ksGateway := RunFakeKeepServer(StubGetHandler{
546 http.StatusInternalServerError,
548 defer ksGateway.listener.Close()
550 arv, err := arvadosclient.MakeArvadosClient()
551 kc, _ := MakeKeepClient(&arv)
552 arv.ApiToken = "abc123"
554 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
555 map[string]string{uuid: ksGateway.url})
557 r, n, uri, err := kc.Get(hash+"+K@"+uuid)
558 c.Assert(err, Equals, nil)
560 c.Check(n, Equals, int64(3))
561 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
563 content, err := ioutil.ReadAll(r)
564 c.Check(err, Equals, nil)
565 c.Check(content, DeepEquals, []byte("foo"))
568 type BarHandler struct {
572 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
573 resp.Write([]byte("bar"))
574 this.handled <- fmt.Sprintf("http://%s", req.Host)
577 func (s *StandaloneSuite) TestChecksum(c *C) {
578 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
579 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
581 st := BarHandler{make(chan string, 1)}
583 ks := RunFakeKeepServer(st)
584 defer ks.listener.Close()
586 arv, err := arvadosclient.MakeArvadosClient()
587 kc, _ := MakeKeepClient(&arv)
588 arv.ApiToken = "abc123"
589 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
591 r, n, _, err := kc.Get(barhash)
592 _, err = ioutil.ReadAll(r)
593 c.Check(n, Equals, int64(3))
594 c.Check(err, Equals, nil)
598 r, n, _, err = kc.Get(foohash)
599 _, err = ioutil.ReadAll(r)
600 c.Check(n, Equals, int64(3))
601 c.Check(err, Equals, BadChecksum)
606 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
607 content := []byte("waz")
608 hash := fmt.Sprintf("%x", md5.Sum(content))
611 make(chan string, 4)}
613 st := StubGetHandler{
620 arv, err := arvadosclient.MakeArvadosClient()
621 kc, _ := MakeKeepClient(&arv)
622 arv.ApiToken = "abc123"
623 localRoots := make(map[string]string)
625 ks1 := RunSomeFakeKeepServers(st, 1)
626 ks2 := RunSomeFakeKeepServers(fh, 4)
628 for i, k := range ks1 {
629 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
630 defer k.listener.Close()
632 for i, k := range ks2 {
633 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
634 defer k.listener.Close()
637 kc.SetServiceRoots(localRoots, nil)
639 // This test works only if one of the failing services is
640 // attempted before the succeeding service. Otherwise,
641 // <-fh.handled below will just hang! (Probe order depends on
642 // the choice of block content "waz" and the UUIDs of the fake
643 // servers, so we just tried different strings until we found
644 // an example that passes this Assert.)
645 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
647 r, n, url2, err := kc.Get(hash)
650 c.Check(err, Equals, nil)
651 c.Check(n, Equals, int64(3))
652 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
654 read_content, err2 := ioutil.ReadAll(r)
655 c.Check(err2, Equals, nil)
656 c.Check(read_content, DeepEquals, content)
659 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
660 content := []byte("TestPutGetHead")
662 arv, err := arvadosclient.MakeArvadosClient()
663 kc, err := MakeKeepClient(&arv)
664 c.Assert(err, Equals, nil)
666 hash := fmt.Sprintf("%x", md5.Sum(content))
669 n, _, err := kc.Ask(hash)
670 c.Check(err, Equals, BlockNotFound)
671 c.Check(n, Equals, int64(0))
674 hash2, replicas, err := kc.PutB(content)
675 c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content)))
676 c.Check(replicas, Equals, 2)
677 c.Check(err, Equals, nil)
680 r, n, url2, err := kc.Get(hash)
681 c.Check(err, Equals, nil)
682 c.Check(n, Equals, int64(len(content)))
683 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
685 read_content, err2 := ioutil.ReadAll(r)
686 c.Check(err2, Equals, nil)
687 c.Check(read_content, DeepEquals, content)
690 n, url2, err := kc.Ask(hash)
691 c.Check(err, Equals, nil)
692 c.Check(n, Equals, int64(len(content)))
693 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
697 type StubProxyHandler struct {
701 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
702 resp.Header().Set("X-Keep-Replicas-Stored", "2")
703 this.handled <- fmt.Sprintf("http://%s", req.Host)
706 func (s *StandaloneSuite) TestPutProxy(c *C) {
707 log.Printf("TestPutProxy")
709 st := StubProxyHandler{make(chan string, 1)}
711 arv, err := arvadosclient.MakeArvadosClient()
712 kc, _ := MakeKeepClient(&arv)
715 kc.Using_proxy = true
716 arv.ApiToken = "abc123"
717 localRoots := make(map[string]string)
719 ks1 := RunSomeFakeKeepServers(st, 1)
721 for i, k := range ks1 {
722 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
723 defer k.listener.Close()
726 kc.SetServiceRoots(localRoots, nil)
728 _, replicas, err := kc.PutB([]byte("foo"))
731 c.Check(err, Equals, nil)
732 c.Check(replicas, Equals, 2)
734 log.Printf("TestPutProxy done")
737 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
738 log.Printf("TestPutProxy")
740 st := StubProxyHandler{make(chan string, 1)}
742 arv, err := arvadosclient.MakeArvadosClient()
743 kc, _ := MakeKeepClient(&arv)
746 kc.Using_proxy = true
747 arv.ApiToken = "abc123"
748 localRoots := make(map[string]string)
750 ks1 := RunSomeFakeKeepServers(st, 1)
752 for i, k := range ks1 {
753 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
754 defer k.listener.Close()
756 kc.SetServiceRoots(localRoots, nil)
758 _, replicas, err := kc.PutB([]byte("foo"))
761 c.Check(err, Equals, InsufficientReplicasError)
762 c.Check(replicas, Equals, 2)
764 log.Printf("TestPutProxy done")
767 func (s *StandaloneSuite) TestMakeLocator(c *C) {
768 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
769 c.Check(err, Equals, nil)
770 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
771 c.Check(l.Size, Equals, 3)
772 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
775 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
776 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
777 c.Check(err, Equals, nil)
778 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
779 c.Check(l.Size, Equals, -1)
780 c.Check(l.Hints, DeepEquals, []string{})
783 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
784 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
785 c.Check(err, Equals, nil)
786 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
787 c.Check(l.Size, Equals, -1)
788 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
791 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
792 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
793 c.Check(err, Equals, InvalidLocatorError)