6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8 "git.curoverse.com/arvados.git/sdk/go/streamer"
21 // Gocheck boilerplate
22 func Test(t *testing.T) {
26 // Gocheck boilerplate
27 var _ = Suite(&ServerRequiredSuite{})
28 var _ = Suite(&StandaloneSuite{})
30 // Tests that require the Keep server running
31 type ServerRequiredSuite struct{}
34 type StandaloneSuite struct{}
36 func pythonDir() string {
38 return fmt.Sprintf("%s/../../python/tests", cwd)
41 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
42 arvadostest.StartAPI()
43 arvadostest.StartKeep(2, false)
46 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
47 arvadostest.StopKeep(2)
51 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
52 arv, err := arvadosclient.MakeArvadosClient()
53 c.Assert(err, Equals, nil)
55 kc, err := MakeKeepClient(arv)
57 c.Assert(err, Equals, nil)
58 c.Check(len(kc.LocalRoots()), Equals, 2)
59 for _, root := range kc.LocalRoots() {
60 c.Check(root, Matches, "http://localhost:\\d+")
64 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
65 arv, err := arvadosclient.MakeArvadosClient()
66 c.Assert(err, Equals, nil)
68 kc, err := MakeKeepClient(arv)
69 c.Assert(kc.Want_replicas, Equals, 2)
71 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
72 kc, err = MakeKeepClient(arv)
73 c.Assert(kc.Want_replicas, Equals, 3)
75 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
76 kc, err = MakeKeepClient(arv)
77 c.Assert(kc.Want_replicas, Equals, 1)
80 type StubPutHandler struct {
88 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
89 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
90 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
91 body, err := ioutil.ReadAll(req.Body)
92 sph.c.Check(err, Equals, nil)
93 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
95 sph.handled <- fmt.Sprintf("http://%s", req.Host)
98 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
100 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
102 panic(fmt.Sprintf("Could not listen on any port"))
104 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
105 go http.Serve(ks.listener, st)
109 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
110 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
112 ks := RunFakeKeepServer(st)
113 defer ks.listener.Close()
115 arv, _ := arvadosclient.MakeArvadosClient()
116 arv.ApiToken = "abc123"
118 kc, _ := MakeKeepClient(arv)
120 reader, writer := io.Pipe()
121 upload_status := make(chan uploadStatus)
123 f(kc, ks.url, reader, writer, upload_status)
126 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
127 log.Printf("TestUploadToStubKeepServer")
129 st := StubPutHandler{
131 "acbd18db4cc2f85cedef654fccc4a4d8",
136 UploadToStubHelper(c, st,
137 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
139 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0)
141 writer.Write([]byte("foo"))
145 status := <-upload_status
146 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
150 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
151 st := StubPutHandler{
153 "acbd18db4cc2f85cedef654fccc4a4d8",
158 UploadToStubHelper(c, st,
159 func(kc *KeepClient, url string, reader io.ReadCloser,
160 writer io.WriteCloser, upload_status chan uploadStatus) {
162 tr := streamer.AsyncStreamFromReader(512, reader)
165 br1 := tr.MakeStreamReader()
167 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
169 writer.Write([]byte("foo"))
174 status := <-upload_status
175 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
179 type FailHandler struct {
183 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
184 resp.WriteHeader(500)
185 fh.handled <- fmt.Sprintf("http://%s", req.Host)
188 type FailThenSucceedHandler struct {
191 successhandler StubGetHandler
194 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
196 resp.WriteHeader(500)
198 fh.handled <- fmt.Sprintf("http://%s", req.Host)
200 fh.successhandler.ServeHTTP(resp, req)
204 type Error404Handler struct {
208 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
209 resp.WriteHeader(404)
210 fh.handled <- fmt.Sprintf("http://%s", req.Host)
213 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
217 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
219 UploadToStubHelper(c, st,
220 func(kc *KeepClient, url string, reader io.ReadCloser,
221 writer io.WriteCloser, upload_status chan uploadStatus) {
223 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0)
225 writer.Write([]byte("foo"))
230 status := <-upload_status
231 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
232 c.Check(status.statusCode, Equals, 500)
236 type KeepServer struct {
237 listener net.Listener
241 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
242 ks = make([]KeepServer, n)
244 for i := 0; i < n; i += 1 {
245 ks[i] = RunFakeKeepServer(st)
251 func (s *StandaloneSuite) TestPutB(c *C) {
252 hash := Md5String("foo")
254 st := StubPutHandler{
259 make(chan string, 5)}
261 arv, _ := arvadosclient.MakeArvadosClient()
262 kc, _ := MakeKeepClient(arv)
265 arv.ApiToken = "abc123"
266 localRoots := make(map[string]string)
267 writableLocalRoots := make(map[string]string)
269 ks := RunSomeFakeKeepServers(st, 5)
271 for i, k := range ks {
272 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
273 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
274 defer k.listener.Close()
277 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
279 kc.PutB([]byte("foo"))
281 shuff := NewRootSorter(
282 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
286 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
287 (s1 == shuff[1] && s2 == shuff[0]),
292 func (s *StandaloneSuite) TestPutHR(c *C) {
293 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
295 st := StubPutHandler{
300 make(chan string, 5)}
302 arv, _ := arvadosclient.MakeArvadosClient()
303 kc, _ := MakeKeepClient(arv)
306 arv.ApiToken = "abc123"
307 localRoots := make(map[string]string)
308 writableLocalRoots := make(map[string]string)
310 ks := RunSomeFakeKeepServers(st, 5)
312 for i, k := range ks {
313 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
314 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
315 defer k.listener.Close()
318 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
320 reader, writer := io.Pipe()
323 writer.Write([]byte("foo"))
327 kc.PutHR(hash, reader, 3)
329 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
334 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
335 (s1 == shuff[1] && s2 == shuff[0]),
340 func (s *StandaloneSuite) TestPutWithFail(c *C) {
341 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
343 st := StubPutHandler{
348 make(chan string, 4)}
351 make(chan string, 1)}
353 arv, err := arvadosclient.MakeArvadosClient()
354 kc, _ := MakeKeepClient(arv)
357 arv.ApiToken = "abc123"
358 localRoots := make(map[string]string)
359 writableLocalRoots := make(map[string]string)
361 ks1 := RunSomeFakeKeepServers(st, 4)
362 ks2 := RunSomeFakeKeepServers(fh, 1)
364 for i, k := range ks1 {
365 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
366 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
367 defer k.listener.Close()
369 for i, k := range ks2 {
370 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
371 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
372 defer k.listener.Close()
375 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
377 shuff := NewRootSorter(
378 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
381 phash, replicas, err := kc.PutB([]byte("foo"))
385 c.Check(err, Equals, nil)
386 c.Check(phash, Equals, "")
387 c.Check(replicas, Equals, 2)
392 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
393 (s1 == shuff[2] && s2 == shuff[1]),
398 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
399 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
401 st := StubPutHandler{
406 make(chan string, 1)}
409 make(chan string, 4)}
411 arv, err := arvadosclient.MakeArvadosClient()
412 kc, _ := MakeKeepClient(arv)
416 arv.ApiToken = "abc123"
417 localRoots := make(map[string]string)
418 writableLocalRoots := make(map[string]string)
420 ks1 := RunSomeFakeKeepServers(st, 1)
421 ks2 := RunSomeFakeKeepServers(fh, 4)
423 for i, k := range ks1 {
424 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
425 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
426 defer k.listener.Close()
428 for i, k := range ks2 {
429 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
430 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
431 defer k.listener.Close()
434 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
436 _, replicas, err := kc.PutB([]byte("foo"))
438 c.Check(err, Equals, InsufficientReplicasError)
439 c.Check(replicas, Equals, 1)
440 c.Check(<-st.handled, Equals, ks1[0].url)
443 type StubGetHandler struct {
446 expectApiToken string
451 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
452 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
453 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
454 resp.WriteHeader(sgh.httpStatus)
455 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
459 func (s *StandaloneSuite) TestGet(c *C) {
460 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
462 st := StubGetHandler{
469 ks := RunFakeKeepServer(st)
470 defer ks.listener.Close()
472 arv, err := arvadosclient.MakeArvadosClient()
473 kc, _ := MakeKeepClient(arv)
474 arv.ApiToken = "abc123"
475 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
477 r, n, url2, err := kc.Get(hash)
479 c.Check(err, Equals, nil)
480 c.Check(n, Equals, int64(3))
481 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
483 content, err2 := ioutil.ReadAll(r)
484 c.Check(err2, Equals, nil)
485 c.Check(content, DeepEquals, []byte("foo"))
488 func (s *StandaloneSuite) TestGet404(c *C) {
489 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
491 st := Error404Handler{make(chan string, 1)}
493 ks := RunFakeKeepServer(st)
494 defer ks.listener.Close()
496 arv, err := arvadosclient.MakeArvadosClient()
497 kc, _ := MakeKeepClient(arv)
498 arv.ApiToken = "abc123"
499 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
501 r, n, url2, err := kc.Get(hash)
502 c.Check(err, Equals, BlockNotFound)
503 c.Check(n, Equals, int64(0))
504 c.Check(url2, Equals, "")
505 c.Check(r, Equals, nil)
508 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
509 st := Error404Handler{make(chan string, 1)}
511 ks := RunFakeKeepServer(st)
512 defer ks.listener.Close()
514 arv, err := arvadosclient.MakeArvadosClient()
515 kc, _ := MakeKeepClient(arv)
516 arv.ApiToken = "abc123"
517 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
519 r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
521 c.Check(n, Equals, int64(0))
522 c.Check(url2, Equals, "")
524 buf, err := ioutil.ReadAll(r)
526 c.Check(buf, DeepEquals, []byte{})
529 func (s *StandaloneSuite) TestGetFail(c *C) {
530 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
532 st := FailHandler{make(chan string, 1)}
534 ks := RunFakeKeepServer(st)
535 defer ks.listener.Close()
537 arv, err := arvadosclient.MakeArvadosClient()
538 kc, _ := MakeKeepClient(arv)
539 arv.ApiToken = "abc123"
540 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
543 r, n, url2, err := kc.Get(hash)
544 errNotFound, _ := err.(*ErrNotFound)
545 c.Check(errNotFound, NotNil)
546 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
547 c.Check(errNotFound.Temporary(), Equals, true)
548 c.Check(n, Equals, int64(0))
549 c.Check(url2, Equals, "")
550 c.Check(r, Equals, nil)
553 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
554 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
556 st := &FailThenSucceedHandler{make(chan string, 1), 0,
564 ks := RunFakeKeepServer(st)
565 defer ks.listener.Close()
567 arv, err := arvadosclient.MakeArvadosClient()
568 kc, _ := MakeKeepClient(arv)
569 arv.ApiToken = "abc123"
570 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
572 r, n, url2, err := kc.Get(hash)
574 c.Check(err, Equals, nil)
575 c.Check(n, Equals, int64(3))
576 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
578 content, err2 := ioutil.ReadAll(r)
579 c.Check(err2, Equals, nil)
580 c.Check(content, DeepEquals, []byte("foo"))
583 func (s *StandaloneSuite) TestGetNetError(c *C) {
584 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
586 arv, err := arvadosclient.MakeArvadosClient()
587 kc, _ := MakeKeepClient(arv)
588 arv.ApiToken = "abc123"
589 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
591 r, n, url2, err := kc.Get(hash)
592 errNotFound, _ := err.(*ErrNotFound)
593 c.Check(errNotFound, NotNil)
594 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
595 c.Check(errNotFound.Temporary(), Equals, true)
596 c.Check(n, Equals, int64(0))
597 c.Check(url2, Equals, "")
598 c.Check(r, Equals, nil)
601 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
602 uuid := "zzzzz-bi6l4-123451234512345"
603 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
605 // This one shouldn't be used:
606 ks0 := RunFakeKeepServer(StubGetHandler{
612 defer ks0.listener.Close()
613 // This one should be used:
614 ks := RunFakeKeepServer(StubGetHandler{
620 defer ks.listener.Close()
622 arv, err := arvadosclient.MakeArvadosClient()
623 kc, _ := MakeKeepClient(arv)
624 arv.ApiToken = "abc123"
626 map[string]string{"x": ks0.url},
628 map[string]string{uuid: ks.url})
630 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
632 c.Check(err, Equals, nil)
633 c.Check(n, Equals, int64(3))
634 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
636 content, err := ioutil.ReadAll(r)
637 c.Check(err, Equals, nil)
638 c.Check(content, DeepEquals, []byte("foo"))
641 // Use a service hint to fetch from a local disk service, overriding
642 // rendezvous probe order.
643 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
644 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
645 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
647 // This one shouldn't be used, although it appears first in
648 // rendezvous probe order:
649 ks0 := RunFakeKeepServer(StubGetHandler{
655 defer ks0.listener.Close()
656 // This one should be used:
657 ks := RunFakeKeepServer(StubGetHandler{
663 defer ks.listener.Close()
665 arv, err := arvadosclient.MakeArvadosClient()
666 kc, _ := MakeKeepClient(arv)
667 arv.ApiToken = "abc123"
670 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
671 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
672 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
676 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
677 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
678 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
682 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
684 c.Check(err, Equals, nil)
685 c.Check(n, Equals, int64(3))
686 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
688 content, err := ioutil.ReadAll(r)
689 c.Check(err, Equals, nil)
690 c.Check(content, DeepEquals, []byte("foo"))
693 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
694 uuid := "zzzzz-bi6l4-123451234512345"
695 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
697 ksLocal := RunFakeKeepServer(StubGetHandler{
703 defer ksLocal.listener.Close()
704 ksGateway := RunFakeKeepServer(StubGetHandler{
708 http.StatusInternalServerError,
710 defer ksGateway.listener.Close()
712 arv, err := arvadosclient.MakeArvadosClient()
713 kc, _ := MakeKeepClient(arv)
714 arv.ApiToken = "abc123"
716 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
718 map[string]string{uuid: ksGateway.url})
720 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
721 c.Assert(err, Equals, nil)
723 c.Check(n, Equals, int64(3))
724 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
726 content, err := ioutil.ReadAll(r)
727 c.Check(err, Equals, nil)
728 c.Check(content, DeepEquals, []byte("foo"))
731 type BarHandler struct {
735 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
736 resp.Write([]byte("bar"))
737 this.handled <- fmt.Sprintf("http://%s", req.Host)
740 func (s *StandaloneSuite) TestChecksum(c *C) {
741 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
742 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
744 st := BarHandler{make(chan string, 1)}
746 ks := RunFakeKeepServer(st)
747 defer ks.listener.Close()
749 arv, err := arvadosclient.MakeArvadosClient()
750 kc, _ := MakeKeepClient(arv)
751 arv.ApiToken = "abc123"
752 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
754 r, n, _, err := kc.Get(barhash)
755 _, err = ioutil.ReadAll(r)
756 c.Check(n, Equals, int64(3))
757 c.Check(err, Equals, nil)
761 r, n, _, err = kc.Get(foohash)
762 _, err = ioutil.ReadAll(r)
763 c.Check(n, Equals, int64(3))
764 c.Check(err, Equals, BadChecksum)
769 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
770 content := []byte("waz")
771 hash := fmt.Sprintf("%x", md5.Sum(content))
773 fh := Error404Handler{
774 make(chan string, 4)}
776 st := StubGetHandler{
783 arv, err := arvadosclient.MakeArvadosClient()
784 kc, _ := MakeKeepClient(arv)
785 arv.ApiToken = "abc123"
786 localRoots := make(map[string]string)
787 writableLocalRoots := make(map[string]string)
789 ks1 := RunSomeFakeKeepServers(st, 1)
790 ks2 := RunSomeFakeKeepServers(fh, 4)
792 for i, k := range ks1 {
793 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
794 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
795 defer k.listener.Close()
797 for i, k := range ks2 {
798 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
799 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
800 defer k.listener.Close()
803 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
806 // This test works only if one of the failing services is
807 // attempted before the succeeding service. Otherwise,
808 // <-fh.handled below will just hang! (Probe order depends on
809 // the choice of block content "waz" and the UUIDs of the fake
810 // servers, so we just tried different strings until we found
811 // an example that passes this Assert.)
812 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
814 r, n, url2, err := kc.Get(hash)
817 c.Check(err, Equals, nil)
818 c.Check(n, Equals, int64(3))
819 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
821 read_content, err2 := ioutil.ReadAll(r)
822 c.Check(err2, Equals, nil)
823 c.Check(read_content, DeepEquals, content)
826 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
827 content := []byte("TestPutGetHead")
829 arv, err := arvadosclient.MakeArvadosClient()
830 kc, err := MakeKeepClient(arv)
831 c.Assert(err, Equals, nil)
833 hash := fmt.Sprintf("%x", md5.Sum(content))
836 n, _, err := kc.Ask(hash)
837 c.Check(err, Equals, BlockNotFound)
838 c.Check(n, Equals, int64(0))
841 hash2, replicas, err := kc.PutB(content)
842 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
843 c.Check(replicas, Equals, 2)
844 c.Check(err, Equals, nil)
847 r, n, url2, err := kc.Get(hash)
848 c.Check(err, Equals, nil)
849 c.Check(n, Equals, int64(len(content)))
850 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
852 read_content, err2 := ioutil.ReadAll(r)
853 c.Check(err2, Equals, nil)
854 c.Check(read_content, DeepEquals, content)
857 n, url2, err := kc.Ask(hash)
858 c.Check(err, Equals, nil)
859 c.Check(n, Equals, int64(len(content)))
860 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
864 type StubProxyHandler struct {
868 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
869 resp.Header().Set("X-Keep-Replicas-Stored", "2")
870 this.handled <- fmt.Sprintf("http://%s", req.Host)
873 func (s *StandaloneSuite) TestPutProxy(c *C) {
874 st := StubProxyHandler{make(chan string, 1)}
876 arv, err := arvadosclient.MakeArvadosClient()
877 kc, _ := MakeKeepClient(arv)
880 arv.ApiToken = "abc123"
881 localRoots := make(map[string]string)
882 writableLocalRoots := make(map[string]string)
884 ks1 := RunSomeFakeKeepServers(st, 1)
886 for i, k := range ks1 {
887 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
888 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
889 defer k.listener.Close()
892 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
894 _, replicas, err := kc.PutB([]byte("foo"))
897 c.Check(err, Equals, nil)
898 c.Check(replicas, Equals, 2)
901 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
902 st := StubProxyHandler{make(chan string, 1)}
904 arv, err := arvadosclient.MakeArvadosClient()
905 kc, _ := MakeKeepClient(arv)
908 arv.ApiToken = "abc123"
909 localRoots := make(map[string]string)
910 writableLocalRoots := make(map[string]string)
912 ks1 := RunSomeFakeKeepServers(st, 1)
914 for i, k := range ks1 {
915 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
916 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
917 defer k.listener.Close()
919 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
921 _, replicas, err := kc.PutB([]byte("foo"))
924 c.Check(err, Equals, InsufficientReplicasError)
925 c.Check(replicas, Equals, 2)
928 func (s *StandaloneSuite) TestMakeLocator(c *C) {
929 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
930 c.Check(err, Equals, nil)
931 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
932 c.Check(l.Size, Equals, 3)
933 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
936 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
937 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
938 c.Check(err, Equals, nil)
939 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
940 c.Check(l.Size, Equals, -1)
941 c.Check(l.Hints, DeepEquals, []string{})
944 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
945 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
946 c.Check(err, Equals, nil)
947 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
948 c.Check(l.Size, Equals, -1)
949 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
952 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
953 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
954 l, err := MakeLocator(str)
955 c.Check(err, Equals, nil)
956 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
957 c.Check(l.Size, Equals, 3)
958 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
959 c.Check(l.String(), Equals, str)
962 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
963 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
964 c.Check(err, Equals, InvalidLocatorError)
967 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
968 hash := Md5String("foo")
970 st := StubPutHandler{
975 make(chan string, 5)}
977 arv, _ := arvadosclient.MakeArvadosClient()
978 kc, _ := MakeKeepClient(arv)
981 arv.ApiToken = "abc123"
982 localRoots := make(map[string]string)
983 writableLocalRoots := make(map[string]string)
985 ks := RunSomeFakeKeepServers(st, 5)
987 for i, k := range ks {
988 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
990 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
992 defer k.listener.Close()
995 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
997 _, replicas, err := kc.PutB([]byte("foo"))
999 c.Check(err, Equals, InsufficientReplicasError)
1000 c.Check(replicas, Equals, 1)
1002 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1005 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1006 hash := Md5String("foo")
1008 st := StubPutHandler{
1013 make(chan string, 5)}
1015 arv, _ := arvadosclient.MakeArvadosClient()
1016 kc, _ := MakeKeepClient(arv)
1018 kc.Want_replicas = 2
1019 arv.ApiToken = "abc123"
1020 localRoots := make(map[string]string)
1021 writableLocalRoots := make(map[string]string)
1023 ks := RunSomeFakeKeepServers(st, 5)
1025 for i, k := range ks {
1026 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1027 defer k.listener.Close()
1030 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1032 _, replicas, err := kc.PutB([]byte("foo"))
1034 c.Check(err, Equals, InsufficientReplicasError)
1035 c.Check(replicas, Equals, 0)
1038 type StubGetIndexHandler struct {
1041 expectAPIToken string
1046 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1047 h.c.Check(req.URL.Path, Equals, h.expectPath)
1048 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1049 resp.WriteHeader(h.httpStatus)
1050 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1054 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1055 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1057 st := StubGetIndexHandler{
1062 []byte(hash + "+3 1443559274\n\n")}
1064 ks := RunFakeKeepServer(st)
1065 defer ks.listener.Close()
1067 arv, err := arvadosclient.MakeArvadosClient()
1068 kc, _ := MakeKeepClient(arv)
1069 arv.ApiToken = "abc123"
1070 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1072 r, err := kc.GetIndex("x", "")
1073 c.Check(err, Equals, nil)
1075 content, err2 := ioutil.ReadAll(r)
1076 c.Check(err2, Equals, nil)
1077 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1080 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1081 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1083 st := StubGetIndexHandler{
1085 "/index/" + hash[0:3],
1088 []byte(hash + "+3 1443559274\n\n")}
1090 ks := RunFakeKeepServer(st)
1091 defer ks.listener.Close()
1093 arv, err := arvadosclient.MakeArvadosClient()
1094 kc, _ := MakeKeepClient(arv)
1095 arv.ApiToken = "abc123"
1096 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1098 r, err := kc.GetIndex("x", hash[0:3])
1099 c.Check(err, Equals, nil)
1101 content, err2 := ioutil.ReadAll(r)
1102 c.Check(err2, Equals, nil)
1103 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1106 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1107 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1109 st := StubGetIndexHandler{
1111 "/index/" + hash[0:3],
1116 ks := RunFakeKeepServer(st)
1117 defer ks.listener.Close()
1119 arv, err := arvadosclient.MakeArvadosClient()
1120 kc, _ := MakeKeepClient(arv)
1121 arv.ApiToken = "abc123"
1122 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1124 _, err = kc.GetIndex("x", hash[0:3])
1125 c.Check(err, Equals, ErrIncompleteIndex)
1128 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1129 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1131 st := StubGetIndexHandler{
1133 "/index/" + hash[0:3],
1138 ks := RunFakeKeepServer(st)
1139 defer ks.listener.Close()
1141 arv, err := arvadosclient.MakeArvadosClient()
1142 kc, _ := MakeKeepClient(arv)
1143 arv.ApiToken = "abc123"
1144 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1146 _, err = kc.GetIndex("y", hash[0:3])
1147 c.Check(err, Equals, ErrNoSuchKeepServer)
1150 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1151 st := StubGetIndexHandler{
1158 ks := RunFakeKeepServer(st)
1159 defer ks.listener.Close()
1161 arv, err := arvadosclient.MakeArvadosClient()
1162 kc, _ := MakeKeepClient(arv)
1163 arv.ApiToken = "abc123"
1164 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1166 r, err := kc.GetIndex("x", "abcd")
1167 c.Check(err, Equals, nil)
1169 content, err2 := ioutil.ReadAll(r)
1170 c.Check(err2, Equals, nil)
1171 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1174 type FailThenSucceedPutHandler struct {
1177 successhandler StubPutHandler
1180 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1182 resp.WriteHeader(500)
1184 h.handled <- fmt.Sprintf("http://%s", req.Host)
1186 h.successhandler.ServeHTTP(resp, req)
1190 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1191 st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1197 make(chan string, 5)}}
1199 arv, _ := arvadosclient.MakeArvadosClient()
1200 kc, _ := MakeKeepClient(arv)
1202 kc.Want_replicas = 2
1203 arv.ApiToken = "abc123"
1204 localRoots := make(map[string]string)
1205 writableLocalRoots := make(map[string]string)
1207 ks := RunSomeFakeKeepServers(st, 2)
1209 for i, k := range ks {
1210 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1211 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1212 defer k.listener.Close()
1215 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1217 hash, replicas, err := kc.PutB([]byte("foo"))
1219 c.Check(err, Equals, nil)
1220 c.Check(hash, Equals, "")
1221 c.Check(replicas, Equals, 2)
1224 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1225 arv, err := arvadosclient.MakeArvadosClient()
1226 c.Assert(err, Equals, nil)
1228 // Add an additional "testblobstore" keepservice
1229 blobKeepService := make(arvadosclient.Dict)
1230 err = arv.Create("keep_services",
1231 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1232 "service_host": "localhost",
1233 "service_port": "21321",
1234 "service_type": "testblobstore"}},
1236 c.Assert(err, Equals, nil)
1237 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1239 // Make a keepclient and ensure that the testblobstore is included
1240 kc, err := MakeKeepClient(arv)
1241 c.Assert(err, Equals, nil)
1243 // verify kc.LocalRoots
1244 c.Check(len(kc.LocalRoots()), Equals, 3)
1245 for _, root := range kc.LocalRoots() {
1246 c.Check(root, Matches, "http://localhost:\\d+")
1248 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1250 // verify kc.GatewayRoots
1251 c.Check(len(kc.GatewayRoots()), Equals, 3)
1252 for _, root := range kc.GatewayRoots() {
1253 c.Check(root, Matches, "http://localhost:\\d+")
1255 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1257 // verify kc.WritableLocalRoots
1258 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1259 for _, root := range kc.WritableLocalRoots() {
1260 c.Check(root, Matches, "http://localhost:\\d+")
1262 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1264 c.Assert(kc.replicasPerService, Equals, 0)
1265 c.Assert(kc.foundNonDiskSvc, Equals, true)
1266 c.Assert(kc.Client.Timeout, Equals, 300*time.Second)