1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
21 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
22 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
23 "git.curoverse.com/arvados.git/sdk/go/streamer"
27 // Gocheck boilerplate
28 func Test(t *testing.T) {
32 // Gocheck boilerplate
33 var _ = Suite(&ServerRequiredSuite{})
34 var _ = Suite(&StandaloneSuite{})
36 // Tests that require the Keep server running
37 type ServerRequiredSuite struct{}
40 type StandaloneSuite struct{}
42 func (s *StandaloneSuite) SetUpTest(c *C) {
43 RefreshServiceDiscovery()
46 func pythonDir() string {
48 return fmt.Sprintf("%s/../../python/tests", cwd)
51 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
52 arvadostest.StartAPI()
53 arvadostest.StartKeep(2, false)
56 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
57 arvadostest.StopKeep(2)
61 func (s *ServerRequiredSuite) SetUpTest(c *C) {
62 RefreshServiceDiscovery()
65 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
66 arv, err := arvadosclient.MakeArvadosClient()
67 c.Assert(err, Equals, nil)
69 kc, err := MakeKeepClient(arv)
71 c.Assert(err, Equals, nil)
72 c.Check(len(kc.LocalRoots()), Equals, 2)
73 for _, root := range kc.LocalRoots() {
74 c.Check(root, Matches, "http://localhost:\\d+")
78 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
79 arv, err := arvadosclient.MakeArvadosClient()
80 c.Assert(err, Equals, nil)
82 kc, err := MakeKeepClient(arv)
83 c.Assert(kc.Want_replicas, Equals, 2)
85 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
86 kc, err = MakeKeepClient(arv)
87 c.Assert(kc.Want_replicas, Equals, 3)
89 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
90 kc, err = MakeKeepClient(arv)
91 c.Assert(kc.Want_replicas, Equals, 1)
94 type StubPutHandler struct {
102 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
103 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
104 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
105 body, err := ioutil.ReadAll(req.Body)
106 sph.c.Check(err, Equals, nil)
107 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
108 resp.WriteHeader(200)
109 sph.handled <- fmt.Sprintf("http://%s", req.Host)
112 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
114 // If we don't explicitly bind it to localhost, ks.listener.Addr() will
115 // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
116 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
118 panic(fmt.Sprintf("Could not listen on any port"))
120 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
121 go http.Serve(ks.listener, st)
125 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
126 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
128 ks := RunFakeKeepServer(st)
129 defer ks.listener.Close()
131 arv, _ := arvadosclient.MakeArvadosClient()
132 arv.ApiToken = "abc123"
134 kc, _ := MakeKeepClient(arv)
136 reader, writer := io.Pipe()
137 upload_status := make(chan uploadStatus)
139 f(kc, ks.url, reader, writer, upload_status)
142 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
143 log.Printf("TestUploadToStubKeepServer")
145 st := StubPutHandler{
147 "acbd18db4cc2f85cedef654fccc4a4d8",
152 UploadToStubHelper(c, st,
153 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
155 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0)
157 writer.Write([]byte("foo"))
161 status := <-upload_status
162 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
166 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
167 st := StubPutHandler{
169 "acbd18db4cc2f85cedef654fccc4a4d8",
174 UploadToStubHelper(c, st,
175 func(kc *KeepClient, url string, reader io.ReadCloser,
176 writer io.WriteCloser, upload_status chan uploadStatus) {
178 tr := streamer.AsyncStreamFromReader(512, reader)
181 br1 := tr.MakeStreamReader()
183 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
185 writer.Write([]byte("foo"))
190 status := <-upload_status
191 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
195 type FailHandler struct {
199 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
200 resp.WriteHeader(500)
201 fh.handled <- fmt.Sprintf("http://%s", req.Host)
204 type FailThenSucceedHandler struct {
207 successhandler StubGetHandler
210 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
212 resp.WriteHeader(500)
214 fh.handled <- fmt.Sprintf("http://%s", req.Host)
216 fh.successhandler.ServeHTTP(resp, req)
220 type Error404Handler struct {
224 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
225 resp.WriteHeader(404)
226 fh.handled <- fmt.Sprintf("http://%s", req.Host)
229 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
233 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
235 UploadToStubHelper(c, st,
236 func(kc *KeepClient, url string, reader io.ReadCloser,
237 writer io.WriteCloser, upload_status chan uploadStatus) {
239 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0)
241 writer.Write([]byte("foo"))
246 status := <-upload_status
247 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
248 c.Check(status.statusCode, Equals, 500)
252 type KeepServer struct {
253 listener net.Listener
257 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
258 ks = make([]KeepServer, n)
260 for i := 0; i < n; i += 1 {
261 ks[i] = RunFakeKeepServer(st)
267 func (s *StandaloneSuite) TestPutB(c *C) {
268 hash := Md5String("foo")
270 st := StubPutHandler{
275 make(chan string, 5)}
277 arv, _ := arvadosclient.MakeArvadosClient()
278 kc, _ := MakeKeepClient(arv)
281 arv.ApiToken = "abc123"
282 localRoots := make(map[string]string)
283 writableLocalRoots := make(map[string]string)
285 ks := RunSomeFakeKeepServers(st, 5)
287 for i, k := range ks {
288 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
289 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
290 defer k.listener.Close()
293 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
295 kc.PutB([]byte("foo"))
297 shuff := NewRootSorter(
298 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
302 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
303 (s1 == shuff[1] && s2 == shuff[0]),
308 func (s *StandaloneSuite) TestPutHR(c *C) {
309 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
311 st := StubPutHandler{
316 make(chan string, 5)}
318 arv, _ := arvadosclient.MakeArvadosClient()
319 kc, _ := MakeKeepClient(arv)
322 arv.ApiToken = "abc123"
323 localRoots := make(map[string]string)
324 writableLocalRoots := make(map[string]string)
326 ks := RunSomeFakeKeepServers(st, 5)
328 for i, k := range ks {
329 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
330 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
331 defer k.listener.Close()
334 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
336 reader, writer := io.Pipe()
339 writer.Write([]byte("foo"))
343 kc.PutHR(hash, reader, 3)
345 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
350 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
351 (s1 == shuff[1] && s2 == shuff[0]),
356 func (s *StandaloneSuite) TestPutWithFail(c *C) {
357 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
359 st := StubPutHandler{
364 make(chan string, 4)}
367 make(chan string, 1)}
369 arv, err := arvadosclient.MakeArvadosClient()
370 kc, _ := MakeKeepClient(arv)
373 arv.ApiToken = "abc123"
374 localRoots := make(map[string]string)
375 writableLocalRoots := make(map[string]string)
377 ks1 := RunSomeFakeKeepServers(st, 4)
378 ks2 := RunSomeFakeKeepServers(fh, 1)
380 for i, k := range ks1 {
381 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
382 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
383 defer k.listener.Close()
385 for i, k := range ks2 {
386 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
387 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
388 defer k.listener.Close()
391 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
393 shuff := NewRootSorter(
394 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
397 phash, replicas, err := kc.PutB([]byte("foo"))
401 c.Check(err, Equals, nil)
402 c.Check(phash, Equals, "")
403 c.Check(replicas, Equals, 2)
408 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
409 (s1 == shuff[2] && s2 == shuff[1]),
414 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
415 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
417 st := StubPutHandler{
422 make(chan string, 1)}
425 make(chan string, 4)}
427 arv, err := arvadosclient.MakeArvadosClient()
428 kc, _ := MakeKeepClient(arv)
432 arv.ApiToken = "abc123"
433 localRoots := make(map[string]string)
434 writableLocalRoots := make(map[string]string)
436 ks1 := RunSomeFakeKeepServers(st, 1)
437 ks2 := RunSomeFakeKeepServers(fh, 4)
439 for i, k := range ks1 {
440 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
441 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
442 defer k.listener.Close()
444 for i, k := range ks2 {
445 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
446 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
447 defer k.listener.Close()
450 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
452 _, replicas, err := kc.PutB([]byte("foo"))
454 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
455 c.Check(replicas, Equals, 1)
456 c.Check(<-st.handled, Equals, ks1[0].url)
459 type StubGetHandler struct {
462 expectApiToken string
467 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
468 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
469 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
470 resp.WriteHeader(sgh.httpStatus)
471 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
475 func (s *StandaloneSuite) TestGet(c *C) {
476 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
478 st := StubGetHandler{
485 ks := RunFakeKeepServer(st)
486 defer ks.listener.Close()
488 arv, err := arvadosclient.MakeArvadosClient()
489 kc, _ := MakeKeepClient(arv)
490 arv.ApiToken = "abc123"
491 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
493 r, n, url2, err := kc.Get(hash)
495 c.Check(err, Equals, nil)
496 c.Check(n, Equals, int64(3))
497 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
499 content, err2 := ioutil.ReadAll(r)
500 c.Check(err2, Equals, nil)
501 c.Check(content, DeepEquals, []byte("foo"))
504 func (s *StandaloneSuite) TestGet404(c *C) {
505 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
507 st := Error404Handler{make(chan string, 1)}
509 ks := RunFakeKeepServer(st)
510 defer ks.listener.Close()
512 arv, err := arvadosclient.MakeArvadosClient()
513 kc, _ := MakeKeepClient(arv)
514 arv.ApiToken = "abc123"
515 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
517 r, n, url2, err := kc.Get(hash)
518 c.Check(err, Equals, BlockNotFound)
519 c.Check(n, Equals, int64(0))
520 c.Check(url2, Equals, "")
521 c.Check(r, Equals, nil)
524 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
525 st := Error404Handler{make(chan string, 1)}
527 ks := RunFakeKeepServer(st)
528 defer ks.listener.Close()
530 arv, err := arvadosclient.MakeArvadosClient()
531 kc, _ := MakeKeepClient(arv)
532 arv.ApiToken = "abc123"
533 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
535 r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
537 c.Check(n, Equals, int64(0))
538 c.Check(url2, Equals, "")
540 buf, err := ioutil.ReadAll(r)
542 c.Check(buf, DeepEquals, []byte{})
545 func (s *StandaloneSuite) TestGetFail(c *C) {
546 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
548 st := FailHandler{make(chan string, 1)}
550 ks := RunFakeKeepServer(st)
551 defer ks.listener.Close()
553 arv, err := arvadosclient.MakeArvadosClient()
554 kc, _ := MakeKeepClient(arv)
555 arv.ApiToken = "abc123"
556 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
559 r, n, url2, err := kc.Get(hash)
560 errNotFound, _ := err.(*ErrNotFound)
561 c.Check(errNotFound, NotNil)
562 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
563 c.Check(errNotFound.Temporary(), Equals, true)
564 c.Check(n, Equals, int64(0))
565 c.Check(url2, Equals, "")
566 c.Check(r, Equals, nil)
569 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
570 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
572 st := &FailThenSucceedHandler{make(chan string, 1), 0,
580 ks := RunFakeKeepServer(st)
581 defer ks.listener.Close()
583 arv, err := arvadosclient.MakeArvadosClient()
584 kc, _ := MakeKeepClient(arv)
585 arv.ApiToken = "abc123"
586 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
588 r, n, url2, err := kc.Get(hash)
590 c.Check(err, Equals, nil)
591 c.Check(n, Equals, int64(3))
592 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
594 content, err2 := ioutil.ReadAll(r)
595 c.Check(err2, Equals, nil)
596 c.Check(content, DeepEquals, []byte("foo"))
599 func (s *StandaloneSuite) TestGetNetError(c *C) {
600 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
602 arv, err := arvadosclient.MakeArvadosClient()
603 kc, _ := MakeKeepClient(arv)
604 arv.ApiToken = "abc123"
605 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
607 r, n, url2, err := kc.Get(hash)
608 errNotFound, _ := err.(*ErrNotFound)
609 c.Check(errNotFound, NotNil)
610 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
611 c.Check(errNotFound.Temporary(), Equals, true)
612 c.Check(n, Equals, int64(0))
613 c.Check(url2, Equals, "")
614 c.Check(r, Equals, nil)
617 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
618 uuid := "zzzzz-bi6l4-123451234512345"
619 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
621 // This one shouldn't be used:
622 ks0 := RunFakeKeepServer(StubGetHandler{
628 defer ks0.listener.Close()
629 // This one should be used:
630 ks := RunFakeKeepServer(StubGetHandler{
636 defer ks.listener.Close()
638 arv, err := arvadosclient.MakeArvadosClient()
639 kc, _ := MakeKeepClient(arv)
640 arv.ApiToken = "abc123"
642 map[string]string{"x": ks0.url},
644 map[string]string{uuid: ks.url})
646 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
648 c.Check(err, Equals, nil)
649 c.Check(n, Equals, int64(3))
650 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
652 content, err := ioutil.ReadAll(r)
653 c.Check(err, Equals, nil)
654 c.Check(content, DeepEquals, []byte("foo"))
657 // Use a service hint to fetch from a local disk service, overriding
658 // rendezvous probe order.
659 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
660 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
661 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
663 // This one shouldn't be used, although it appears first in
664 // rendezvous probe order:
665 ks0 := RunFakeKeepServer(StubGetHandler{
671 defer ks0.listener.Close()
672 // This one should be used:
673 ks := RunFakeKeepServer(StubGetHandler{
679 defer ks.listener.Close()
681 arv, err := arvadosclient.MakeArvadosClient()
682 kc, _ := MakeKeepClient(arv)
683 arv.ApiToken = "abc123"
686 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
687 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
688 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
692 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
693 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
694 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
698 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
700 c.Check(err, Equals, nil)
701 c.Check(n, Equals, int64(3))
702 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
704 content, err := ioutil.ReadAll(r)
705 c.Check(err, Equals, nil)
706 c.Check(content, DeepEquals, []byte("foo"))
709 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
710 uuid := "zzzzz-bi6l4-123451234512345"
711 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
713 ksLocal := RunFakeKeepServer(StubGetHandler{
719 defer ksLocal.listener.Close()
720 ksGateway := RunFakeKeepServer(StubGetHandler{
724 http.StatusInternalServerError,
726 defer ksGateway.listener.Close()
728 arv, err := arvadosclient.MakeArvadosClient()
729 kc, _ := MakeKeepClient(arv)
730 arv.ApiToken = "abc123"
732 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
734 map[string]string{uuid: ksGateway.url})
736 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
737 c.Assert(err, Equals, nil)
739 c.Check(n, Equals, int64(3))
740 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
742 content, err := ioutil.ReadAll(r)
743 c.Check(err, Equals, nil)
744 c.Check(content, DeepEquals, []byte("foo"))
747 type BarHandler struct {
751 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
752 resp.Write([]byte("bar"))
753 this.handled <- fmt.Sprintf("http://%s", req.Host)
756 func (s *StandaloneSuite) TestChecksum(c *C) {
757 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
758 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
760 st := BarHandler{make(chan string, 1)}
762 ks := RunFakeKeepServer(st)
763 defer ks.listener.Close()
765 arv, err := arvadosclient.MakeArvadosClient()
766 kc, _ := MakeKeepClient(arv)
767 arv.ApiToken = "abc123"
768 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
770 r, n, _, err := kc.Get(barhash)
771 _, err = ioutil.ReadAll(r)
772 c.Check(n, Equals, int64(3))
773 c.Check(err, Equals, nil)
777 r, n, _, err = kc.Get(foohash)
778 _, err = ioutil.ReadAll(r)
779 c.Check(n, Equals, int64(3))
780 c.Check(err, Equals, BadChecksum)
785 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
786 content := []byte("waz")
787 hash := fmt.Sprintf("%x", md5.Sum(content))
789 fh := Error404Handler{
790 make(chan string, 4)}
792 st := StubGetHandler{
799 arv, err := arvadosclient.MakeArvadosClient()
800 kc, _ := MakeKeepClient(arv)
801 arv.ApiToken = "abc123"
802 localRoots := make(map[string]string)
803 writableLocalRoots := make(map[string]string)
805 ks1 := RunSomeFakeKeepServers(st, 1)
806 ks2 := RunSomeFakeKeepServers(fh, 4)
808 for i, k := range ks1 {
809 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
810 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
811 defer k.listener.Close()
813 for i, k := range ks2 {
814 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
815 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
816 defer k.listener.Close()
819 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
822 // This test works only if one of the failing services is
823 // attempted before the succeeding service. Otherwise,
824 // <-fh.handled below will just hang! (Probe order depends on
825 // the choice of block content "waz" and the UUIDs of the fake
826 // servers, so we just tried different strings until we found
827 // an example that passes this Assert.)
828 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
830 r, n, url2, err := kc.Get(hash)
833 c.Check(err, Equals, nil)
834 c.Check(n, Equals, int64(3))
835 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
837 read_content, err2 := ioutil.ReadAll(r)
838 c.Check(err2, Equals, nil)
839 c.Check(read_content, DeepEquals, content)
842 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
843 content := []byte("TestPutGetHead")
845 arv, err := arvadosclient.MakeArvadosClient()
846 kc, err := MakeKeepClient(arv)
847 c.Assert(err, Equals, nil)
849 hash := fmt.Sprintf("%x", md5.Sum(content))
852 n, _, err := kc.Ask(hash)
853 c.Check(err, Equals, BlockNotFound)
854 c.Check(n, Equals, int64(0))
857 hash2, replicas, err := kc.PutB(content)
858 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
859 c.Check(replicas, Equals, 2)
860 c.Check(err, Equals, nil)
863 r, n, url2, err := kc.Get(hash)
864 c.Check(err, Equals, nil)
865 c.Check(n, Equals, int64(len(content)))
866 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
868 read_content, err2 := ioutil.ReadAll(r)
869 c.Check(err2, Equals, nil)
870 c.Check(read_content, DeepEquals, content)
873 n, url2, err := kc.Ask(hash)
874 c.Check(err, Equals, nil)
875 c.Check(n, Equals, int64(len(content)))
876 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
880 type StubProxyHandler struct {
884 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
885 resp.Header().Set("X-Keep-Replicas-Stored", "2")
886 this.handled <- fmt.Sprintf("http://%s", req.Host)
889 func (s *StandaloneSuite) TestPutProxy(c *C) {
890 st := StubProxyHandler{make(chan string, 1)}
892 arv, err := arvadosclient.MakeArvadosClient()
893 kc, _ := MakeKeepClient(arv)
896 arv.ApiToken = "abc123"
897 localRoots := make(map[string]string)
898 writableLocalRoots := make(map[string]string)
900 ks1 := RunSomeFakeKeepServers(st, 1)
902 for i, k := range ks1 {
903 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
904 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
905 defer k.listener.Close()
908 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
910 _, replicas, err := kc.PutB([]byte("foo"))
913 c.Check(err, Equals, nil)
914 c.Check(replicas, Equals, 2)
917 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
918 st := StubProxyHandler{make(chan string, 1)}
920 arv, err := arvadosclient.MakeArvadosClient()
921 kc, _ := MakeKeepClient(arv)
924 arv.ApiToken = "abc123"
925 localRoots := make(map[string]string)
926 writableLocalRoots := make(map[string]string)
928 ks1 := RunSomeFakeKeepServers(st, 1)
930 for i, k := range ks1 {
931 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
932 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
933 defer k.listener.Close()
935 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
937 _, replicas, err := kc.PutB([]byte("foo"))
940 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
941 c.Check(replicas, Equals, 2)
944 func (s *StandaloneSuite) TestMakeLocator(c *C) {
945 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
946 c.Check(err, Equals, nil)
947 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
948 c.Check(l.Size, Equals, 3)
949 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
952 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
953 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
954 c.Check(err, Equals, nil)
955 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
956 c.Check(l.Size, Equals, -1)
957 c.Check(l.Hints, DeepEquals, []string{})
960 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
961 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
962 c.Check(err, Equals, nil)
963 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
964 c.Check(l.Size, Equals, -1)
965 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
968 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
969 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
970 l, err := MakeLocator(str)
971 c.Check(err, Equals, nil)
972 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
973 c.Check(l.Size, Equals, 3)
974 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
975 c.Check(l.String(), Equals, str)
978 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
979 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
980 c.Check(err, Equals, InvalidLocatorError)
983 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
984 hash := Md5String("foo")
986 st := StubPutHandler{
991 make(chan string, 5)}
993 arv, _ := arvadosclient.MakeArvadosClient()
994 kc, _ := MakeKeepClient(arv)
997 arv.ApiToken = "abc123"
998 localRoots := make(map[string]string)
999 writableLocalRoots := make(map[string]string)
1001 ks := RunSomeFakeKeepServers(st, 5)
1003 for i, k := range ks {
1004 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1006 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1008 defer k.listener.Close()
1011 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1013 _, replicas, err := kc.PutB([]byte("foo"))
1015 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1016 c.Check(replicas, Equals, 1)
1018 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
1021 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1022 hash := Md5String("foo")
1024 st := StubPutHandler{
1029 make(chan string, 5)}
1031 arv, _ := arvadosclient.MakeArvadosClient()
1032 kc, _ := MakeKeepClient(arv)
1034 kc.Want_replicas = 2
1035 arv.ApiToken = "abc123"
1036 localRoots := make(map[string]string)
1037 writableLocalRoots := make(map[string]string)
1039 ks := RunSomeFakeKeepServers(st, 5)
1041 for i, k := range ks {
1042 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1043 defer k.listener.Close()
1046 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1048 _, replicas, err := kc.PutB([]byte("foo"))
1050 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1051 c.Check(replicas, Equals, 0)
1054 type StubGetIndexHandler struct {
1057 expectAPIToken string
1062 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1063 h.c.Check(req.URL.Path, Equals, h.expectPath)
1064 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1065 resp.WriteHeader(h.httpStatus)
1066 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
1070 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1071 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1073 st := StubGetIndexHandler{
1078 []byte(hash + "+3 1443559274\n\n")}
1080 ks := RunFakeKeepServer(st)
1081 defer ks.listener.Close()
1083 arv, err := arvadosclient.MakeArvadosClient()
1084 c.Assert(err, IsNil)
1085 kc, err := MakeKeepClient(arv)
1086 c.Assert(err, IsNil)
1087 arv.ApiToken = "abc123"
1088 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1090 r, err := kc.GetIndex("x", "")
1093 content, err2 := ioutil.ReadAll(r)
1094 c.Check(err2, Equals, nil)
1095 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1098 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1099 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1101 st := StubGetIndexHandler{
1103 "/index/" + hash[0:3],
1106 []byte(hash + "+3 1443559274\n\n")}
1108 ks := RunFakeKeepServer(st)
1109 defer ks.listener.Close()
1111 arv, err := arvadosclient.MakeArvadosClient()
1112 kc, _ := MakeKeepClient(arv)
1113 arv.ApiToken = "abc123"
1114 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1116 r, err := kc.GetIndex("x", hash[0:3])
1117 c.Assert(err, Equals, nil)
1119 content, err2 := ioutil.ReadAll(r)
1120 c.Check(err2, Equals, nil)
1121 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1124 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1125 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1127 st := StubGetIndexHandler{
1129 "/index/" + hash[0:3],
1134 ks := RunFakeKeepServer(st)
1135 defer ks.listener.Close()
1137 arv, err := arvadosclient.MakeArvadosClient()
1138 kc, _ := MakeKeepClient(arv)
1139 arv.ApiToken = "abc123"
1140 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1142 _, err = kc.GetIndex("x", hash[0:3])
1143 c.Check(err, Equals, ErrIncompleteIndex)
1146 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1147 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1149 st := StubGetIndexHandler{
1151 "/index/" + hash[0:3],
1156 ks := RunFakeKeepServer(st)
1157 defer ks.listener.Close()
1159 arv, err := arvadosclient.MakeArvadosClient()
1160 kc, _ := MakeKeepClient(arv)
1161 arv.ApiToken = "abc123"
1162 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1164 _, err = kc.GetIndex("y", hash[0:3])
1165 c.Check(err, Equals, ErrNoSuchKeepServer)
1168 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1169 st := StubGetIndexHandler{
1176 ks := RunFakeKeepServer(st)
1177 defer ks.listener.Close()
1179 arv, err := arvadosclient.MakeArvadosClient()
1180 kc, _ := MakeKeepClient(arv)
1181 arv.ApiToken = "abc123"
1182 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1184 r, err := kc.GetIndex("x", "abcd")
1185 c.Check(err, Equals, nil)
1187 content, err2 := ioutil.ReadAll(r)
1188 c.Check(err2, Equals, nil)
1189 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
1192 type FailThenSucceedPutHandler struct {
1195 successhandler StubPutHandler
1198 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1200 resp.WriteHeader(500)
1202 h.handled <- fmt.Sprintf("http://%s", req.Host)
1204 h.successhandler.ServeHTTP(resp, req)
1208 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1209 st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1215 make(chan string, 5)}}
1217 arv, _ := arvadosclient.MakeArvadosClient()
1218 kc, _ := MakeKeepClient(arv)
1220 kc.Want_replicas = 2
1221 arv.ApiToken = "abc123"
1222 localRoots := make(map[string]string)
1223 writableLocalRoots := make(map[string]string)
1225 ks := RunSomeFakeKeepServers(st, 2)
1227 for i, k := range ks {
1228 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1229 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1230 defer k.listener.Close()
1233 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1235 hash, replicas, err := kc.PutB([]byte("foo"))
1237 c.Check(err, Equals, nil)
1238 c.Check(hash, Equals, "")
1239 c.Check(replicas, Equals, 2)
1242 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1243 arv, err := arvadosclient.MakeArvadosClient()
1244 c.Assert(err, Equals, nil)
1246 // Add an additional "testblobstore" keepservice
1247 blobKeepService := make(arvadosclient.Dict)
1248 err = arv.Create("keep_services",
1249 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1250 "service_host": "localhost",
1251 "service_port": "21321",
1252 "service_type": "testblobstore"}},
1254 c.Assert(err, Equals, nil)
1255 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1256 RefreshServiceDiscovery()
1258 // Make a keepclient and ensure that the testblobstore is included
1259 kc, err := MakeKeepClient(arv)
1260 c.Assert(err, Equals, nil)
1262 // verify kc.LocalRoots
1263 c.Check(len(kc.LocalRoots()), Equals, 3)
1264 for _, root := range kc.LocalRoots() {
1265 c.Check(root, Matches, "http://localhost:\\d+")
1267 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1269 // verify kc.GatewayRoots
1270 c.Check(len(kc.GatewayRoots()), Equals, 3)
1271 for _, root := range kc.GatewayRoots() {
1272 c.Check(root, Matches, "http://localhost:\\d+")
1274 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1276 // verify kc.WritableLocalRoots
1277 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1278 for _, root := range kc.WritableLocalRoots() {
1279 c.Check(root, Matches, "http://localhost:\\d+")
1281 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1283 c.Assert(kc.replicasPerService, Equals, 0)
1284 c.Assert(kc.foundNonDiskSvc, Equals, true)
1285 c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)