1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
22 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
23 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
// Test is the package's *testing.T entry point; per the note below it exists
// as gocheck boilerplate.
27 // Gocheck boilerplate
28 func Test(t *testing.T) {
// One instance of each suite type is passed to Suite(); the return values are
// discarded with the blank identifier.
32 // Gocheck boilerplate
33 var _ = Suite(&ServerRequiredSuite{})
34 var _ = Suite(&StandaloneSuite{})
// ServerRequiredSuite is an empty struct used as the receiver for tests whose
// SetUpSuite starts the API and Keep services through arvadostest.
36 // Tests that require the Keep server running
37 type ServerRequiredSuite struct{}
// StandaloneSuite is an empty struct used as the receiver for tests that do
// not go through arvadostest.StartAPI / arvadostest.StartKeep.
40 type StandaloneSuite struct{}
// SetUpTest calls RefreshServiceDiscovery. As a gocheck fixture method it is
// expected to run before every test in the suite.
42 func (s *StandaloneSuite) SetUpTest(c *C) {
43 RefreshServiceDiscovery()
// pythonDir returns "<cwd>/../../python/tests", built with fmt.Sprintf.
// NOTE(review): cwd is presumably the process working directory — confirm.
46 func pythonDir() string {
48 return fmt.Sprintf("%s/../../python/tests", cwd)
// SetUpSuite starts the API server and then Keep through arvadostest.
// StartKeep is called with the arguments 2 and false.
51 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
52 arvadostest.StartAPI()
53 arvadostest.StartKeep(2, false)
// TearDownSuite calls arvadostest.StopKeep(2), using the same count that
// SetUpSuite passes to StartKeep.
56 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
57 arvadostest.StopKeep(2)
// SetUpTest calls RefreshServiceDiscovery. As a gocheck fixture method it is
// expected to run before every test in the suite.
61 func (s *ServerRequiredSuite) SetUpTest(c *C) {
62 RefreshServiceDiscovery()
// TestMakeKeepClient builds a KeepClient from arvadosclient.MakeArvadosClient()
// and verifies that it reports exactly two local roots, each of the form
// "http://localhost:<digits>".
65 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
66 arv, err := arvadosclient.MakeArvadosClient()
67 c.Assert(err, Equals, nil)
69 kc, err := MakeKeepClient(arv)
71 c.Assert(err, Equals, nil)
72 c.Check(len(kc.LocalRoots()), Equals, 2)
73 for _, root := range kc.LocalRoots() {
74 c.Check(root, Matches, "http://localhost:\\d+")
// TestDefaultReplications checks that kc.Want_replicas follows the discovery
// document: 2 with the document as returned by MakeArvadosClient, then 3 and 1
// after DiscoveryDoc["defaultCollectionReplication"] is set to 3.0 and 1.0
// and a new KeepClient is built each time.
78 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
79 arv, err := arvadosclient.MakeArvadosClient()
80 c.Assert(err, Equals, nil)
82 kc, err := MakeKeepClient(arv)
83 c.Assert(kc.Want_replicas, Equals, 2)
85 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
86 kc, err = MakeKeepClient(arv)
87 c.Assert(kc.Want_replicas, Equals, 3)
89 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
90 kc, err = MakeKeepClient(arv)
92 c.Assert(kc.Want_replicas, Equals, 1)
// StubPutHandler is an http.Handler stub for upload tests. Its ServeHTTP
// method reads the fields c, expectPath, expectApiToken, expectBody and
// handled.
95 type StubPutHandler struct {
// ServeHTTP checks the request path, the Authorization header
// ("OAuth2 <expectApiToken>") and the full request body against the expected
// values, replies with status 200, and then sends "http://<req.Host>" on
// sph.handled.
103 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
104 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
105 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
106 body, err := ioutil.ReadAll(req.Body)
107 sph.c.Check(err, Equals, nil)
108 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
109 resp.WriteHeader(200)
// This send blocks unless the channel has free buffer space or a receiver is
// already waiting.
110 sph.handled <- fmt.Sprintf("http://%s", req.Host)
// RunFakeKeepServer opens a TCP listener on 127.0.0.1 with Port 0, stores
// "http://<listener address>" in ks.url, and serves st on that listener in a
// new goroutine. Callers in this file close ks.listener themselves (usually
// via defer).
113 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
115 // If we don't explicitly bind it to localhost, ks.listener.Addr() will
116 // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
117 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
// NOTE(review): fmt.Sprintf is called with a constant string and no format
// arguments, so it is redundant; the value of err is also not included in the
// panic message.
119 panic(fmt.Sprintf("Could not listen on any port"))
121 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
// The error returned by http.Serve is discarded.
122 go http.Serve(ks.listener, st)
// UploadToStubHelper starts a fake Keep server backed by st, builds a
// KeepClient from an Arvados client whose ApiToken is "abc123", and calls f
// with that client, the server URL, both ends of an io.Pipe, and an
// unbuffered chan uploadStatus. The server's listener is closed when the
// helper returns.
126 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
127 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
129 ks := RunFakeKeepServer(st)
130 defer ks.listener.Close()
// NOTE(review): the errors from MakeArvadosClient and MakeKeepClient are
// discarded with the blank identifier.
132 arv, _ := arvadosclient.MakeArvadosClient()
133 arv.ApiToken = "abc123"
135 kc, _ := MakeKeepClient(arv)
137 reader, writer := io.Pipe()
138 upload_status := make(chan uploadStatus)
140 f(kc, ks.url, reader, writer, upload_status)
// TestUploadToStubKeepServer runs kc.uploadToKeepServer in a goroutine against
// a StubPutHandler, writes "foo" into the pipe writer, and expects the status
// received from upload_status to equal
// uploadStatus{nil, "<url>/<expectPath>", 200, 1, ""}.
143 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
144 log.Printf("TestUploadToStubKeepServer")
146 st := StubPutHandler{
148 "acbd18db4cc2f85cedef654fccc4a4d8",
153 UploadToStubHelper(c, st,
154 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
156 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID())
158 writer.Write([]byte("foo"))
162 status := <-upload_status
163 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
// TestUploadToStubKeepServerBufferReader is the bytes.Buffer variant of the
// stub upload test: the pipe ends supplied by UploadToStubHelper are ignored
// and the body "foo" (length 3) is passed as a bytes.Buffer. The expected
// status is uploadStatus{nil, "<url>/<expectPath>", 200, 1, ""}.
167 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
168 st := StubPutHandler{
170 "acbd18db4cc2f85cedef654fccc4a4d8",
175 UploadToStubHelper(c, st,
176 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
177 go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, kc.getRequestID())
181 status := <-upload_status
182 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
// FailHandler is an http.Handler stub whose ServeHTTP answers 500 and reports
// the request host on its handled channel.
186 type FailHandler struct {
// ServeHTTP replies with status 500 and then sends "http://<req.Host>" on
// fh.handled.
190 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
191 resp.WriteHeader(500)
192 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// FailThenSucceedHandler is an http.Handler stub that can either answer 500
// itself or delegate to successhandler. Its ServeHTTP method also uses the
// fields reqIDs and handled.
195 type FailThenSucceedHandler struct {
198 successhandler http.Handler
// ServeHTTP records the request's X-Request-Id header in fh.reqIDs. It has a
// failure path (status 500, then "http://<req.Host>" sent on fh.handled) and a
// success path that delegates to fh.successhandler.
// NOTE(review): presumably early requests fail and later ones succeed, as the
// type name suggests — confirm against the branch condition.
202 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
203 fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
205 resp.WriteHeader(500)
207 fh.handled <- fmt.Sprintf("http://%s", req.Host)
209 fh.successhandler.ServeHTTP(resp, req)
// Error404Handler is an http.Handler stub whose ServeHTTP answers 404 and
// reports the request host on its handled channel.
213 type Error404Handler struct {
// ServeHTTP replies with status 404 and then sends "http://<req.Host>" on
// fh.handled.
217 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
218 resp.WriteHeader(404)
219 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" through the pipe to the stub
// handler st and expects the resulting status to carry the URL
// "<url>/<hash>" and status code 500.
222 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
226 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
228 UploadToStubHelper(c, st,
229 func(kc *KeepClient, url string, reader io.ReadCloser,
230 writer io.WriteCloser, upload_status chan uploadStatus) {
232 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, kc.getRequestID())
234 writer.Write([]byte("foo"))
239 status := <-upload_status
240 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
241 c.Check(status.statusCode, Equals, 500)
// KeepServer describes one fake Keep server started by RunFakeKeepServer: the
// listener it serves on and (in a url field set by RunFakeKeepServer) its
// "http://host:port" address.
245 type KeepServer struct {
246 listener net.Listener
// RunSomeFakeKeepServers starts n fake Keep servers that all share the handler
// st and returns them in a slice of length n.
250 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
251 ks = make([]KeepServer, n)
253 for i := 0; i < n; i += 1 {
254 ks[i] = RunFakeKeepServer(st)
// TestPutB registers five fake Keep servers (sharing one StubPutHandler) as
// both local and writable roots, stores "foo" with kc.PutB, and checks that
// the two servers s1 and s2 are the first two entries of the rendezvous order
// computed by NewRootSorter for Md5String("foo"), in either order.
// NOTE(review): s1 and s2 are presumably read from st.handled — confirm.
260 func (s *StandaloneSuite) TestPutB(c *C) {
261 hash := Md5String("foo")
263 st := StubPutHandler{
268 make(chan string, 5)}
270 arv, _ := arvadosclient.MakeArvadosClient()
271 kc, _ := MakeKeepClient(arv)
274 arv.ApiToken = "abc123"
275 localRoots := make(map[string]string)
276 writableLocalRoots := make(map[string]string)
278 ks := RunSomeFakeKeepServers(st, 5)
280 for i, k := range ks {
281 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
282 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
// Deferred inside the loop: every listener is closed when the test function
// returns, not at the end of each iteration.
283 defer k.listener.Close()
286 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
288 kc.PutB([]byte("foo"))
290 shuff := NewRootSorter(
291 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
295 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
296 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutHR is the streaming counterpart of TestPutB: five fake servers are
// registered as local and writable roots, "foo" is written to an io.Pipe, and
// kc.PutHR(hash, reader, 3) consumes the read end. The two servers s1 and s2
// must be the first two entries of the rendezvous order for hash, in either
// order.
// NOTE(review): s1 and s2 are presumably read from st.handled — confirm.
301 func (s *StandaloneSuite) TestPutHR(c *C) {
302 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
304 st := StubPutHandler{
309 make(chan string, 5)}
311 arv, _ := arvadosclient.MakeArvadosClient()
312 kc, _ := MakeKeepClient(arv)
315 arv.ApiToken = "abc123"
316 localRoots := make(map[string]string)
317 writableLocalRoots := make(map[string]string)
319 ks := RunSomeFakeKeepServers(st, 5)
321 for i, k := range ks {
322 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
323 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
324 defer k.listener.Close()
327 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
329 reader, writer := io.Pipe()
332 writer.Write([]byte("foo"))
336 kc.PutHR(hash, reader, 3)
338 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
343 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
344 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutWithFail registers four servers backed by the StubPutHandler st and
// one backed by the handler fh, all as local and writable roots, then stores
// "foo" with kc.PutB. It expects a nil error, an empty returned hash string,
// 2 replicas, and that the servers s1 and s2 are the second and third entries
// of the rendezvous order, in either order.
// NOTE(review): this presumably relies on the fh-backed server sorting first
// for this block — confirm.
349 func (s *StandaloneSuite) TestPutWithFail(c *C) {
350 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
352 st := StubPutHandler{
357 make(chan string, 4)}
360 make(chan string, 1)}
362 arv, err := arvadosclient.MakeArvadosClient()
363 kc, _ := MakeKeepClient(arv)
366 arv.ApiToken = "abc123"
367 localRoots := make(map[string]string)
368 writableLocalRoots := make(map[string]string)
370 ks1 := RunSomeFakeKeepServers(st, 4)
371 ks2 := RunSomeFakeKeepServers(fh, 1)
373 for i, k := range ks1 {
374 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
375 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
376 defer k.listener.Close()
// UUID suffixes for the second group continue after those of ks1.
378 for i, k := range ks2 {
379 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
380 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
381 defer k.listener.Close()
384 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
386 shuff := NewRootSorter(
387 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
390 phash, replicas, err := kc.PutB([]byte("foo"))
394 c.Check(err, Equals, nil)
395 c.Check(phash, Equals, "")
396 c.Check(replicas, Equals, 2)
401 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
402 (s1 == shuff[2] && s2 == shuff[1]),
// TestPutWithTooManyFail registers one server backed by the StubPutHandler st
// and four backed by the handler fh, then stores "foo" with kc.PutB. It
// expects an error of the same type as InsufficientReplicasError, a replica
// count of 1, and that the single value on st.handled is the URL of the one
// st-backed server.
407 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
408 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
410 st := StubPutHandler{
415 make(chan string, 1)}
418 make(chan string, 4)}
420 arv, err := arvadosclient.MakeArvadosClient()
421 kc, _ := MakeKeepClient(arv)
425 arv.ApiToken = "abc123"
426 localRoots := make(map[string]string)
427 writableLocalRoots := make(map[string]string)
429 ks1 := RunSomeFakeKeepServers(st, 1)
430 ks2 := RunSomeFakeKeepServers(fh, 4)
432 for i, k := range ks1 {
433 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
434 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
435 defer k.listener.Close()
437 for i, k := range ks2 {
438 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
439 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
440 defer k.listener.Close()
443 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
445 _, replicas, err := kc.PutB([]byte("foo"))
447 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
448 c.Check(replicas, Equals, 1)
449 c.Check(<-st.handled, Equals, ks1[0].url)
// StubGetHandler is an http.Handler stub for read tests. Besides
// expectApiToken, its ServeHTTP method uses the fields c, expectPath,
// httpStatus and body.
452 type StubGetHandler struct {
455 expectApiToken string
// ServeHTTP checks the request path and the Authorization header
// ("OAuth2 <expectApiToken>") and replies with sgh.httpStatus.
460 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
461 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
462 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
463 resp.WriteHeader(sgh.httpStatus)
// NOTE(review): per the net/http ResponseWriter documentation, changing the
// header map after WriteHeader has no effect for non-1xx statuses (trailers
// aside), so this Content-Length is not the one sent. Consider setting it
// before WriteHeader.
464 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
// TestGet fetches the md5 of "foo" from a single fake server registered as
// local root "x" and expects a nil error, a size of 3, the URL
// "<server url>/<hash>", and the content "foo" when the reader is drained.
468 func (s *StandaloneSuite) TestGet(c *C) {
469 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
471 st := StubGetHandler{
478 ks := RunFakeKeepServer(st)
479 defer ks.listener.Close()
481 arv, err := arvadosclient.MakeArvadosClient()
482 kc, _ := MakeKeepClient(arv)
483 arv.ApiToken = "abc123"
484 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
486 r, n, url2, err := kc.Get(hash)
488 c.Check(err, Equals, nil)
489 c.Check(n, Equals, int64(3))
490 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
492 content, err2 := ioutil.ReadAll(r)
493 c.Check(err2, Equals, nil)
494 c.Check(content, DeepEquals, []byte("foo"))
// TestGet404 points the client at a single server that always answers 404 and
// expects kc.Get to return BlockNotFound together with a zero size, an empty
// URL and a nil reader.
497 func (s *StandaloneSuite) TestGet404(c *C) {
498 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
500 st := Error404Handler{make(chan string, 1)}
502 ks := RunFakeKeepServer(st)
503 defer ks.listener.Close()
505 arv, err := arvadosclient.MakeArvadosClient()
506 kc, _ := MakeKeepClient(arv)
507 arv.ApiToken = "abc123"
508 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
510 r, n, url2, err := kc.Get(hash)
511 c.Check(err, Equals, BlockNotFound)
512 c.Check(n, Equals, int64(0))
513 c.Check(url2, Equals, "")
514 c.Check(r, Equals, nil)
// TestGetEmptyBlock requests the locator "d41d8cd98f00b204e9800998ecf8427e+0"
// while the only configured server always answers 404. It expects a zero
// size, an empty URL, and a reader that yields an empty byte slice.
// NOTE(review): presumably the client answers this locator without contacting
// the server — confirm in KeepClient.Get.
517 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
518 st := Error404Handler{make(chan string, 1)}
520 ks := RunFakeKeepServer(st)
521 defer ks.listener.Close()
523 arv, err := arvadosclient.MakeArvadosClient()
524 kc, _ := MakeKeepClient(arv)
525 arv.ApiToken = "abc123"
526 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
528 r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
530 c.Check(n, Equals, int64(0))
531 c.Check(url2, Equals, "")
533 buf, err := ioutil.ReadAll(r)
535 c.Check(buf, DeepEquals, []byte{})
// TestGetFail points the client at a single server that always answers 500
// and expects kc.Get to return an *ErrNotFound whose message contains
// "HTTP 500" and whose Temporary() is true, along with a zero size, an empty
// URL and a nil reader.
538 func (s *StandaloneSuite) TestGetFail(c *C) {
539 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
541 st := FailHandler{make(chan string, 1)}
543 ks := RunFakeKeepServer(st)
544 defer ks.listener.Close()
546 arv, err := arvadosclient.MakeArvadosClient()
547 kc, _ := MakeKeepClient(arv)
548 arv.ApiToken = "abc123"
549 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
552 r, n, url2, err := kc.Get(hash)
// NOTE(review): if err is not an *ErrNotFound, errNotFound is nil here and the
// method calls below run on a nil pointer; the NotNil check uses c.Check, not
// c.Assert.
553 errNotFound, _ := err.(*ErrNotFound)
554 c.Check(errNotFound, NotNil)
555 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
556 c.Check(errNotFound.Temporary(), Equals, true)
557 c.Check(n, Equals, int64(0))
558 c.Check(url2, Equals, "")
559 c.Check(r, Equals, nil)
// TestGetFailRetry fetches the md5 of "foo" from a FailThenSucceedHandler that
// wraps a StubGetHandler. It expects the Get to end successfully (nil error,
// size 3, URL "<server url>/<hash>", content "foo"). It also expects that the
// handler saw more than one request and that every recorded X-Request-Id is
// non-empty and identical to the first.
562 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
563 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
565 st := &FailThenSucceedHandler{
566 handled: make(chan string, 1),
567 successhandler: StubGetHandler{
574 ks := RunFakeKeepServer(st)
575 defer ks.listener.Close()
577 arv, err := arvadosclient.MakeArvadosClient()
578 kc, _ := MakeKeepClient(arv)
579 arv.ApiToken = "abc123"
580 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
582 r, n, url2, err := kc.Get(hash)
584 c.Check(err, Equals, nil)
585 c.Check(n, Equals, int64(3))
586 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
588 content, err2 := ioutil.ReadAll(r)
589 c.Check(err2, Equals, nil)
590 c.Check(content, DeepEquals, []byte("foo"))
592 c.Logf("%q", st.reqIDs)
593 c.Assert(len(st.reqIDs) > 1, Equals, true)
594 for _, reqid := range st.reqIDs {
595 c.Check(reqid, Not(Equals), "")
596 c.Check(reqid, Equals, st.reqIDs[0])
// TestGetNetError configures "http://localhost:62222" as the only local root
// and expects kc.Get to return an *ErrNotFound whose message contains
// "connection refused" and whose Temporary() is true, along with a zero size,
// an empty URL and a nil reader.
// NOTE(review): assumes nothing is listening on local port 62222 — confirm
// for the test environment.
600 func (s *StandaloneSuite) TestGetNetError(c *C) {
601 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
603 arv, err := arvadosclient.MakeArvadosClient()
604 kc, _ := MakeKeepClient(arv)
605 arv.ApiToken = "abc123"
606 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
608 r, n, url2, err := kc.Get(hash)
// NOTE(review): if err is not an *ErrNotFound, errNotFound is nil here and the
// method calls below run on a nil pointer.
609 errNotFound, _ := err.(*ErrNotFound)
610 c.Check(errNotFound, NotNil)
611 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
612 c.Check(errNotFound.Temporary(), Equals, true)
613 c.Check(n, Equals, int64(0))
614 c.Check(url2, Equals, "")
615 c.Check(r, Equals, nil)
// TestGetWithServiceHint starts two fake servers: ks0 is registered as local
// root "x" and ks under uuid in the last roots map. Fetching
// "<hash>+K@<uuid>" must succeed with size 3 and content "foo", and the
// returned URI must point at ks, not ks0.
618 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
619 uuid := "zzzzz-bi6l4-123451234512345"
620 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
622 // This one shouldn't be used:
623 ks0 := RunFakeKeepServer(StubGetHandler{
629 defer ks0.listener.Close()
630 // This one should be used:
631 ks := RunFakeKeepServer(StubGetHandler{
637 defer ks.listener.Close()
639 arv, err := arvadosclient.MakeArvadosClient()
640 kc, _ := MakeKeepClient(arv)
641 arv.ApiToken = "abc123"
643 map[string]string{"x": ks0.url},
645 map[string]string{uuid: ks.url})
647 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
649 c.Check(err, Equals, nil)
650 c.Check(n, Equals, int64(3))
651 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
653 content, err := ioutil.ReadAll(r)
654 c.Check(err, Equals, nil)
655 c.Check(content, DeepEquals, []byte("foo"))
// TestGetWithLocalServiceHint registers ks0 under three UUIDs (yyy…, xxx…,
// www…) in two roots maps and fetches "<hash>+K@<uuid>" with
// uuid = "zzzzz-bi6l4-zzzzzzzzzzzzzzz". The Get must succeed with size 3 and
// content "foo", and the returned URI must point at ks, not ks0.
658 // Use a service hint to fetch from a local disk service, overriding
659 // rendezvous probe order.
660 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
661 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
662 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
664 // This one shouldn't be used, although it appears first in
665 // rendezvous probe order:
666 ks0 := RunFakeKeepServer(StubGetHandler{
672 defer ks0.listener.Close()
673 // This one should be used:
674 ks := RunFakeKeepServer(StubGetHandler{
680 defer ks.listener.Close()
682 arv, err := arvadosclient.MakeArvadosClient()
683 kc, _ := MakeKeepClient(arv)
684 arv.ApiToken = "abc123"
687 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
688 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
689 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
693 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
694 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
695 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
699 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
701 c.Check(err, Equals, nil)
702 c.Check(n, Equals, int64(3))
703 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
705 content, err := ioutil.ReadAll(r)
706 c.Check(err, Equals, nil)
707 c.Check(content, DeepEquals, []byte("foo"))
// TestGetWithServiceHintFailoverToLocals registers ksLocal as local root
// "zzzzz-bi6l4-keepdisk0000000" and ksGateway under uuid in the last roots
// map; the gateway stub is configured with http.StatusInternalServerError.
// Fetching "<hash>+K@<uuid>" must still succeed with size 3 and content
// "foo", and the returned URI must point at ksLocal.
710 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
711 uuid := "zzzzz-bi6l4-123451234512345"
712 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
714 ksLocal := RunFakeKeepServer(StubGetHandler{
720 defer ksLocal.listener.Close()
721 ksGateway := RunFakeKeepServer(StubGetHandler{
725 http.StatusInternalServerError,
727 defer ksGateway.listener.Close()
729 arv, err := arvadosclient.MakeArvadosClient()
730 kc, _ := MakeKeepClient(arv)
731 arv.ApiToken = "abc123"
733 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
735 map[string]string{uuid: ksGateway.url})
737 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
738 c.Assert(err, Equals, nil)
740 c.Check(n, Equals, int64(3))
741 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
743 content, err := ioutil.ReadAll(r)
744 c.Check(err, Equals, nil)
745 c.Check(content, DeepEquals, []byte("foo"))
// BarHandler is an http.Handler stub whose ServeHTTP always writes the body
// "bar" and reports the request host on its handled channel.
748 type BarHandler struct {
// ServeHTTP writes the three bytes "bar" as the response body, whatever the
// request, and then sends "http://<req.Host>" on the handled channel.
// NOTE(review): the receiver is named "this"; Go convention prefers a short
// type-derived name.
752 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
753 resp.Write([]byte("bar"))
754 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestChecksum uses a server that always returns "bar". Fetching the md5 of
// "bar" and draining the reader must give size 3 and a nil error. Fetching
// the md5 of "foo" must give size 3 and BadChecksum from the read.
757 func (s *StandaloneSuite) TestChecksum(c *C) {
758 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
759 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
761 st := BarHandler{make(chan string, 1)}
763 ks := RunFakeKeepServer(st)
764 defer ks.listener.Close()
766 arv, err := arvadosclient.MakeArvadosClient()
767 kc, _ := MakeKeepClient(arv)
768 arv.ApiToken = "abc123"
769 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
// NOTE(review): the err returned by Get is overwritten by ReadAll's err on the
// next line before it is checked (same pattern for the second Get below).
771 r, n, _, err := kc.Get(barhash)
772 _, err = ioutil.ReadAll(r)
773 c.Check(n, Equals, int64(3))
774 c.Check(err, Equals, nil)
778 r, n, _, err = kc.Get(foohash)
779 _, err = ioutil.ReadAll(r)
780 c.Check(n, Equals, int64(3))
781 c.Check(err, Equals, BadChecksum)
// TestGetWithFailures registers one server backed by the StubGetHandler st
// and four backed by an Error404Handler, then fetches the md5 of "waz". It
// first asserts that the st-backed server is not first in the rendezvous
// order (see the comment in the body). It then expects the Get to succeed
// with size 3, the URL of the st-backed server, and the original content.
786 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
787 content := []byte("waz")
788 hash := fmt.Sprintf("%x", md5.Sum(content))
790 fh := Error404Handler{
791 make(chan string, 4)}
793 st := StubGetHandler{
800 arv, err := arvadosclient.MakeArvadosClient()
801 kc, _ := MakeKeepClient(arv)
802 arv.ApiToken = "abc123"
803 localRoots := make(map[string]string)
804 writableLocalRoots := make(map[string]string)
806 ks1 := RunSomeFakeKeepServers(st, 1)
807 ks2 := RunSomeFakeKeepServers(fh, 4)
809 for i, k := range ks1 {
810 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
811 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
812 defer k.listener.Close()
814 for i, k := range ks2 {
815 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
816 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
817 defer k.listener.Close()
820 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
823 // This test works only if one of the failing services is
824 // attempted before the succeeding service. Otherwise,
825 // <-fh.handled below will just hang! (Probe order depends on
826 // the choice of block content "waz" and the UUIDs of the fake
827 // servers, so we just tried different strings until we found
828 // an example that passes this Assert.)
829 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
831 r, n, url2, err := kc.Get(hash)
834 c.Check(err, Equals, nil)
835 c.Check(n, Equals, int64(3))
836 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
838 read_content, err2 := ioutil.ReadAll(r)
839 c.Check(err2, Equals, nil)
840 c.Check(read_content, DeepEquals, content)
// TestPutGetHead does a round trip of the content "TestPutGetHead":
//   - kc.Ask on the hash is expected to return BlockNotFound and size 0;
//   - kc.PutB must return a locator matching "<hash>+<len>…", 2 replicas and
//     a nil error;
//   - kc.Get must return the same bytes, the content length, and a URL of the
//     form "http://localhost:<digits>/<hash>";
//   - kc.Ask must then report the content length and a URL of the same form.
843 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
844 content := []byte("TestPutGetHead")
846 arv, err := arvadosclient.MakeArvadosClient()
847 kc, err := MakeKeepClient(arv)
848 c.Assert(err, Equals, nil)
850 hash := fmt.Sprintf("%x", md5.Sum(content))
853 n, _, err := kc.Ask(hash)
854 c.Check(err, Equals, BlockNotFound)
855 c.Check(n, Equals, int64(0))
858 hash2, replicas, err := kc.PutB(content)
859 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
860 c.Check(replicas, Equals, 2)
861 c.Check(err, Equals, nil)
864 r, n, url2, err := kc.Get(hash)
865 c.Check(err, Equals, nil)
866 c.Check(n, Equals, int64(len(content)))
867 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
869 read_content, err2 := ioutil.ReadAll(r)
870 c.Check(err2, Equals, nil)
871 c.Check(read_content, DeepEquals, content)
874 n, url2, err := kc.Ask(hash)
875 c.Check(err, Equals, nil)
876 c.Check(n, Equals, int64(len(content)))
877 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
// StubProxyHandler is an http.Handler stub whose ServeHTTP sets the
// X-Keep-Replicas-Stored response header to "2" and reports the request host
// on its handled channel.
881 type StubProxyHandler struct {
// ServeHTTP sets the response header "X-Keep-Replicas-Stored: 2" and then
// sends "http://<req.Host>" on the handled channel.
// NOTE(review): the receiver is named "this"; Go convention prefers a short
// type-derived name.
885 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
886 resp.Header().Set("X-Keep-Replicas-Stored", "2")
887 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestPutProxy registers one StubProxyHandler-backed server as the only local
// and writable root, stores "foo" with kc.PutB, and expects a nil error and a
// replica count of 2 (the value the stub advertises in
// X-Keep-Replicas-Stored).
890 func (s *StandaloneSuite) TestPutProxy(c *C) {
891 st := StubProxyHandler{make(chan string, 1)}
893 arv, err := arvadosclient.MakeArvadosClient()
894 kc, _ := MakeKeepClient(arv)
897 arv.ApiToken = "abc123"
898 localRoots := make(map[string]string)
899 writableLocalRoots := make(map[string]string)
901 ks1 := RunSomeFakeKeepServers(st, 1)
903 for i, k := range ks1 {
904 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
905 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
906 defer k.listener.Close()
909 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
911 _, replicas, err := kc.PutB([]byte("foo"))
914 c.Check(err, Equals, nil)
915 c.Check(replicas, Equals, 2)
// TestPutProxyInsufficientReplicas uses the same single StubProxyHandler
// setup as TestPutProxy but expects kc.PutB to return an error of the same
// type as InsufficientReplicasError while still reporting 2 replicas.
// NOTE(review): presumably the client is configured to want more than 2
// replicas — confirm.
918 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
919 st := StubProxyHandler{make(chan string, 1)}
921 arv, err := arvadosclient.MakeArvadosClient()
922 kc, _ := MakeKeepClient(arv)
925 arv.ApiToken = "abc123"
926 localRoots := make(map[string]string)
927 writableLocalRoots := make(map[string]string)
929 ks1 := RunSomeFakeKeepServers(st, 1)
931 for i, k := range ks1 {
932 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
933 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
934 defer k.listener.Close()
936 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
938 _, replicas, err := kc.PutB([]byte("foo"))
941 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
942 c.Check(replicas, Equals, 2)
// TestMakeLocator parses a locator with a size and a permission-style hint
// and checks the resulting Hash, Size (3) and Hints; the size token "3" is
// expected to appear in Hints as well.
945 func (s *StandaloneSuite) TestMakeLocator(c *C) {
946 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
947 c.Check(err, Equals, nil)
948 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
949 c.Check(l.Size, Equals, 3)
950 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
// TestMakeLocatorNoHints parses a bare 32-hex-digit hash and expects Size to
// be -1 and Hints to be an empty (non-nil) string slice.
953 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
954 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
955 c.Check(err, Equals, nil)
956 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
957 c.Check(l.Size, Equals, -1)
958 c.Check(l.Hints, DeepEquals, []string{})
// TestMakeLocatorNoSizeHint parses a locator whose only hint is non-numeric
// and expects Size to be -1 with that hint preserved in Hints.
961 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
962 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
963 c.Check(err, Equals, nil)
964 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
965 c.Check(l.Size, Equals, -1)
966 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
// TestMakeLocatorPreservesUnrecognizedHints parses a locator with several
// hints, checks that all of them are kept in order in Hints, and checks that
// l.String() reproduces the input string exactly.
969 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
970 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
971 l, err := MakeLocator(str)
972 c.Check(err, Equals, nil)
973 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
974 c.Check(l.Size, Equals, 3)
975 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
976 c.Check(l.String(), Equals, str)
// TestMakeLocatorInvalidInput passes a 31-character hex string (one digit
// short of the 32 used in the other locator tests) and expects
// InvalidLocatorError.
979 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
980 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
981 c.Check(err, Equals, InvalidLocatorError)
// TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot starts five fake servers,
// registers them as local roots, stores "foo" with kc.PutB, and expects an
// error of the same type as InsufficientReplicasError, a replica count of 1,
// and that the server which handled the request is local root index 0.
// NOTE(review): the writableLocalRoots assignment in the loop is presumably
// restricted to index 0 — confirm.
984 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
985 hash := Md5String("foo")
987 st := StubPutHandler{
992 make(chan string, 5)}
994 arv, _ := arvadosclient.MakeArvadosClient()
995 kc, _ := MakeKeepClient(arv)
998 arv.ApiToken = "abc123"
999 localRoots := make(map[string]string)
1000 writableLocalRoots := make(map[string]string)
1002 ks := RunSomeFakeKeepServers(st, 5)
1004 for i, k := range ks {
1005 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1007 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1009 defer k.listener.Close()
1012 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1014 _, replicas, err := kc.PutB([]byte("foo"))
1016 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1017 c.Check(replicas, Equals, 1)
1019 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
// TestPutBWithNoWritableLocalRoots sets Want_replicas to 2, registers five
// fake servers as local roots only (nothing is added to writableLocalRoots in
// the loop), stores "foo" with kc.PutB, and expects an error of the same type
// as InsufficientReplicasError with a replica count of 0.
1022 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1023 hash := Md5String("foo")
1025 st := StubPutHandler{
1030 make(chan string, 5)}
1032 arv, _ := arvadosclient.MakeArvadosClient()
1033 kc, _ := MakeKeepClient(arv)
1035 kc.Want_replicas = 2
1036 arv.ApiToken = "abc123"
1037 localRoots := make(map[string]string)
1038 writableLocalRoots := make(map[string]string)
1040 ks := RunSomeFakeKeepServers(st, 5)
1042 for i, k := range ks {
1043 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1044 defer k.listener.Close()
1047 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1049 _, replicas, err := kc.PutB([]byte("foo"))
1051 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1052 c.Check(replicas, Equals, 0)
// StubGetIndexHandler is an http.Handler stub for index tests. Besides
// expectAPIToken, its ServeHTTP method uses the fields c, expectPath,
// httpStatus and body.
1055 type StubGetIndexHandler struct {
1058 expectAPIToken string
// ServeHTTP checks the request path (compared to h.expectPath as-is, with no
// "/" prepended) and the Authorization header ("OAuth2 <expectAPIToken>"),
// and replies with h.httpStatus.
1063 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1064 h.c.Check(req.URL.Path, Equals, h.expectPath)
1065 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1066 resp.WriteHeader(h.httpStatus)
// NOTE(review): per the net/http ResponseWriter documentation, changing the
// header map after WriteHeader has no effect for non-1xx statuses (trailers
// aside), so this Content-Length is not the one sent. Consider setting it
// before WriteHeader.
1067 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
// TestGetIndexWithNoPrefix calls kc.GetIndex("x", "") against a stub whose
// body is one index line followed by an extra "\n". It expects the returned
// reader to yield the stub body minus its final byte.
1071 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1072 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1074 st := StubGetIndexHandler{
1079 []byte(hash + "+3 1443559274\n\n")}
1081 ks := RunFakeKeepServer(st)
1082 defer ks.listener.Close()
1084 arv, err := arvadosclient.MakeArvadosClient()
1085 c.Assert(err, IsNil)
1086 kc, err := MakeKeepClient(arv)
1087 c.Assert(err, IsNil)
1088 arv.ApiToken = "abc123"
1089 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1091 r, err := kc.GetIndex("x", "")
1094 content, err2 := ioutil.ReadAll(r)
1095 c.Check(err2, Equals, nil)
1096 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexWithPrefix calls kc.GetIndex("x", hash[0:3]) against a stub
// that expects the path "/index/<first three hash characters>". It expects a
// nil error and a reader that yields the stub body minus its final byte.
1099 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1100 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1102 st := StubGetIndexHandler{
1104 "/index/" + hash[0:3],
1107 []byte(hash + "+3 1443559274\n\n")}
1109 ks := RunFakeKeepServer(st)
1110 defer ks.listener.Close()
1112 arv, err := arvadosclient.MakeArvadosClient()
1113 kc, _ := MakeKeepClient(arv)
1114 arv.ApiToken = "abc123"
1115 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1117 r, err := kc.GetIndex("x", hash[0:3])
1118 c.Assert(err, Equals, nil)
1120 content, err2 := ioutil.ReadAll(r)
1121 c.Check(err2, Equals, nil)
1122 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexIncomplete calls kc.GetIndex("x", hash[0:3]) against a stub
// expecting the path "/index/<first three hash characters>" and expects
// ErrIncompleteIndex.
// NOTE(review): presumably the stub body lacks the terminating blank line
// used in the other index tests — confirm.
1125 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1126 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1128 st := StubGetIndexHandler{
1130 "/index/" + hash[0:3],
1135 ks := RunFakeKeepServer(st)
1136 defer ks.listener.Close()
1138 arv, err := arvadosclient.MakeArvadosClient()
1139 kc, _ := MakeKeepClient(arv)
1140 arv.ApiToken = "abc123"
1141 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1143 _, err = kc.GetIndex("x", hash[0:3])
1144 c.Check(err, Equals, ErrIncompleteIndex)
// TestGetIndexWithNoSuchServer registers only the root "x" and then asks for
// the index of "y"; kc.GetIndex is expected to return ErrNoSuchKeepServer.
1147 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1148 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1150 st := StubGetIndexHandler{
1152 "/index/" + hash[0:3],
1157 ks := RunFakeKeepServer(st)
1158 defer ks.listener.Close()
1160 arv, err := arvadosclient.MakeArvadosClient()
1161 kc, _ := MakeKeepClient(arv)
1162 arv.ApiToken = "abc123"
1163 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1165 _, err = kc.GetIndex("y", hash[0:3])
1166 c.Check(err, Equals, ErrNoSuchKeepServer)
// TestGetIndexWithNoSuchPrefix calls kc.GetIndex("x", "abcd") and expects a
// nil error and a reader that yields the stub body minus its final byte.
1169 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1170 st := StubGetIndexHandler{
1177 ks := RunFakeKeepServer(st)
1178 defer ks.listener.Close()
1180 arv, err := arvadosclient.MakeArvadosClient()
1181 kc, _ := MakeKeepClient(arv)
1182 arv.ApiToken = "abc123"
1183 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1185 r, err := kc.GetIndex("x", "abcd")
1186 c.Check(err, Equals, nil)
1188 content, err2 := ioutil.ReadAll(r)
1189 c.Check(err2, Equals, nil)
1190 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestPutBRetry registers two fake servers that share one
// FailThenSucceedHandler (wrapping a StubPutHandler) as local and writable
// roots, sets Want_replicas to 2, and stores "foo" with kc.PutB. It expects a
// nil error, an empty returned hash string and 2 replicas.
1193 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1194 st := &FailThenSucceedHandler{
1195 handled: make(chan string, 1),
1196 successhandler: StubPutHandler{
1201 make(chan string, 5)}}
1203 arv, _ := arvadosclient.MakeArvadosClient()
1204 kc, _ := MakeKeepClient(arv)
1206 kc.Want_replicas = 2
1207 arv.ApiToken = "abc123"
1208 localRoots := make(map[string]string)
1209 writableLocalRoots := make(map[string]string)
1211 ks := RunSomeFakeKeepServers(st, 2)
1213 for i, k := range ks {
1214 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1215 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1216 defer k.listener.Close()
1219 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1221 hash, replicas, err := kc.PutB([]byte("foo"))
1223 c.Check(err, Equals, nil)
1224 c.Check(hash, Equals, "")
1225 c.Check(replicas, Equals, 2)
// TestMakeKeepClientWithNonDiskTypeService creates a keep_services record of
// type "testblobstore" on localhost:21321 (deleted again via defer), refreshes
// service discovery, and builds a KeepClient. It then verifies that
// LocalRoots, GatewayRoots and WritableLocalRoots each hold three
// "http://localhost:<digits>" entries, including a non-empty one for the new
// service's UUID. Finally it checks replicasPerService == 0, foundNonDiskSvc
// == true, and an HTTP client timeout of 300 seconds.
1228 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1229 arv, err := arvadosclient.MakeArvadosClient()
1230 c.Assert(err, Equals, nil)
1232 // Add an additional "testblobstore" keepservice
1233 blobKeepService := make(arvadosclient.Dict)
1234 err = arv.Create("keep_services",
1235 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1236 "service_host": "localhost",
1237 "service_port": "21321",
1238 "service_type": "testblobstore"}},
1240 c.Assert(err, Equals, nil)
// The error returned by arv.Delete is ignored in this cleanup.
1241 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1242 RefreshServiceDiscovery()
1244 // Make a keepclient and ensure that the testblobstore is included
1245 kc, err := MakeKeepClient(arv)
1246 c.Assert(err, Equals, nil)
1248 // verify kc.LocalRoots
1249 c.Check(len(kc.LocalRoots()), Equals, 3)
1250 for _, root := range kc.LocalRoots() {
1251 c.Check(root, Matches, "http://localhost:\\d+")
1253 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1255 // verify kc.GatewayRoots
1256 c.Check(len(kc.GatewayRoots()), Equals, 3)
1257 for _, root := range kc.GatewayRoots() {
1258 c.Check(root, Matches, "http://localhost:\\d+")
1260 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1262 // verify kc.WritableLocalRoots
1263 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1264 for _, root := range kc.WritableLocalRoots() {
1265 c.Check(root, Matches, "http://localhost:\\d+")
1267 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1269 c.Assert(kc.replicasPerService, Equals, 0)
1270 c.Assert(kc.foundNonDiskSvc, Equals, true)
1271 c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)