17 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
19 "git.curoverse.com/arvados.git/sdk/go/streamer"
23 // Gocheck boilerplate
24 func Test(t *testing.T) {
28 // Gocheck boilerplate
29 var _ = Suite(&ServerRequiredSuite{})
30 var _ = Suite(&StandaloneSuite{})
32 // Tests that require the Keep server running
33 type ServerRequiredSuite struct{}
36 type StandaloneSuite struct{}
38 func (s *StandaloneSuite) SetUpTest(c *C) {
39 RefreshServiceDiscovery()
42 func pythonDir() string {
44 return fmt.Sprintf("%s/../../python/tests", cwd)
47 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
48 arvadostest.StartAPI()
49 arvadostest.StartKeep(2, false)
52 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
53 arvadostest.StopKeep(2)
57 func (s *ServerRequiredSuite) SetUpTest(c *C) {
58 RefreshServiceDiscovery()
61 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
62 arv, err := arvadosclient.MakeArvadosClient()
63 c.Assert(err, Equals, nil)
65 kc, err := MakeKeepClient(arv)
67 c.Assert(err, Equals, nil)
68 c.Check(len(kc.LocalRoots()), Equals, 2)
69 for _, root := range kc.LocalRoots() {
70 c.Check(root, Matches, "http://localhost:\\d+")
74 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
75 arv, err := arvadosclient.MakeArvadosClient()
76 c.Assert(err, Equals, nil)
78 kc, err := MakeKeepClient(arv)
79 c.Assert(kc.Want_replicas, Equals, 2)
81 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
82 kc, err = MakeKeepClient(arv)
83 c.Assert(kc.Want_replicas, Equals, 3)
85 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
86 kc, err = MakeKeepClient(arv)
87 c.Assert(kc.Want_replicas, Equals, 1)
90 type StubPutHandler struct {
98 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
99 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
100 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
101 body, err := ioutil.ReadAll(req.Body)
102 sph.c.Check(err, Equals, nil)
103 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
104 resp.WriteHeader(200)
105 sph.handled <- fmt.Sprintf("http://%s", req.Host)
108 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
110 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
112 panic(fmt.Sprintf("Could not listen on any port"))
114 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
115 go http.Serve(ks.listener, st)
119 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
120 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
122 ks := RunFakeKeepServer(st)
123 defer ks.listener.Close()
125 arv, _ := arvadosclient.MakeArvadosClient()
126 arv.ApiToken = "abc123"
128 kc, _ := MakeKeepClient(arv)
130 reader, writer := io.Pipe()
131 upload_status := make(chan uploadStatus)
133 f(kc, ks.url, reader, writer, upload_status)
136 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
137 log.Printf("TestUploadToStubKeepServer")
139 st := StubPutHandler{
141 "acbd18db4cc2f85cedef654fccc4a4d8",
146 UploadToStubHelper(c, st,
147 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
149 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0)
151 writer.Write([]byte("foo"))
155 status := <-upload_status
156 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
160 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
161 st := StubPutHandler{
163 "acbd18db4cc2f85cedef654fccc4a4d8",
168 UploadToStubHelper(c, st,
169 func(kc *KeepClient, url string, reader io.ReadCloser,
170 writer io.WriteCloser, upload_status chan uploadStatus) {
172 tr := streamer.AsyncStreamFromReader(512, reader)
175 br1 := tr.MakeStreamReader()
177 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
179 writer.Write([]byte("foo"))
184 status := <-upload_status
185 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
189 type FailHandler struct {
193 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
194 resp.WriteHeader(500)
195 fh.handled <- fmt.Sprintf("http://%s", req.Host)
198 type FailThenSucceedHandler struct {
201 successhandler StubGetHandler
204 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
206 resp.WriteHeader(500)
208 fh.handled <- fmt.Sprintf("http://%s", req.Host)
210 fh.successhandler.ServeHTTP(resp, req)
214 type Error404Handler struct {
218 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
219 resp.WriteHeader(404)
220 fh.handled <- fmt.Sprintf("http://%s", req.Host)
223 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
227 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
229 UploadToStubHelper(c, st,
230 func(kc *KeepClient, url string, reader io.ReadCloser,
231 writer io.WriteCloser, upload_status chan uploadStatus) {
233 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0)
235 writer.Write([]byte("foo"))
240 status := <-upload_status
241 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
242 c.Check(status.statusCode, Equals, 500)
246 type KeepServer struct {
247 listener net.Listener
251 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
252 ks = make([]KeepServer, n)
254 for i := 0; i < n; i += 1 {
255 ks[i] = RunFakeKeepServer(st)
261 func (s *StandaloneSuite) TestPutB(c *C) {
262 hash := Md5String("foo")
264 st := StubPutHandler{
269 make(chan string, 5)}
271 arv, _ := arvadosclient.MakeArvadosClient()
272 kc, _ := MakeKeepClient(arv)
275 arv.ApiToken = "abc123"
276 localRoots := make(map[string]string)
277 writableLocalRoots := make(map[string]string)
279 ks := RunSomeFakeKeepServers(st, 5)
281 for i, k := range ks {
282 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
283 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
284 defer k.listener.Close()
287 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
289 kc.PutB([]byte("foo"))
291 shuff := NewRootSorter(
292 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
296 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
297 (s1 == shuff[1] && s2 == shuff[0]),
302 func (s *StandaloneSuite) TestPutHR(c *C) {
303 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
305 st := StubPutHandler{
310 make(chan string, 5)}
312 arv, _ := arvadosclient.MakeArvadosClient()
313 kc, _ := MakeKeepClient(arv)
316 arv.ApiToken = "abc123"
317 localRoots := make(map[string]string)
318 writableLocalRoots := make(map[string]string)
320 ks := RunSomeFakeKeepServers(st, 5)
322 for i, k := range ks {
323 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
324 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
325 defer k.listener.Close()
328 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
330 reader, writer := io.Pipe()
333 writer.Write([]byte("foo"))
337 kc.PutHR(hash, reader, 3)
339 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
344 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
345 (s1 == shuff[1] && s2 == shuff[0]),
350 func (s *StandaloneSuite) TestPutWithFail(c *C) {
351 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
353 st := StubPutHandler{
358 make(chan string, 4)}
361 make(chan string, 1)}
363 arv, err := arvadosclient.MakeArvadosClient()
364 kc, _ := MakeKeepClient(arv)
367 arv.ApiToken = "abc123"
368 localRoots := make(map[string]string)
369 writableLocalRoots := make(map[string]string)
371 ks1 := RunSomeFakeKeepServers(st, 4)
372 ks2 := RunSomeFakeKeepServers(fh, 1)
374 for i, k := range ks1 {
375 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
376 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
377 defer k.listener.Close()
379 for i, k := range ks2 {
380 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
381 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
382 defer k.listener.Close()
385 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
387 shuff := NewRootSorter(
388 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
391 phash, replicas, err := kc.PutB([]byte("foo"))
395 c.Check(err, Equals, nil)
396 c.Check(phash, Equals, "")
397 c.Check(replicas, Equals, 2)
402 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
403 (s1 == shuff[2] && s2 == shuff[1]),
408 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
409 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
411 st := StubPutHandler{
416 make(chan string, 1)}
419 make(chan string, 4)}
421 arv, err := arvadosclient.MakeArvadosClient()
422 kc, _ := MakeKeepClient(arv)
426 arv.ApiToken = "abc123"
427 localRoots := make(map[string]string)
428 writableLocalRoots := make(map[string]string)
430 ks1 := RunSomeFakeKeepServers(st, 1)
431 ks2 := RunSomeFakeKeepServers(fh, 4)
433 for i, k := range ks1 {
434 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
435 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
436 defer k.listener.Close()
438 for i, k := range ks2 {
439 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
440 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
441 defer k.listener.Close()
444 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
446 _, replicas, err := kc.PutB([]byte("foo"))
448 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
449 c.Check(replicas, Equals, 1)
450 c.Check(<-st.handled, Equals, ks1[0].url)
453 type StubGetHandler struct {
456 expectApiToken string
461 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
462 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
463 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
464 resp.WriteHeader(sgh.httpStatus)
465 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
469 func (s *StandaloneSuite) TestGet(c *C) {
470 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
472 st := StubGetHandler{
479 ks := RunFakeKeepServer(st)
480 defer ks.listener.Close()
482 arv, err := arvadosclient.MakeArvadosClient()
483 kc, _ := MakeKeepClient(arv)
484 arv.ApiToken = "abc123"
485 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
487 r, n, url2, err := kc.Get(hash)
489 c.Check(err, Equals, nil)
490 c.Check(n, Equals, int64(3))
491 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
493 content, err2 := ioutil.ReadAll(r)
494 c.Check(err2, Equals, nil)
495 c.Check(content, DeepEquals, []byte("foo"))
498 func (s *StandaloneSuite) TestGet404(c *C) {
499 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
501 st := Error404Handler{make(chan string, 1)}
503 ks := RunFakeKeepServer(st)
504 defer ks.listener.Close()
506 arv, err := arvadosclient.MakeArvadosClient()
507 kc, _ := MakeKeepClient(arv)
508 arv.ApiToken = "abc123"
509 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
511 r, n, url2, err := kc.Get(hash)
512 c.Check(err, Equals, BlockNotFound)
513 c.Check(n, Equals, int64(0))
514 c.Check(url2, Equals, "")
515 c.Check(r, Equals, nil)
518 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
519 st := Error404Handler{make(chan string, 1)}
521 ks := RunFakeKeepServer(st)
522 defer ks.listener.Close()
524 arv, err := arvadosclient.MakeArvadosClient()
525 kc, _ := MakeKeepClient(arv)
526 arv.ApiToken = "abc123"
527 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
529 r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
531 c.Check(n, Equals, int64(0))
532 c.Check(url2, Equals, "")
534 buf, err := ioutil.ReadAll(r)
536 c.Check(buf, DeepEquals, []byte{})
539 func (s *StandaloneSuite) TestGetFail(c *C) {
540 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
542 st := FailHandler{make(chan string, 1)}
544 ks := RunFakeKeepServer(st)
545 defer ks.listener.Close()
547 arv, err := arvadosclient.MakeArvadosClient()
548 kc, _ := MakeKeepClient(arv)
549 arv.ApiToken = "abc123"
550 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
553 r, n, url2, err := kc.Get(hash)
554 errNotFound, _ := err.(*ErrNotFound)
555 c.Check(errNotFound, NotNil)
556 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
557 c.Check(errNotFound.Temporary(), Equals, true)
558 c.Check(n, Equals, int64(0))
559 c.Check(url2, Equals, "")
560 c.Check(r, Equals, nil)
563 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
564 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
566 st := &FailThenSucceedHandler{make(chan string, 1), 0,
574 ks := RunFakeKeepServer(st)
575 defer ks.listener.Close()
577 arv, err := arvadosclient.MakeArvadosClient()
578 kc, _ := MakeKeepClient(arv)
579 arv.ApiToken = "abc123"
580 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
582 r, n, url2, err := kc.Get(hash)
584 c.Check(err, Equals, nil)
585 c.Check(n, Equals, int64(3))
586 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
588 content, err2 := ioutil.ReadAll(r)
589 c.Check(err2, Equals, nil)
590 c.Check(content, DeepEquals, []byte("foo"))
593 func (s *StandaloneSuite) TestGetNetError(c *C) {
594 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
596 arv, err := arvadosclient.MakeArvadosClient()
597 kc, _ := MakeKeepClient(arv)
598 arv.ApiToken = "abc123"
599 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
601 r, n, url2, err := kc.Get(hash)
602 errNotFound, _ := err.(*ErrNotFound)
603 c.Check(errNotFound, NotNil)
604 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
605 c.Check(errNotFound.Temporary(), Equals, true)
606 c.Check(n, Equals, int64(0))
607 c.Check(url2, Equals, "")
608 c.Check(r, Equals, nil)
611 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
612 uuid := "zzzzz-bi6l4-123451234512345"
613 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
615 // This one shouldn't be used:
616 ks0 := RunFakeKeepServer(StubGetHandler{
622 defer ks0.listener.Close()
623 // This one should be used:
624 ks := RunFakeKeepServer(StubGetHandler{
630 defer ks.listener.Close()
632 arv, err := arvadosclient.MakeArvadosClient()
633 kc, _ := MakeKeepClient(arv)
634 arv.ApiToken = "abc123"
636 map[string]string{"x": ks0.url},
638 map[string]string{uuid: ks.url})
640 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
642 c.Check(err, Equals, nil)
643 c.Check(n, Equals, int64(3))
644 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
646 content, err := ioutil.ReadAll(r)
647 c.Check(err, Equals, nil)
648 c.Check(content, DeepEquals, []byte("foo"))
651 // Use a service hint to fetch from a local disk service, overriding
652 // rendezvous probe order.
653 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
654 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
655 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
657 // This one shouldn't be used, although it appears first in
658 // rendezvous probe order:
659 ks0 := RunFakeKeepServer(StubGetHandler{
665 defer ks0.listener.Close()
666 // This one should be used:
667 ks := RunFakeKeepServer(StubGetHandler{
673 defer ks.listener.Close()
675 arv, err := arvadosclient.MakeArvadosClient()
676 kc, _ := MakeKeepClient(arv)
677 arv.ApiToken = "abc123"
680 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
681 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
682 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
686 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
687 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
688 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
692 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
694 c.Check(err, Equals, nil)
695 c.Check(n, Equals, int64(3))
696 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
698 content, err := ioutil.ReadAll(r)
699 c.Check(err, Equals, nil)
700 c.Check(content, DeepEquals, []byte("foo"))
703 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
704 uuid := "zzzzz-bi6l4-123451234512345"
705 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
707 ksLocal := RunFakeKeepServer(StubGetHandler{
713 defer ksLocal.listener.Close()
714 ksGateway := RunFakeKeepServer(StubGetHandler{
718 http.StatusInternalServerError,
720 defer ksGateway.listener.Close()
722 arv, err := arvadosclient.MakeArvadosClient()
723 kc, _ := MakeKeepClient(arv)
724 arv.ApiToken = "abc123"
726 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
728 map[string]string{uuid: ksGateway.url})
730 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
731 c.Assert(err, Equals, nil)
733 c.Check(n, Equals, int64(3))
734 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
736 content, err := ioutil.ReadAll(r)
737 c.Check(err, Equals, nil)
738 c.Check(content, DeepEquals, []byte("foo"))
741 type BarHandler struct {
745 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
746 resp.Write([]byte("bar"))
747 this.handled <- fmt.Sprintf("http://%s", req.Host)
750 func (s *StandaloneSuite) TestChecksum(c *C) {
751 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
752 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
754 st := BarHandler{make(chan string, 1)}
756 ks := RunFakeKeepServer(st)
757 defer ks.listener.Close()
759 arv, err := arvadosclient.MakeArvadosClient()
760 kc, _ := MakeKeepClient(arv)
761 arv.ApiToken = "abc123"
762 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
764 r, n, _, err := kc.Get(barhash)
765 _, err = ioutil.ReadAll(r)
766 c.Check(n, Equals, int64(3))
767 c.Check(err, Equals, nil)
771 r, n, _, err = kc.Get(foohash)
772 _, err = ioutil.ReadAll(r)
773 c.Check(n, Equals, int64(3))
774 c.Check(err, Equals, BadChecksum)
779 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
780 content := []byte("waz")
781 hash := fmt.Sprintf("%x", md5.Sum(content))
783 fh := Error404Handler{
784 make(chan string, 4)}
786 st := StubGetHandler{
793 arv, err := arvadosclient.MakeArvadosClient()
794 kc, _ := MakeKeepClient(arv)
795 arv.ApiToken = "abc123"
796 localRoots := make(map[string]string)
797 writableLocalRoots := make(map[string]string)
799 ks1 := RunSomeFakeKeepServers(st, 1)
800 ks2 := RunSomeFakeKeepServers(fh, 4)
802 for i, k := range ks1 {
803 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
804 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
805 defer k.listener.Close()
807 for i, k := range ks2 {
808 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
809 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
810 defer k.listener.Close()
813 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
816 // This test works only if one of the failing services is
817 // attempted before the succeeding service. Otherwise,
818 // <-fh.handled below will just hang! (Probe order depends on
819 // the choice of block content "waz" and the UUIDs of the fake
820 // servers, so we just tried different strings until we found
821 // an example that passes this Assert.)
822 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
824 r, n, url2, err := kc.Get(hash)
827 c.Check(err, Equals, nil)
828 c.Check(n, Equals, int64(3))
829 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
831 read_content, err2 := ioutil.ReadAll(r)
832 c.Check(err2, Equals, nil)
833 c.Check(read_content, DeepEquals, content)
836 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
837 content := []byte("TestPutGetHead")
839 arv, err := arvadosclient.MakeArvadosClient()
840 kc, err := MakeKeepClient(arv)
841 c.Assert(err, Equals, nil)
843 hash := fmt.Sprintf("%x", md5.Sum(content))
846 n, _, err := kc.Ask(hash)
847 c.Check(err, Equals, BlockNotFound)
848 c.Check(n, Equals, int64(0))
851 hash2, replicas, err := kc.PutB(content)
852 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
853 c.Check(replicas, Equals, 2)
854 c.Check(err, Equals, nil)
857 r, n, url2, err := kc.Get(hash)
858 c.Check(err, Equals, nil)
859 c.Check(n, Equals, int64(len(content)))
860 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
862 read_content, err2 := ioutil.ReadAll(r)
863 c.Check(err2, Equals, nil)
864 c.Check(read_content, DeepEquals, content)
867 n, url2, err := kc.Ask(hash)
868 c.Check(err, Equals, nil)
869 c.Check(n, Equals, int64(len(content)))
870 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
874 type StubProxyHandler struct {
878 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
879 resp.Header().Set("X-Keep-Replicas-Stored", "2")
880 this.handled <- fmt.Sprintf("http://%s", req.Host)
883 func (s *StandaloneSuite) TestPutProxy(c *C) {
884 st := StubProxyHandler{make(chan string, 1)}
886 arv, err := arvadosclient.MakeArvadosClient()
887 kc, _ := MakeKeepClient(arv)
890 arv.ApiToken = "abc123"
891 localRoots := make(map[string]string)
892 writableLocalRoots := make(map[string]string)
894 ks1 := RunSomeFakeKeepServers(st, 1)
896 for i, k := range ks1 {
897 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
898 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
899 defer k.listener.Close()
902 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
904 _, replicas, err := kc.PutB([]byte("foo"))
907 c.Check(err, Equals, nil)
908 c.Check(replicas, Equals, 2)
911 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
912 st := StubProxyHandler{make(chan string, 1)}
914 arv, err := arvadosclient.MakeArvadosClient()
915 kc, _ := MakeKeepClient(arv)
918 arv.ApiToken = "abc123"
919 localRoots := make(map[string]string)
920 writableLocalRoots := make(map[string]string)
922 ks1 := RunSomeFakeKeepServers(st, 1)
924 for i, k := range ks1 {
925 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
926 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
927 defer k.listener.Close()
929 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
931 _, replicas, err := kc.PutB([]byte("foo"))
934 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
935 c.Check(replicas, Equals, 2)
938 func (s *StandaloneSuite) TestMakeLocator(c *C) {
939 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
940 c.Check(err, Equals, nil)
941 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
942 c.Check(l.Size, Equals, 3)
943 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
946 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
947 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
948 c.Check(err, Equals, nil)
949 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
950 c.Check(l.Size, Equals, -1)
951 c.Check(l.Hints, DeepEquals, []string{})
954 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
955 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
956 c.Check(err, Equals, nil)
957 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
958 c.Check(l.Size, Equals, -1)
959 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
962 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
963 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
964 l, err := MakeLocator(str)
965 c.Check(err, Equals, nil)
966 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
967 c.Check(l.Size, Equals, 3)
968 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
969 c.Check(l.String(), Equals, str)
972 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
973 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
974 c.Check(err, Equals, InvalidLocatorError)
977 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
978 hash := Md5String("foo")
980 st := StubPutHandler{
985 make(chan string, 5)}
987 arv, _ := arvadosclient.MakeArvadosClient()
988 kc, _ := MakeKeepClient(arv)
991 arv.ApiToken = "abc123"
992 localRoots := make(map[string]string)
993 writableLocalRoots := make(map[string]string)
995 ks := RunSomeFakeKeepServers(st, 5)
997 for i, k := range ks {
998 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1000 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1002 defer k.listener.Close()
1005 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1007 _, replicas, err := kc.PutB([]byte("foo"))
1009 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1010 c.Check(replicas, Equals, 1)
1012 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1015 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1016 hash := Md5String("foo")
1018 st := StubPutHandler{
1023 make(chan string, 5)}
1025 arv, _ := arvadosclient.MakeArvadosClient()
1026 kc, _ := MakeKeepClient(arv)
1028 kc.Want_replicas = 2
1029 arv.ApiToken = "abc123"
1030 localRoots := make(map[string]string)
1031 writableLocalRoots := make(map[string]string)
1033 ks := RunSomeFakeKeepServers(st, 5)
1035 for i, k := range ks {
1036 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1037 defer k.listener.Close()
1040 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1042 _, replicas, err := kc.PutB([]byte("foo"))
1044 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1045 c.Check(replicas, Equals, 0)
1048 type StubGetIndexHandler struct {
1051 expectAPIToken string
1056 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1057 h.c.Check(req.URL.Path, Equals, h.expectPath)
1058 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1059 resp.WriteHeader(h.httpStatus)
1060 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1064 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1065 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1067 st := StubGetIndexHandler{
1072 []byte(hash + "+3 1443559274\n\n")}
1074 ks := RunFakeKeepServer(st)
1075 defer ks.listener.Close()
1077 arv, err := arvadosclient.MakeArvadosClient()
1078 c.Assert(err, IsNil)
1079 kc, err := MakeKeepClient(arv)
1080 c.Assert(err, IsNil)
1081 arv.ApiToken = "abc123"
1082 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1084 r, err := kc.GetIndex("x", "")
1087 content, err2 := ioutil.ReadAll(r)
1088 c.Check(err2, Equals, nil)
1089 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1092 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1093 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1095 st := StubGetIndexHandler{
1097 "/index/" + hash[0:3],
1100 []byte(hash + "+3 1443559274\n\n")}
1102 ks := RunFakeKeepServer(st)
1103 defer ks.listener.Close()
1105 arv, err := arvadosclient.MakeArvadosClient()
1106 kc, _ := MakeKeepClient(arv)
1107 arv.ApiToken = "abc123"
1108 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1110 r, err := kc.GetIndex("x", hash[0:3])
1111 c.Assert(err, Equals, nil)
1113 content, err2 := ioutil.ReadAll(r)
1114 c.Check(err2, Equals, nil)
1115 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1118 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1119 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1121 st := StubGetIndexHandler{
1123 "/index/" + hash[0:3],
1128 ks := RunFakeKeepServer(st)
1129 defer ks.listener.Close()
1131 arv, err := arvadosclient.MakeArvadosClient()
1132 kc, _ := MakeKeepClient(arv)
1133 arv.ApiToken = "abc123"
1134 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1136 _, err = kc.GetIndex("x", hash[0:3])
1137 c.Check(err, Equals, ErrIncompleteIndex)
1140 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1141 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1143 st := StubGetIndexHandler{
1145 "/index/" + hash[0:3],
1150 ks := RunFakeKeepServer(st)
1151 defer ks.listener.Close()
1153 arv, err := arvadosclient.MakeArvadosClient()
1154 kc, _ := MakeKeepClient(arv)
1155 arv.ApiToken = "abc123"
1156 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1158 _, err = kc.GetIndex("y", hash[0:3])
1159 c.Check(err, Equals, ErrNoSuchKeepServer)
1162 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1163 st := StubGetIndexHandler{
1170 ks := RunFakeKeepServer(st)
1171 defer ks.listener.Close()
1173 arv, err := arvadosclient.MakeArvadosClient()
1174 kc, _ := MakeKeepClient(arv)
1175 arv.ApiToken = "abc123"
1176 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1178 r, err := kc.GetIndex("x", "abcd")
1179 c.Check(err, Equals, nil)
1181 content, err2 := ioutil.ReadAll(r)
1182 c.Check(err2, Equals, nil)
1183 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1186 type FailThenSucceedPutHandler struct {
1189 successhandler StubPutHandler
1192 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1194 resp.WriteHeader(500)
1196 h.handled <- fmt.Sprintf("http://%s", req.Host)
1198 h.successhandler.ServeHTTP(resp, req)
1202 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1203 st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1209 make(chan string, 5)}}
1211 arv, _ := arvadosclient.MakeArvadosClient()
1212 kc, _ := MakeKeepClient(arv)
1214 kc.Want_replicas = 2
1215 arv.ApiToken = "abc123"
1216 localRoots := make(map[string]string)
1217 writableLocalRoots := make(map[string]string)
1219 ks := RunSomeFakeKeepServers(st, 2)
1221 for i, k := range ks {
1222 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1223 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1224 defer k.listener.Close()
1227 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1229 hash, replicas, err := kc.PutB([]byte("foo"))
1231 c.Check(err, Equals, nil)
1232 c.Check(hash, Equals, "")
1233 c.Check(replicas, Equals, 2)
1236 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1237 arv, err := arvadosclient.MakeArvadosClient()
1238 c.Assert(err, Equals, nil)
1240 // Add an additional "testblobstore" keepservice
1241 blobKeepService := make(arvadosclient.Dict)
1242 err = arv.Create("keep_services",
1243 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1244 "service_host": "localhost",
1245 "service_port": "21321",
1246 "service_type": "testblobstore"}},
1248 c.Assert(err, Equals, nil)
1249 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1250 RefreshServiceDiscovery()
1252 // Make a keepclient and ensure that the testblobstore is included
1253 kc, err := MakeKeepClient(arv)
1254 c.Assert(err, Equals, nil)
1256 // verify kc.LocalRoots
1257 c.Check(len(kc.LocalRoots()), Equals, 3)
1258 for _, root := range kc.LocalRoots() {
1259 c.Check(root, Matches, "http://localhost:\\d+")
1261 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1263 // verify kc.GatewayRoots
1264 c.Check(len(kc.GatewayRoots()), Equals, 3)
1265 for _, root := range kc.GatewayRoots() {
1266 c.Check(root, Matches, "http://localhost:\\d+")
1268 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1270 // verify kc.WritableLocalRoots
1271 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1272 for _, root := range kc.WritableLocalRoots() {
1273 c.Check(root, Matches, "http://localhost:\\d+")
1275 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1277 c.Assert(kc.replicasPerService, Equals, 0)
1278 c.Assert(kc.foundNonDiskSvc, Equals, true)
1279 c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)