1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
22 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
27 // Gocheck boilerplate
// Test is the "go test" entry point for this file. Its body is not visible
// in this excerpt; presumably it hands control to gocheck -- confirm.
28 func Test(t *testing.T) {
32 // Gocheck boilerplate
// Register one instance of each suite type with gocheck.
33 var _ = Suite(&ServerRequiredSuite{})
34 var _ = Suite(&StandaloneSuite{})
36 // Tests that require the Keep server running
// (this suite's SetUpSuite starts the API and Keep test servers through
// arvadostest).
37 type ServerRequiredSuite struct{}
// StandaloneSuite holds the tests that do not start servers through
// arvadostest; where a server is needed they use in-process stubs created by
// RunFakeKeepServer.
40 type StandaloneSuite struct{}
// SetUpTest is the gocheck per-test fixture hook; it calls
// RefreshServiceDiscovery before every StandaloneSuite test.
42 func (s *StandaloneSuite) SetUpTest(c *C) {
43 RefreshServiceDiscovery()
// pythonDir returns the path "<cwd>/../../python/tests". The assignment of
// cwd is not visible in this excerpt; presumably it is the process working
// directory -- confirm.
46 func pythonDir() string {
48 return fmt.Sprintf("%s/../../python/tests", cwd)
// SetUpSuite is the gocheck once-per-suite fixture hook; it starts the test
// API server and then the Keep test servers.
51 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
52 arvadostest.StartAPI()
// NOTE(review): presumably 2 is the number of Keep servers (the tests below
// expect two local roots); the meaning of the bool is not shown here.
53 arvadostest.StartKeep(2, false)
// TearDownSuite is the gocheck once-per-suite teardown hook; it stops the
// Keep test servers with the same count that SetUpSuite passed to StartKeep.
56 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
57 arvadostest.StopKeep(2)
// SetUpTest is the gocheck per-test fixture hook; it calls
// RefreshServiceDiscovery before every ServerRequiredSuite test.
61 func (s *ServerRequiredSuite) SetUpTest(c *C) {
62 RefreshServiceDiscovery()
// TestMakeKeepClient builds a KeepClient from a default Arvados client and
// checks that it has exactly two local roots, each of the form
// http://localhost:<port>.
65 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
66 arv, err := arvadosclient.MakeArvadosClient()
67 c.Assert(err, Equals, nil)
69 kc, err := MakeKeepClient(arv)
71 c.Assert(err, Equals, nil)
72 c.Check(len(kc.LocalRoots()), Equals, 2)
73 for _, root := range kc.LocalRoots() {
74 c.Check(root, Matches, "http://localhost:\\d+")
// TestDefaultReplications checks that a new KeepClient's Want_replicas is 2
// with the unmodified discovery document, and follows the
// "defaultCollectionReplication" entry when that is set to 3.0 and then 1.0.
78 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
79 arv, err := arvadosclient.MakeArvadosClient()
80 c.Assert(err, Equals, nil)
// NOTE(review): no check of the MakeKeepClient errors is visible in this
// excerpt; confirm they are asserted on the elided lines.
82 kc, err := MakeKeepClient(arv)
84 c.Assert(kc.Want_replicas, Equals, 2)
86 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
87 kc, err = MakeKeepClient(arv)
89 c.Assert(kc.Want_replicas, Equals, 3)
91 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
92 kc, err = MakeKeepClient(arv)
94 c.Assert(kc.Want_replicas, Equals, 1)
// StubPutHandler is an http.Handler stub for upload requests: its ServeHTTP
// compares the request against the expect* fields and always answers 200.
// Only some fields are visible in this excerpt.
97 type StubPutHandler struct {
// Token expected in the "Authorization: OAuth2 <token>" request header.
100 expectAPIToken string
// Value expected in the X-Keep-Storage-Classes request header.
102 expectStorageClass string
// If non-empty, sent back as X-Keep-Storage-Classes-Confirmed.
103 returnStorageClasses string
// ServeHTTP checks the request path, Authorization header,
// X-Keep-Storage-Classes header and body against the handler's expectations,
// replies 200 with X-Keep-Replicas-Stored: 1 (plus the confirmed storage
// classes when configured), and then reports "http://<request host>" on the
// handled channel.
107 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
108 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
109 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
110 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
111 body, err := ioutil.ReadAll(req.Body)
112 sph.c.Check(err, Equals, nil)
113 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
// Response headers are set before WriteHeader so that they are sent.
114 resp.Header().Set("X-Keep-Replicas-Stored", "1")
115 if sph.returnStorageClasses != "" {
116 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
118 resp.WriteHeader(200)
// This send blocks if the handled channel's buffer is full.
119 sph.handled <- fmt.Sprintf("http://%s", req.Host)
122 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
124 // If we don't explicitly bind it to localhost, ks.listener.Addr() will
125 // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
126 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
128 panic(fmt.Sprintf("Could not listen on any port"))
130 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
131 go http.Serve(ks.listener, st)
// UploadToStubHelper starts a stub server backed by st, builds a KeepClient
// whose Arvados client uses the API token "abc123", and calls f with that
// client, the stub server's URL, both ends of a fresh io.Pipe, and an
// unbuffered uploadStatus channel. The stub's listener is closed when the
// helper returns.
135 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
136 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
138 ks := RunFakeKeepServer(st)
139 defer ks.listener.Close()
// NOTE(review): the errors from MakeArvadosClient and MakeKeepClient are
// discarded; a failure here would surface later as a nil dereference.
141 arv, _ := arvadosclient.MakeArvadosClient()
142 arv.ApiToken = "abc123"
144 kc, _ := MakeKeepClient(arv)
146 reader, writer := io.Pipe()
147 uploadStatusChan := make(chan uploadStatus)
149 f(kc, ks.url, reader, writer, uploadStatusChan)
// TestUploadToStubKeepServer streams "foo" through a pipe into
// uploadToKeepServer against a stub server, then checks the reported
// uploadStatus: nil error, URL "<server>/<expectPath>", status 200, and a
// storage-class map of {"default": 1}.
152 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
153 log.Printf("TestUploadToStubKeepServer")
155 st := StubPutHandler{
// MD5 of "foo".
157 "acbd18db4cc2f85cedef654fccc4a4d8",
163 UploadToStubHelper(c, st,
164 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
// The upload runs in its own goroutine because it reads from the pipe that
// this function writes to below.
165 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
167 writer.Write([]byte("foo"))
171 status := <-uploadStatusChan
172 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
// TestUploadToStubKeepServerBufferReader is the same upload check as
// TestUploadToStubKeepServer, but feeds uploadToKeepServer from a
// bytes.Buffer instead of the helper's pipe (the pipe ends are ignored).
176 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
177 st := StubPutHandler{
// MD5 of "foo".
179 "acbd18db4cc2f85cedef654fccc4a4d8",
185 UploadToStubHelper(c, st,
186 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
187 go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
191 status := <-uploadStatusChan
192 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
// TestUploadWithStorageClasses is table-driven: for each trial the stub
// returns trial.respHeader as X-Keep-Storage-Classes-Confirmed, and the test
// checks that the uploadStatus carries the parsed map trial.expectMap. The
// visible trials show that surrounding whitespace in the header is tolerated.
196 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
197 for _, trial := range []struct {
199 expectMap map[string]int
202 {"foo=1", map[string]int{"foo": 1}},
203 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
207 st := StubPutHandler{
// MD5 of "foo".
209 "acbd18db4cc2f85cedef654fccc4a4d8",
212 "", trial.respHeader,
215 UploadToStubHelper(c, st,
216 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
217 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
219 writer.Write([]byte("foo"))
223 status := <-uploadStatusChan
224 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
// FailHandler is an http.Handler stub whose ServeHTTP answers every request
// with HTTP 500 and reports the request host on its handled channel.
229 type FailHandler struct {
// ServeHTTP replies 500 and then sends "http://<request host>" on fh.handled.
// The send blocks if the channel's buffer is full.
233 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
234 resp.WriteHeader(500)
235 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// FailThenSucceedHandler is an http.Handler stub that can answer 500 or
// delegate to successhandler, recording each request's X-Request-Id. Only
// part of the struct is visible in this excerpt.
238 type FailThenSucceedHandler struct {
// Handler that serves the requests which are not failed.
241 successhandler http.Handler
// ServeHTTP appends the request's X-Request-Id header to fh.reqIDs, then
// either replies 500 and reports the host on fh.handled, or delegates to
// fh.successhandler. The condition selecting between the two is on lines
// not visible in this excerpt; presumably the first request fails -- confirm.
245 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
246 fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
248 resp.WriteHeader(500)
250 fh.handled <- fmt.Sprintf("http://%s", req.Host)
252 fh.successhandler.ServeHTTP(resp, req)
// Error404Handler is an http.Handler stub whose ServeHTTP answers every
// request with HTTP 404 and reports the request host on its handled channel.
256 type Error404Handler struct {
// ServeHTTP replies 404 and then sends "http://<request host>" on fh.handled.
// The send blocks if the channel's buffer is full.
260 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
261 resp.WriteHeader(404)
262 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" to a stub server and checks
// that the resulting uploadStatus carries the request URL and status code
// 500. The handler construction is not visible in this excerpt; presumably
// it is a FailHandler -- confirm.
265 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
// MD5 of "foo".
269 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
271 UploadToStubHelper(c, st,
272 func(kc *KeepClient, url string, reader io.ReadCloser,
273 writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
275 go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
277 writer.Write([]byte("foo"))
282 status := <-uploadStatusChan
283 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
284 c.Check(status.statusCode, Equals, 500)
// KeepServer describes one stub server started by RunFakeKeepServer. Only
// the listener field is visible in this excerpt; RunFakeKeepServer also
// assigns a url field.
288 type KeepServer struct {
// Listener the stub serves on; tests close it to shut the server down.
289 listener net.Listener
// RunSomeFakeKeepServers starts n stub servers with RunFakeKeepServer, all
// sharing the handler st, and returns them in ks (length n).
293 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
294 ks = make([]KeepServer, n)
296 for i := 0; i < n; i++ {
297 ks[i] = RunFakeKeepServer(st)
// TestPutB writes "foo" with PutB to a client configured with five stub
// servers and checks that the two servers s1 and s2 are the first two
// entries of the NewRootSorter ordering for the block hash, in either order.
// s1 and s2 are assigned on lines not visible here; presumably they are read
// from st.handled -- confirm.
303 func (s *StandaloneSuite) TestPutB(c *C) {
304 hash := Md5String("foo")
306 st := StubPutHandler{
313 make(chan string, 5)}
// NOTE(review): both constructor errors are discarded.
315 arv, _ := arvadosclient.MakeArvadosClient()
316 kc, _ := MakeKeepClient(arv)
319 arv.ApiToken = "abc123"
320 localRoots := make(map[string]string)
321 writableLocalRoots := make(map[string]string)
323 ks := RunSomeFakeKeepServers(st, 5)
325 for i, k := range ks {
326 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
327 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
// Deferred inside the loop on purpose: every listener stays open until the
// test function returns.
328 defer k.listener.Close()
331 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
333 kc.PutB([]byte("foo"))
335 shuff := NewRootSorter(
336 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
340 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
341 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutHR writes "foo" with PutHR(hash, reader, 3), feeding the data
// through an io.Pipe, to a client configured with five stub servers. It
// checks that the two servers s1 and s2 are the first two entries of the
// NewRootSorter ordering for the hash, in either order. s1 and s2 are
// assigned on lines not visible here; presumably they are read from
// st.handled -- confirm.
346 func (s *StandaloneSuite) TestPutHR(c *C) {
347 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
349 st := StubPutHandler{
356 make(chan string, 5)}
// NOTE(review): both constructor errors are discarded.
358 arv, _ := arvadosclient.MakeArvadosClient()
359 kc, _ := MakeKeepClient(arv)
362 arv.ApiToken = "abc123"
363 localRoots := make(map[string]string)
364 writableLocalRoots := make(map[string]string)
366 ks := RunSomeFakeKeepServers(st, 5)
368 for i, k := range ks {
369 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
370 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
// Deferred inside the loop on purpose: every listener stays open until the
// test function returns.
371 defer k.listener.Close()
374 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
376 reader, writer := io.Pipe()
// NOTE(review): a pipe write blocks until it is read, so this presumably
// runs in a goroutine started on a line not shown -- confirm.
379 writer.Write([]byte("foo"))
383 kc.PutHR(hash, reader, 3)
385 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
390 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
391 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutWithFail runs PutB against four stub servers using st and one using
// fh. It expects no error, an empty returned locator, 2 replicas, and that
// the two servers s1 and s2 are the second and third entries of the
// rendezvous order. The construction of fh and the assignment of s1/s2 are
// not visible here; presumably fh is the failing server and sorts first --
// confirm.
396 func (s *StandaloneSuite) TestPutWithFail(c *C) {
397 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
399 st := StubPutHandler{
406 make(chan string, 4)}
409 make(chan string, 1)}
411 arv, err := arvadosclient.MakeArvadosClient()
413 kc, _ := MakeKeepClient(arv)
416 arv.ApiToken = "abc123"
417 localRoots := make(map[string]string)
418 writableLocalRoots := make(map[string]string)
420 ks1 := RunSomeFakeKeepServers(st, 4)
421 ks2 := RunSomeFakeKeepServers(fh, 1)
423 for i, k := range ks1 {
424 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
425 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
426 defer k.listener.Close()
// The ks2 servers get UUID indices that continue after those of ks1.
428 for i, k := range ks2 {
429 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
430 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
431 defer k.listener.Close()
434 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
436 shuff := NewRootSorter(
437 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
440 phash, replicas, err := kc.PutB([]byte("foo"))
444 c.Check(err, Equals, nil)
445 c.Check(phash, Equals, "")
446 c.Check(replicas, Equals, 2)
451 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
452 (s1 == shuff[2] && s2 == shuff[1]),
// TestPutWithTooManyFail runs PutB against one stub server using st and four
// using fh. It expects an InsufficientReplicasError, a replica count of 1,
// and that the single st server handled a request. The construction of fh is
// not visible here; presumably it is a FailHandler -- confirm.
457 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
458 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
460 st := StubPutHandler{
467 make(chan string, 1)}
470 make(chan string, 4)}
472 arv, err := arvadosclient.MakeArvadosClient()
474 kc, _ := MakeKeepClient(arv)
478 arv.ApiToken = "abc123"
479 localRoots := make(map[string]string)
480 writableLocalRoots := make(map[string]string)
482 ks1 := RunSomeFakeKeepServers(st, 1)
483 ks2 := RunSomeFakeKeepServers(fh, 4)
485 for i, k := range ks1 {
486 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
487 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
488 defer k.listener.Close()
// The ks2 servers get UUID indices that continue after those of ks1.
490 for i, k := range ks2 {
491 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
492 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
493 defer k.listener.Close()
496 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
498 _, replicas, err := kc.PutB([]byte("foo"))
// FitsTypeOf compares only the dynamic type, so the wrapped message is
// irrelevant.
500 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
501 c.Check(replicas, Equals, 1)
502 c.Check(<-st.handled, Equals, ks1[0].url)
// StubGetHandler is an http.Handler stub for read requests: its ServeHTTP
// checks the request path and Authorization header, then replies with the
// configured status. Only part of the struct is visible in this excerpt.
505 type StubGetHandler struct {
// Token expected in the "Authorization: OAuth2 <token>" request header.
508 expectAPIToken string
513 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
514 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
515 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
516 resp.WriteHeader(sgh.httpStatus)
517 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
// TestGet fetches the block for MD5("foo") from a single stub server and
// checks that Get returns no error, size 3, the URL "<server>/<hash>", and a
// reader that yields "foo".
521 func (s *StandaloneSuite) TestGet(c *C) {
522 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
524 st := StubGetHandler{
531 ks := RunFakeKeepServer(st)
532 defer ks.listener.Close()
534 arv, err := arvadosclient.MakeArvadosClient()
536 kc, _ := MakeKeepClient(arv)
537 arv.ApiToken = "abc123"
// Point the client at the stub as its only local root.
538 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
540 r, n, url2, err := kc.Get(hash)
542 c.Check(err, Equals, nil)
543 c.Check(n, Equals, int64(3))
544 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
546 content, err2 := ioutil.ReadAll(r)
547 c.Check(err2, Equals, nil)
548 c.Check(content, DeepEquals, []byte("foo"))
// TestGet404 checks that when the only server answers 404, Get returns
// BlockNotFound together with size 0, an empty URL and a nil reader.
551 func (s *StandaloneSuite) TestGet404(c *C) {
552 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
554 st := Error404Handler{make(chan string, 1)}
556 ks := RunFakeKeepServer(st)
557 defer ks.listener.Close()
559 arv, err := arvadosclient.MakeArvadosClient()
561 kc, _ := MakeKeepClient(arv)
562 arv.ApiToken = "abc123"
563 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
565 r, n, url2, err := kc.Get(hash)
566 c.Check(err, Equals, BlockNotFound)
567 c.Check(n, Equals, int64(0))
568 c.Check(url2, Equals, "")
569 c.Check(r, Equals, nil)
// TestGetEmptyBlock requests the empty block locator
// (d41d8cd98f00b204e9800998ecf8427e is the MD5 of zero bytes) while the only
// configured server answers 404 to everything. It checks that Get still
// reports size 0, an empty URL, and a reader that yields no bytes.
572 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
573 st := Error404Handler{make(chan string, 1)}
575 ks := RunFakeKeepServer(st)
576 defer ks.listener.Close()
578 arv, err := arvadosclient.MakeArvadosClient()
580 kc, _ := MakeKeepClient(arv)
581 arv.ApiToken = "abc123"
582 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
584 r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
586 c.Check(n, Equals, int64(0))
587 c.Check(url2, Equals, "")
589 buf, err := ioutil.ReadAll(r)
591 c.Check(buf, DeepEquals, []byte{})
// TestGetFail checks that when the only server answers 500, Get returns an
// *ErrNotFound whose message contains "HTTP 500" and whose Temporary()
// reports true, together with size 0, an empty URL and a nil reader.
594 func (s *StandaloneSuite) TestGetFail(c *C) {
595 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
597 st := FailHandler{make(chan string, 1)}
599 ks := RunFakeKeepServer(st)
600 defer ks.listener.Close()
602 arv, err := arvadosclient.MakeArvadosClient()
604 kc, _ := MakeKeepClient(arv)
605 arv.ApiToken = "abc123"
606 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
609 r, n, url2, err := kc.Get(hash)
610 errNotFound, _ := err.(*ErrNotFound)
// NOTE(review): c.Check does not stop the test. If err is not an
// *ErrNotFound, errNotFound is nil and the method calls below may panic;
// c.Assert would fail cleanly instead.
611 c.Check(errNotFound, NotNil)
612 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
613 c.Check(errNotFound.Temporary(), Equals, true)
614 c.Check(n, Equals, int64(0))
615 c.Check(url2, Equals, "")
616 c.Check(r, Equals, nil)
// TestGetFailRetry serves Get from a FailThenSucceedHandler and checks that
// the read eventually succeeds with the content "foo". It also checks that
// the server saw more than one request and that every request carried the
// same non-empty X-Request-Id.
619 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
620 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
622 st := &FailThenSucceedHandler{
623 handled: make(chan string, 1),
624 successhandler: StubGetHandler{
631 ks := RunFakeKeepServer(st)
632 defer ks.listener.Close()
634 arv, err := arvadosclient.MakeArvadosClient()
636 kc, _ := MakeKeepClient(arv)
637 arv.ApiToken = "abc123"
638 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
640 r, n, url2, err := kc.Get(hash)
642 c.Check(err, Equals, nil)
643 c.Check(n, Equals, int64(3))
644 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
646 content, err2 := ioutil.ReadAll(r)
647 c.Check(err2, Equals, nil)
648 c.Check(content, DeepEquals, []byte("foo"))
650 c.Logf("%q", st.reqIDs)
// Assert (not Check) so the loop below can index st.reqIDs[0] safely.
651 c.Assert(len(st.reqIDs) > 1, Equals, true)
652 for _, reqid := range st.reqIDs {
653 c.Check(reqid, Not(Equals), "")
654 c.Check(reqid, Equals, st.reqIDs[0])
// TestGetNetError points the client at http://localhost:62222 and checks that
// Get returns an *ErrNotFound whose message contains "connection refused"
// and whose Temporary() reports true, together with size 0, an empty URL and
// a nil reader. The test assumes nothing is listening on that port.
658 func (s *StandaloneSuite) TestGetNetError(c *C) {
659 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
661 arv, err := arvadosclient.MakeArvadosClient()
663 kc, _ := MakeKeepClient(arv)
664 arv.ApiToken = "abc123"
665 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
667 r, n, url2, err := kc.Get(hash)
668 errNotFound, _ := err.(*ErrNotFound)
// NOTE(review): c.Check does not stop the test. If err is not an
// *ErrNotFound, errNotFound is nil and the method calls below may panic;
// c.Assert would fail cleanly instead.
669 c.Check(errNotFound, NotNil)
670 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
671 c.Check(errNotFound.Temporary(), Equals, true)
672 c.Check(n, Equals, int64(0))
673 c.Check(url2, Equals, "")
674 c.Check(r, Equals, nil)
// TestGetWithServiceHint requests "<hash>+K@<uuid>" from a client with two
// stub servers and checks that the data ("foo") and the reported URL come
// from ks, the server registered under uuid in the map that includes ks.url.
677 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
678 uuid := "zzzzz-bi6l4-123451234512345"
679 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
681 // This one shouldn't be used:
682 ks0 := RunFakeKeepServer(StubGetHandler{
688 defer ks0.listener.Close()
689 // This one should be used:
690 ks := RunFakeKeepServer(StubGetHandler{
696 defer ks.listener.Close()
698 arv, err := arvadosclient.MakeArvadosClient()
700 kc, _ := MakeKeepClient(arv)
701 arv.ApiToken = "abc123"
// The call receiving these two maps is on a line not shown; presumably it
// is kc.SetServiceRoots with the second map as the gateway roots -- confirm.
703 map[string]string{"x": ks0.url},
705 map[string]string{uuid: ks.url})
707 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
709 c.Check(err, Equals, nil)
710 c.Check(n, Equals, int64(3))
711 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
713 content, err := ioutil.ReadAll(r)
714 c.Check(err, Equals, nil)
715 c.Check(content, DeepEquals, []byte("foo"))
718 // Use a service hint to fetch from a local disk service, overriding
719 // rendezvous probe order.
// The test expects the content "foo" and a URL on ks, not on ks0. The
// entries that map uuid to ks.url are on lines not shown in this excerpt.
720 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
721 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
722 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
724 // This one shouldn't be used, although it appears first in
725 // rendezvous probe order:
726 ks0 := RunFakeKeepServer(StubGetHandler{
732 defer ks0.listener.Close()
733 // This one should be used:
734 ks := RunFakeKeepServer(StubGetHandler{
740 defer ks.listener.Close()
742 arv, err := arvadosclient.MakeArvadosClient()
744 kc, _ := MakeKeepClient(arv)
745 arv.ApiToken = "abc123"
// Three decoy UUIDs all point at ks0; the same three entries appear in two
// separate maps.
748 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
749 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
750 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
754 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
755 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
756 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
760 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
762 c.Check(err, Equals, nil)
763 c.Check(n, Equals, int64(3))
764 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
766 content, err := ioutil.ReadAll(r)
767 c.Check(err, Equals, nil)
768 c.Check(content, DeepEquals, []byte("foo"))
// TestGetWithServiceHintFailoverToLocals requests "<hash>+K@<uuid>" where
// the server registered under uuid (ksGateway) is configured with
// http.StatusInternalServerError. It checks that Get succeeds and that the
// URL and the content "foo" come from ksLocal instead.
771 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
772 uuid := "zzzzz-bi6l4-123451234512345"
773 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
775 ksLocal := RunFakeKeepServer(StubGetHandler{
781 defer ksLocal.listener.Close()
782 ksGateway := RunFakeKeepServer(StubGetHandler{
786 http.StatusInternalServerError,
788 defer ksGateway.listener.Close()
790 arv, err := arvadosclient.MakeArvadosClient()
792 kc, _ := MakeKeepClient(arv)
793 arv.ApiToken = "abc123"
// The call receiving these two maps is on a line not shown; presumably it
// is kc.SetServiceRoots with the second map as the gateway roots -- confirm.
795 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
797 map[string]string{uuid: ksGateway.url})
799 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
// Assert: the reader is only usable when Get succeeded.
800 c.Assert(err, Equals, nil)
802 c.Check(n, Equals, int64(3))
803 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
805 content, err := ioutil.ReadAll(r)
806 c.Check(err, Equals, nil)
807 c.Check(content, DeepEquals, []byte("foo"))
// BarHandler is an http.Handler stub whose ServeHTTP answers every request
// with the body "bar" and reports the request host on its handled channel.
810 type BarHandler struct {
// ServeHTTP writes "bar" as the response body (no explicit status, so
// net/http sends 200) and then sends "http://<request host>" on h.handled.
814 func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
815 resp.Write([]byte("bar"))
816 h.handled <- fmt.Sprintf("http://%s", req.Host)
// TestChecksum reads from a server that always returns "bar". Requesting the
// hash of "bar" must read back with a nil error; requesting the hash of
// "foo" must make the read fail with BadChecksum. Both reads report size 3.
819 func (s *StandaloneSuite) TestChecksum(c *C) {
820 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
821 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
823 st := BarHandler{make(chan string, 1)}
825 ks := RunFakeKeepServer(st)
826 defer ks.listener.Close()
828 arv, err := arvadosclient.MakeArvadosClient()
830 kc, _ := MakeKeepClient(arv)
831 arv.ApiToken = "abc123"
832 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
834 r, n, _, err := kc.Get(barhash)
// The err returned by Get is overwritten by the ReadAll error here, so the
// checks below apply to the read, not to Get.
836 _, err = ioutil.ReadAll(r)
837 c.Check(n, Equals, int64(3))
838 c.Check(err, Equals, nil)
842 r, n, _, err = kc.Get(foohash)
844 _, err = ioutil.ReadAll(r)
845 c.Check(n, Equals, int64(3))
846 c.Check(err, Equals, BadChecksum)
// TestGetWithFailures configures one working server (st) and four that
// answer 404 (fh). It checks that Get returns the content "waz" and the
// working server's URL, even though a 404 server sorts first in probe order.
851 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
852 content := []byte("waz")
853 hash := fmt.Sprintf("%x", md5.Sum(content))
855 fh := Error404Handler{
856 make(chan string, 4)}
858 st := StubGetHandler{
865 arv, err := arvadosclient.MakeArvadosClient()
867 kc, _ := MakeKeepClient(arv)
868 arv.ApiToken = "abc123"
869 localRoots := make(map[string]string)
870 writableLocalRoots := make(map[string]string)
872 ks1 := RunSomeFakeKeepServers(st, 1)
873 ks2 := RunSomeFakeKeepServers(fh, 4)
875 for i, k := range ks1 {
876 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
877 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
878 defer k.listener.Close()
// The ks2 servers get UUID indices that continue after those of ks1.
880 for i, k := range ks2 {
881 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
882 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
883 defer k.listener.Close()
886 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
889 // This test works only if one of the failing services is
890 // attempted before the succeeding service. Otherwise,
891 // <-fh.handled below will just hang! (Probe order depends on
892 // the choice of block content "waz" and the UUIDs of the fake
893 // servers, so we just tried different strings until we found
894 // an example that passes this Assert.)
895 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
897 r, n, url2, err := kc.Get(hash)
900 c.Check(err, Equals, nil)
901 c.Check(n, Equals, int64(3))
902 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
904 readContent, err2 := ioutil.ReadAll(r)
905 c.Check(err2, Equals, nil)
906 c.Check(readContent, DeepEquals, content)
// TestPutGetHead exercises a round trip against the real test servers. Ask
// reports BlockNotFound before the block exists, PutB stores it with 2
// replicas, and Get and Ask then succeed with the right size and a localhost
// URL. LocalLocator returns a locator starting with the hash, and fails with
// an HTTP 400 HEAD error for a locator carrying a +R hint. The repeated :=
// declarations imply nested scopes whose braces are not visible here.
909 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
910 content := []byte("TestPutGetHead")
912 arv, err := arvadosclient.MakeArvadosClient()
914 kc, err := MakeKeepClient(arv)
915 c.Assert(err, Equals, nil)
917 hash := fmt.Sprintf("%x", md5.Sum(content))
920 n, _, err := kc.Ask(hash)
921 c.Check(err, Equals, BlockNotFound)
922 c.Check(n, Equals, int64(0))
925 hash2, replicas, err := kc.PutB(content)
// The returned locator is "<hash>+<size>", optionally followed by hints.
926 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
927 c.Check(replicas, Equals, 2)
928 c.Check(err, Equals, nil)
931 r, n, url2, err := kc.Get(hash)
932 c.Check(err, Equals, nil)
933 c.Check(n, Equals, int64(len(content)))
934 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
936 readContent, err2 := ioutil.ReadAll(r)
937 c.Check(err2, Equals, nil)
938 c.Check(readContent, DeepEquals, content)
941 n, url2, err := kc.Ask(hash)
942 c.Check(err, Equals, nil)
943 c.Check(n, Equals, int64(len(content)))
944 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
947 loc, err := kc.LocalLocator(hash)
948 c.Check(err, Equals, nil)
// Assert the length first so the loc[:32] slice below cannot panic.
949 c.Assert(len(loc) >= 32, Equals, true)
950 c.Check(loc[:32], Equals, hash[:32])
953 content := []byte("the perth county conspiracy")
954 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
955 c.Check(loc, Equals, "")
956 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
957 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
// StubProxyHandler is an http.Handler stub whose ServeHTTP reports two
// stored replicas through the X-Keep-Replicas-Stored response header.
961 type StubProxyHandler struct {
// ServeHTTP sets X-Keep-Replicas-Stored: 2 on the response and then sends
// "http://<request host>" on h.handled. It writes no explicit status or body.
965 func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
966 resp.Header().Set("X-Keep-Replicas-Stored", "2")
967 h.handled <- fmt.Sprintf("http://%s", req.Host)
// TestPutProxy writes "foo" through a single stub server that reports two
// stored replicas, and checks that PutB returns no error and a replica count
// of 2.
970 func (s *StandaloneSuite) TestPutProxy(c *C) {
971 st := StubProxyHandler{make(chan string, 1)}
973 arv, err := arvadosclient.MakeArvadosClient()
975 kc, _ := MakeKeepClient(arv)
978 arv.ApiToken = "abc123"
979 localRoots := make(map[string]string)
980 writableLocalRoots := make(map[string]string)
982 ks1 := RunSomeFakeKeepServers(st, 1)
984 for i, k := range ks1 {
985 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
986 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
987 defer k.listener.Close()
990 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
992 _, replicas, err := kc.PutB([]byte("foo"))
995 c.Check(err, Equals, nil)
996 c.Check(replicas, Equals, 2)
// TestPutProxyInsufficientReplicas sets Want_replicas to 3 while the single
// stub server reports only two stored replicas. It checks that PutB returns
// an InsufficientReplicasError together with a replica count of 2.
999 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1000 st := StubProxyHandler{make(chan string, 1)}
1002 arv, err := arvadosclient.MakeArvadosClient()
1004 kc, _ := MakeKeepClient(arv)
1006 kc.Want_replicas = 3
1007 arv.ApiToken = "abc123"
1008 localRoots := make(map[string]string)
1009 writableLocalRoots := make(map[string]string)
1011 ks1 := RunSomeFakeKeepServers(st, 1)
1013 for i, k := range ks1 {
1014 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1015 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1016 defer k.listener.Close()
1018 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1020 _, replicas, err := kc.PutB([]byte("foo"))
// FitsTypeOf compares only the dynamic type, so the wrapped message is
// irrelevant.
1023 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1024 c.Check(replicas, Equals, 2)
// TestMakeLocator parses a locator with a size and one further hint and
// checks Hash, Size (3), and Hints. The size component "3" is expected to
// appear in Hints as well.
1027 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1028 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1029 c.Check(err, Equals, nil)
1030 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1031 c.Check(l.Size, Equals, 3)
1032 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
// TestMakeLocatorNoHints parses a bare 32-hex-digit hash and checks that
// Size is -1 and Hints is an empty, non-nil slice.
1035 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1036 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1037 c.Check(err, Equals, nil)
1038 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1039 c.Check(l.Size, Equals, -1)
1040 c.Check(l.Hints, DeepEquals, []string{})
// TestMakeLocatorNoSizeHint parses a locator whose only hint is not a size
// and checks that Size is -1 while the hint is preserved in Hints.
1043 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1044 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1045 c.Check(err, Equals, nil)
1046 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1047 c.Check(l.Size, Equals, -1)
1048 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
// TestMakeLocatorPreservesUnrecognizedHints checks that every hint of the
// input, including "Unknown", is kept in order in Hints, and that String()
// reproduces the input exactly.
1051 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1052 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1053 l, err := MakeLocator(str)
1054 c.Check(err, Equals, nil)
1055 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1056 c.Check(l.Size, Equals, 3)
1057 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1058 c.Check(l.String(), Equals, str)
// TestMakeLocatorInvalidInput checks that a hash of only 31 hex digits is
// rejected with InvalidLocatorError.
1061 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1062 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1063 c.Check(err, Equals, InvalidLocatorError)
// TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot wants 2 replicas and
// expects PutB to fail with InsufficientReplicasError after storing 1, the
// only request being handled by the server registered under index 0. The
// condition that limits writableLocalRoots to that one server is on a line
// not visible in this excerpt.
1066 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1067 hash := Md5String("foo")
1069 st := StubPutHandler{
1076 make(chan string, 5)}
// NOTE(review): both constructor errors are discarded.
1078 arv, _ := arvadosclient.MakeArvadosClient()
1079 kc, _ := MakeKeepClient(arv)
1081 kc.Want_replicas = 2
1082 arv.ApiToken = "abc123"
1083 localRoots := make(map[string]string)
1084 writableLocalRoots := make(map[string]string)
1086 ks := RunSomeFakeKeepServers(st, 5)
1088 for i, k := range ks {
1089 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1091 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1093 defer k.listener.Close()
1096 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1098 _, replicas, err := kc.PutB([]byte("foo"))
1100 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1101 c.Check(replicas, Equals, 1)
1103 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
// TestPutBWithNoWritableLocalRoots registers five servers as local roots but
// leaves writableLocalRoots empty. It expects PutB to fail with
// InsufficientReplicasError and a replica count of 0.
1106 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1107 hash := Md5String("foo")
1109 st := StubPutHandler{
1116 make(chan string, 5)}
// NOTE(review): both constructor errors are discarded.
1118 arv, _ := arvadosclient.MakeArvadosClient()
1119 kc, _ := MakeKeepClient(arv)
1121 kc.Want_replicas = 2
1122 arv.ApiToken = "abc123"
1123 localRoots := make(map[string]string)
1124 writableLocalRoots := make(map[string]string)
1126 ks := RunSomeFakeKeepServers(st, 5)
// Only localRoots is filled in; no server is writable.
1128 for i, k := range ks {
1129 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1130 defer k.listener.Close()
1133 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1135 _, replicas, err := kc.PutB([]byte("foo"))
1137 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1138 c.Check(replicas, Equals, 0)
// StubGetIndexHandler is an http.Handler stub for index requests: its
// ServeHTTP checks the request path and Authorization header, then replies
// with the configured status. Only part of the struct is visible here.
1141 type StubGetIndexHandler struct {
// Token expected in the "Authorization: OAuth2 <token>" request header.
1144 expectAPIToken string
1149 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1150 h.c.Check(req.URL.Path, Equals, h.expectPath)
1151 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1152 resp.WriteHeader(h.httpStatus)
1153 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
// TestGetIndexWithNoPrefix fetches the index of server "x" with an empty
// prefix and checks that the returned content equals the stub's body minus
// its last byte. The stub body ends in two newlines, so the reader is
// expected to drop the final one.
1157 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1158 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1160 st := StubGetIndexHandler{
1165 []byte(hash + "+3 1443559274\n\n")}
1167 ks := RunFakeKeepServer(st)
1168 defer ks.listener.Close()
1170 arv, err := arvadosclient.MakeArvadosClient()
1171 c.Assert(err, IsNil)
1172 kc, err := MakeKeepClient(arv)
1173 c.Assert(err, IsNil)
1174 arv.ApiToken = "abc123"
1175 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1177 r, err := kc.GetIndex("x", "")
1180 content, err2 := ioutil.ReadAll(r)
1181 c.Check(err2, Equals, nil)
1182 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexWithPrefix fetches the index of server "x" for the first three
// hex digits of the hash. The stub expects the path "/index/<prefix>", and
// the test checks that the content equals the stub's body minus its last
// byte.
1185 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1186 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1188 st := StubGetIndexHandler{
1190 "/index/" + hash[0:3],
1193 []byte(hash + "+3 1443559274\n\n")}
1195 ks := RunFakeKeepServer(st)
1196 defer ks.listener.Close()
1198 arv, err := arvadosclient.MakeArvadosClient()
1200 kc, _ := MakeKeepClient(arv)
1201 arv.ApiToken = "abc123"
1202 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1204 r, err := kc.GetIndex("x", hash[0:3])
1205 c.Assert(err, Equals, nil)
1207 content, err2 := ioutil.ReadAll(r)
1208 c.Check(err2, Equals, nil)
1209 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexIncomplete checks that GetIndex returns ErrIncompleteIndex for
// this stub's response. The stub's body is on a line not visible in this
// excerpt; presumably it lacks the terminating blank line -- confirm.
1212 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1213 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1215 st := StubGetIndexHandler{
1217 "/index/" + hash[0:3],
1222 ks := RunFakeKeepServer(st)
1223 defer ks.listener.Close()
1225 arv, err := arvadosclient.MakeArvadosClient()
1227 kc, _ := MakeKeepClient(arv)
1228 arv.ApiToken = "abc123"
1229 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1231 _, err = kc.GetIndex("x", hash[0:3])
1232 c.Check(err, Equals, ErrIncompleteIndex)
// TestGetIndexWithNoSuchServer registers only server "x" and checks that
// asking for the index of "y" returns ErrNoSuchKeepServer.
1235 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1236 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1238 st := StubGetIndexHandler{
1240 "/index/" + hash[0:3],
1245 ks := RunFakeKeepServer(st)
1246 defer ks.listener.Close()
1248 arv, err := arvadosclient.MakeArvadosClient()
1250 kc, _ := MakeKeepClient(arv)
1251 arv.ApiToken = "abc123"
1252 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1254 _, err = kc.GetIndex("y", hash[0:3])
1255 c.Check(err, Equals, ErrNoSuchKeepServer)
// TestGetIndexWithNoSuchPrefix requests the index for prefix "abcd" and
// checks that GetIndex succeeds and that the content equals the stub's body
// minus its last byte. The stub's fields are on lines not visible here;
// presumably the body is a single newline -- confirm.
1258 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1259 st := StubGetIndexHandler{
1266 ks := RunFakeKeepServer(st)
1267 defer ks.listener.Close()
1269 arv, err := arvadosclient.MakeArvadosClient()
1271 kc, _ := MakeKeepClient(arv)
1272 arv.ApiToken = "abc123"
1273 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1275 r, err := kc.GetIndex("x", "abcd")
1276 c.Check(err, Equals, nil)
1278 content, err2 := ioutil.ReadAll(r)
1279 c.Check(err2, Equals, nil)
1280 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestPutBRetry writes "foo" with Want_replicas set to 2 against two stub
// servers that share one FailThenSucceedHandler. It checks that PutB
// succeeds with 2 replicas and an empty returned locator.
1283 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1284 st := &FailThenSucceedHandler{
1285 handled: make(chan string, 1),
1286 successhandler: StubPutHandler{
1293 make(chan string, 5)}}
// NOTE(review): both constructor errors are discarded.
1295 arv, _ := arvadosclient.MakeArvadosClient()
1296 kc, _ := MakeKeepClient(arv)
1298 kc.Want_replicas = 2
1299 arv.ApiToken = "abc123"
1300 localRoots := make(map[string]string)
1301 writableLocalRoots := make(map[string]string)
1303 ks := RunSomeFakeKeepServers(st, 2)
1305 for i, k := range ks {
1306 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1307 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1308 defer k.listener.Close()
1311 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1313 hash, replicas, err := kc.PutB([]byte("foo"))
1315 c.Check(err, Equals, nil)
1316 c.Check(hash, Equals, "")
1317 c.Check(replicas, Equals, 2)
// TestMakeKeepClientWithNonDiskTypeService registers an extra keep service
// of type "testblobstore" through the API and deletes it again on return.
// It checks that a new KeepClient lists it among three local, gateway and
// writable roots. It also checks replicasPerService == 0, foundNonDiskSvc ==
// true, and an HTTP client timeout of 300 seconds.
1320 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1321 arv, err := arvadosclient.MakeArvadosClient()
1322 c.Assert(err, Equals, nil)
1324 // Add an additional "testblobstore" keepservice
1325 blobKeepService := make(arvadosclient.Dict)
1326 err = arv.Create("keep_services",
1327 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1328 "service_host": "localhost",
1329 "service_port": "21321",
1330 "service_type": "testblobstore"}},
1332 c.Assert(err, Equals, nil)
// Remove the extra service again so later tests see the original set.
1333 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
// Refresh so the newly created service is picked up by the client below.
1334 RefreshServiceDiscovery()
1336 // Make a keepclient and ensure that the testblobstore is included
1337 kc, err := MakeKeepClient(arv)
1338 c.Assert(err, Equals, nil)
1340 // verify kc.LocalRoots
1341 c.Check(len(kc.LocalRoots()), Equals, 3)
1342 for _, root := range kc.LocalRoots() {
1343 c.Check(root, Matches, "http://localhost:\\d+")
1345 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1347 // verify kc.GatewayRoots
1348 c.Check(len(kc.GatewayRoots()), Equals, 3)
1349 for _, root := range kc.GatewayRoots() {
1350 c.Check(root, Matches, "http://localhost:\\d+")
1352 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1354 // verify kc.WritableLocalRoots
1355 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1356 for _, root := range kc.WritableLocalRoots() {
1357 c.Check(root, Matches, "http://localhost:\\d+")
1359 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1361 c.Assert(kc.replicasPerService, Equals, 0)
1362 c.Assert(kc.foundNonDiskSvc, Equals, true)
1363 c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)