1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
23 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
24 "git.arvados.org/arvados.git/sdk/go/arvadostest"
26 check "gopkg.in/check.v1"
29 // Gocheck boilerplate
// Test is the `go test` entry point through which the gocheck suites in
// this package are run.
30 func Test(t *testing.T) {
34 // Gocheck boilerplate
// Register both suites with gocheck so that their fixture and Test*
// methods are picked up.
35 var _ = Suite(&ServerRequiredSuite{})
36 var _ = Suite(&StandaloneSuite{})
38 // Tests that require the Keep server running
// ServerRequiredSuite carries no state of its own; its SetUpSuite starts
// the API and Keep services through the arvadostest helpers.
39 type ServerRequiredSuite struct{}
// StandaloneSuite groups the tests that do not start real Keep services:
// they run against in-process stub HTTP servers, or need no server at all.
42 type StandaloneSuite struct{}
// SetUpTest is the gocheck per-test fixture for StandaloneSuite; it calls
// RefreshServiceDiscovery before each test.
// NOTE(review): presumably this clears cached Keep service discovery so
// tests do not see each other's service roots — confirm.
44 func (s *StandaloneSuite) SetUpTest(c *C) {
45 RefreshServiceDiscovery()
// pythonDir returns the path "<cwd>/../../python/tests".
// NOTE(review): presumably cwd is the process working directory — confirm
// where it is assigned.
48 func pythonDir() string {
50 return fmt.Sprintf("%s/../../python/tests", cwd)
// SetUpSuite is the gocheck once-per-suite fixture: it starts the API
// server and then calls arvadostest.StartKeep(2, false).
// NOTE(review): presumably the first argument is the number of Keep
// servers (TestMakeKeepClient expects two local roots); the meaning of the
// boolean is not shown — confirm against arvadostest.
53 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
54 arvadostest.StartAPI()
55 arvadostest.StartKeep(2, false)
// TearDownSuite is the gocheck once-per-suite teardown; it calls
// arvadostest.StopKeep with the same count (2) passed to StartKeep in
// SetUpSuite.
58 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
59 arvadostest.StopKeep(2)
// SetUpTest is the gocheck per-test fixture for ServerRequiredSuite; it
// calls RefreshServiceDiscovery before each test.
63 func (s *ServerRequiredSuite) SetUpTest(c *C) {
64 RefreshServiceDiscovery()
// TestMakeKeepClient checks that a KeepClient built from a freshly made
// Arvados client reports exactly two local roots, each of the form
// http://localhost:<port>.
67 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
68 arv, err := arvadosclient.MakeArvadosClient()
69 c.Assert(err, Equals, nil)
71 kc, err := MakeKeepClient(arv)
73 c.Assert(err, Equals, nil)
74 c.Check(len(kc.LocalRoots()), Equals, 2)
75 for _, root := range kc.LocalRoots() {
76 c.Check(root, Matches, "http://localhost:\\d+")
// TestDefaultReplications checks that MakeKeepClient sets Want_replicas to
// 2 for the client as first created here, and to 3 and then 1 after the
// discovery document's "defaultCollectionReplication" entry is overwritten
// with those values.
80 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
81 arv, err := arvadosclient.MakeArvadosClient()
82 c.Assert(err, Equals, nil)
84 kc, err := MakeKeepClient(arv)
86 c.Assert(kc.Want_replicas, Equals, 2)
// The discovery document values are stored as float64 (3.0, 1.0).
88 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
89 kc, err = MakeKeepClient(arv)
91 c.Assert(kc.Want_replicas, Equals, 3)
93 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
94 kc, err = MakeKeepClient(arv)
96 c.Assert(kc.Want_replicas, Equals, 1)
// StubPutHandler is an http.Handler that stands in for a Keep server in
// the upload tests; its ServeHTTP method compares each request against the
// expect* fields.
99 type StubPutHandler struct {
// Token expected in the "Authorization: OAuth2 <token>" request header.
102 expectAPIToken string
// Expected X-Keep-Storage-Classes request header; "*" disables the check.
104 expectStorageClass string
// Value sent as the X-Keep-Storage-Classes-Confirmed response header;
// when empty the header is not sent.
105 returnStorageClasses string
// Every request received, in the order ServeHTTP appended them.
107 requests []*http.Request
// ServeHTTP records the request, checks its path, Authorization header,
// storage-class header (unless wildcarded) and body against the handler's
// expectations, replies 200 with "X-Keep-Replicas-Stored: 1", and finally
// sends "http://<request host>" on sph.handled.
111 func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
113 sph.requests = append(sph.requests, req)
115 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
116 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
// "*" is a wildcard: skip the storage-class header check entirely.
117 if sph.expectStorageClass != "*" {
118 sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
120 body, err := ioutil.ReadAll(req.Body)
121 sph.c.Check(err, Equals, nil)
122 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
123 resp.Header().Set("X-Keep-Replicas-Stored", "1")
// Only confirm storage classes when the test configured a value.
124 if sph.returnStorageClasses != "" {
125 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
127 resp.WriteHeader(200)
// This send blocks until the test receives from sph.handled or the
// channel has free buffer space.
128 sph.handled <- fmt.Sprintf("http://%s", req.Host)
// RunFakeKeepServer starts an HTTP server for handler st on 127.0.0.1 with
// port 0 (so the OS picks a free port) and returns a KeepServer holding
// the listener and its "http://host:port" URL. The server runs in a
// background goroutine; callers stop it by closing ks.listener.
// NOTE(review): the panic message goes through fmt.Sprintf with no
// formatting arguments and does not include the listen error — consider
// panicking with the error itself.
131 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
133 // If we don't explicitly bind it to localhost, ks.listener.Addr() will
134 // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
135 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
137 panic(fmt.Sprintf("Could not listen on any port"))
139 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
// The error returned by http.Serve is not inspected.
140 go http.Serve(ks.listener, st)
// UploadToStubHelper starts a fake Keep server backed by st, builds a
// KeepClient from an Arvados client whose token is set to "abc123", and
// calls f with the client, the server URL, both ends of an io.Pipe and an
// unbuffered uploadStatus channel. The server's listener is closed when
// the helper returns. Errors from MakeArvadosClient and MakeKeepClient are
// discarded.
144 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
145 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
147 ks := RunFakeKeepServer(st)
148 defer ks.listener.Close()
150 arv, _ := arvadosclient.MakeArvadosClient()
151 arv.ApiToken = "abc123"
153 kc, _ := MakeKeepClient(arv)
155 reader, writer := io.Pipe()
156 uploadStatusChan := make(chan uploadStatus)
158 f(kc, ks.url, reader, writer, uploadStatusChan)
// TestUploadToStubKeepServer uploads "foo" to a single stub server through
// a pipe and checks the resulting uploadStatus: the block URL, HTTP 200 and
// the confirmed class map {"default": 1} parsed from the stub's
// "default=1" response header.
161 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
162 log.Printf("TestUploadToStubKeepServer")
164 st := &StubPutHandler{
// MD5 of "foo", the content written below.
166 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
167 expectAPIToken: "abc123",
169 expectStorageClass: "",
170 returnStorageClasses: "default=1",
171 handled: make(chan string),
174 UploadToStubHelper(c, st,
175 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
// The upload runs concurrently and reads the body from the pipe that is
// fed just below.
176 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
178 writer.Write([]byte("foo"))
182 status := <-uploadStatusChan
183 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
// TestUploadToStubKeepServerBufferReader is the same upload check as
// TestUploadToStubKeepServer, but the body comes from a bytes.Buffer
// instead of the helper's pipe (the pipe ends are ignored).
187 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
188 st := &StubPutHandler{
// MD5 of "foo", the content uploaded below.
190 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
191 expectAPIToken: "abc123",
193 expectStorageClass: "",
194 returnStorageClasses: "default=1",
195 handled: make(chan string),
198 UploadToStubHelper(c, st,
199 func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
200 go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
204 status := <-uploadStatusChan
205 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
// TestUploadWithStorageClasses is table-driven: for each trial the stub
// returns trial.respHeader as X-Keep-Storage-Classes-Confirmed and the
// test checks that the upload status carries trial.expectMap. The visible
// trials include a header with extra whitespace around entries.
209 func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
210 for _, trial := range []struct {
212 expectMap map[string]int
215 {"foo=1", map[string]int{"foo": 1}},
216 {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
220 st := &StubPutHandler{
// MD5 of "foo", the content written below.
222 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
223 expectAPIToken: "abc123",
225 expectStorageClass: "",
226 returnStorageClasses: trial.respHeader,
227 handled: make(chan string),
230 UploadToStubHelper(c, st,
231 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
232 go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
234 writer.Write([]byte("foo"))
238 status := <-uploadStatusChan
239 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
// TestPutWithStorageClasses runs PutB against nServers stub servers that
// always confirm "class1=2, class2=2", varying the wanted replica count
// and storage classes per trial. Each trial checks the PutB error and that
// the number of handled requests lies within [minRequests, maxRequests].
// NOTE(review): from how the fields are used, the positional trial values
// appear to be replicas, classes, minRequests, maxRequests, success —
// confirm against the struct declaration.
244 func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
246 for _, trial := range []struct {
253 {1, []string{"class1"}, 1, 1, true},
254 {2, []string{"class1"}, 1, 2, true},
255 {3, []string{"class1"}, 2, 3, true},
256 {1, []string{"class1", "class2"}, 1, 1, true},
257 {nServers*2 + 1, []string{"class1"}, nServers, nServers, false},
258 {1, []string{"class404"}, nServers, nServers, false},
259 {1, []string{"class1", "class404"}, nServers, nServers, false},
262 st := &StubPutHandler{
264 expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
265 expectAPIToken: "abc123",
// "*" turns off the stub's own request-header check; the header is
// inspected explicitly at the end of the trial instead.
267 expectStorageClass: "*",
268 returnStorageClasses: "class1=2, class2=2",
// Buffered so the stub's send on handled does not wait for a receiver;
// the test only counts queued entries with len().
269 handled: make(chan string, 100),
271 ks := RunSomeFakeKeepServers(st, nServers)
272 arv, _ := arvadosclient.MakeArvadosClient()
273 kc, _ := MakeKeepClient(arv)
274 kc.Want_replicas = trial.replicas
275 kc.StorageClasses = trial.classes
276 arv.ApiToken = "abc123"
277 localRoots := make(map[string]string)
278 writableLocalRoots := make(map[string]string)
// Every fake server is registered as both readable and writable.
279 for i, k := range ks {
280 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
281 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
// Deferred inside the trial loop: the listeners stay open until the whole
// test function returns.
282 defer k.listener.Close()
284 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
286 _, _, err := kc.PutB([]byte("foo"))
288 c.Check(err, check.IsNil)
290 c.Check(err, check.NotNil)
292 c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
293 c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
294 if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) {
295 // Max concurrency should be 1. First request
296 // should have succeeded for class1. Second
297 // request should only ask for class404.
298 c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404")
// FailHandler is an http.Handler stub whose ServeHTTP answers every
// request with HTTP 500.
303 type FailHandler struct {
// ServeHTTP replies with status 500 and then sends "http://<request host>"
// on fh.handled; the send blocks until the channel can accept it.
307 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
308 resp.WriteHeader(500)
309 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// FailThenSucceedHandler is an http.Handler stub that can either answer
// with HTTP 500 or delegate to successhandler (see its ServeHTTP).
312 type FailThenSucceedHandler struct {
// Handler that serves the requests that are not failed.
315 successhandler http.Handler
// ServeHTTP appends the request's X-Request-Id header to fh.reqIDs, then
// either replies 500 and reports the host on fh.handled, or delegates to
// fh.successhandler.
// NOTE(review): going by the type name, presumably the first request(s)
// fail and later ones succeed — confirm against the branch condition.
319 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
320 fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
322 resp.WriteHeader(500)
324 fh.handled <- fmt.Sprintf("http://%s", req.Host)
326 fh.successhandler.ServeHTTP(resp, req)
// Error404Handler is an http.Handler stub whose ServeHTTP answers every
// request with HTTP 404.
330 type Error404Handler struct {
// ServeHTTP replies with status 404 and then sends "http://<request host>"
// on fh.handled; the send blocks until the channel can accept it.
334 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
335 resp.WriteHeader(404)
336 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" through a pipe to the
// stub handler st and checks that the reported uploadStatus carries the
// block URL and status code 500.
// NOTE(review): presumably st is a FailHandler, given the expected 500 —
// confirm.
339 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
// MD5 of "foo", the content written below.
343 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
345 UploadToStubHelper(c, st,
346 func(kc *KeepClient, url string, reader io.ReadCloser,
347 writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
349 go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
351 writer.Write([]byte("foo"))
356 status := <-uploadStatusChan
357 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
358 c.Check(status.statusCode, Equals, 500)
// KeepServer describes one fake Keep server started by RunFakeKeepServer.
362 type KeepServer struct {
// Listener the fake server accepts on; tests close it to shut the server
// down.
363 listener net.Listener
// RunSomeFakeKeepServers starts n fake Keep servers with RunFakeKeepServer,
// all sharing the single handler st, and returns them in a slice of
// length n.
367 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
368 ks = make([]KeepServer, n)
370 for i := 0; i < n; i++ {
371 ks[i] = RunFakeKeepServer(st)
// TestPutB stores "foo" with PutB against five stub servers and checks
// that s1 and s2 are the first two entries of the sorted root order for
// the block's MD5, in either order.
// NOTE(review): presumably s1 and s2 are the two server URLs reported on
// st.handled — confirm. PutB's return values are discarded here.
377 func (s *StandaloneSuite) TestPutB(c *C) {
378 hash := Md5String("foo")
380 st := &StubPutHandler{
383 expectAPIToken: "abc123",
385 expectStorageClass: "",
386 returnStorageClasses: "",
// Buffer of 5 matches the number of fake servers started below.
387 handled: make(chan string, 5),
390 arv, _ := arvadosclient.MakeArvadosClient()
391 kc, _ := MakeKeepClient(arv)
394 arv.ApiToken = "abc123"
395 localRoots := make(map[string]string)
396 writableLocalRoots := make(map[string]string)
398 ks := RunSomeFakeKeepServers(st, 5)
// Every fake server is registered as both readable and writable.
400 for i, k := range ks {
401 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
402 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
403 defer k.listener.Close()
406 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
408 kc.PutB([]byte("foo"))
410 shuff := NewRootSorter(
411 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
415 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
416 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutHR stores "foo" with PutHR, reading the content from a pipe,
// against five stub servers. It checks that s1 and s2 are the first two
// entries of the sorted root order for the block hash, in either order.
// NOTE(review): presumably s1 and s2 are the two server URLs reported on
// st.handled — confirm. PutHR's return values are discarded here.
421 func (s *StandaloneSuite) TestPutHR(c *C) {
422 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
424 st := &StubPutHandler{
427 expectAPIToken: "abc123",
429 expectStorageClass: "",
430 returnStorageClasses: "",
431 handled: make(chan string, 5),
434 arv, _ := arvadosclient.MakeArvadosClient()
435 kc, _ := MakeKeepClient(arv)
438 arv.ApiToken = "abc123"
439 localRoots := make(map[string]string)
440 writableLocalRoots := make(map[string]string)
442 ks := RunSomeFakeKeepServers(st, 5)
// Every fake server is registered as both readable and writable.
444 for i, k := range ks {
445 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
446 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
447 defer k.listener.Close()
450 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
452 reader, writer := io.Pipe()
455 writer.Write([]byte("foo"))
// The final argument is the content length in bytes (len("foo")).
459 kc.PutHR(hash, reader, 3)
461 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
466 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
467 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutWithFail runs PutB against four working stub servers (st) plus
// one server backed by fh. It expects no error, an empty returned locator,
// two replicas, and s1/s2 equal to the second and third entries of the
// sorted root order, in either order.
// NOTE(review): presumably fh is a FailHandler placed first in probe order
// and s1/s2 come from st.handled — confirm.
472 func (s *StandaloneSuite) TestPutWithFail(c *C) {
473 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
475 st := &StubPutHandler{
478 expectAPIToken: "abc123",
480 expectStorageClass: "",
481 returnStorageClasses: "",
482 handled: make(chan string, 4),
486 make(chan string, 1)}
488 arv, err := arvadosclient.MakeArvadosClient()
490 kc, _ := MakeKeepClient(arv)
493 arv.ApiToken = "abc123"
494 localRoots := make(map[string]string)
495 writableLocalRoots := make(map[string]string)
497 ks1 := RunSomeFakeKeepServers(st, 4)
498 ks2 := RunSomeFakeKeepServers(fh, 1)
500 for i, k := range ks1 {
501 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
502 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
503 defer k.listener.Close()
// The fh-backed server gets the UUID index following the ks1 servers.
505 for i, k := range ks2 {
506 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
507 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
508 defer k.listener.Close()
511 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
513 shuff := NewRootSorter(
514 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
517 phash, replicas, err := kc.PutB([]byte("foo"))
521 c.Check(err, Equals, nil)
522 c.Check(phash, Equals, "")
523 c.Check(replicas, Equals, 2)
528 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
529 (s1 == shuff[2] && s2 == shuff[1]),
// TestPutWithTooManyFail runs PutB against one working stub server (st)
// and four servers backed by fh. It expects an InsufficientReplicasError,
// a replica count of 1, and the single handled request to have gone to
// the working server.
// NOTE(review): presumably fh is a FailHandler — confirm.
534 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
535 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
537 st := &StubPutHandler{
540 expectAPIToken: "abc123",
542 expectStorageClass: "",
543 returnStorageClasses: "",
544 handled: make(chan string, 1),
548 make(chan string, 4)}
550 arv, err := arvadosclient.MakeArvadosClient()
552 kc, _ := MakeKeepClient(arv)
556 arv.ApiToken = "abc123"
557 localRoots := make(map[string]string)
558 writableLocalRoots := make(map[string]string)
560 ks1 := RunSomeFakeKeepServers(st, 1)
561 ks2 := RunSomeFakeKeepServers(fh, 4)
563 for i, k := range ks1 {
564 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
565 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
566 defer k.listener.Close()
// The fh-backed servers get the UUID indexes following the ks1 server.
568 for i, k := range ks2 {
569 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
570 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
571 defer k.listener.Close()
574 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
576 _, replicas, err := kc.PutB([]byte("foo"))
// FitsTypeOf compares types only; the wrapped errors.New("") is just a
// way to build a sample value of the expected type.
578 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
579 c.Check(replicas, Equals, 1)
580 c.Check(<-st.handled, Equals, ks1[0].url)
// StubGetHandler is an http.Handler stub for read tests; its ServeHTTP
// method checks the request path and token and replies with a configured
// status.
583 type StubGetHandler struct {
// Token expected in the "Authorization: OAuth2 <token>" request header.
586 expectAPIToken string
// ServeHTTP checks the request path ("/" + expectPath) and Authorization
// header, then writes the configured status code.
// NOTE(review): Content-Length is set after WriteHeader. Per net/http,
// header changes made after WriteHeader have no effect, so this Set is
// ignored; move it above WriteHeader if the header is meant to be sent.
591 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
592 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
593 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
594 resp.WriteHeader(sgh.httpStatus)
595 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
// TestGet fetches the block whose hash is MD5("foo") from a single stub
// server and checks the reported size (3), the source URL
// ("<server>/<hash>") and that the content read back is "foo".
599 func (s *StandaloneSuite) TestGet(c *C) {
600 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
602 st := StubGetHandler{
609 ks := RunFakeKeepServer(st)
610 defer ks.listener.Close()
612 arv, err := arvadosclient.MakeArvadosClient()
614 kc, _ := MakeKeepClient(arv)
615 arv.ApiToken = "abc123"
// Only a local (readable) root is configured; the other two root maps are
// nil.
616 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
618 r, n, url2, err := kc.Get(hash)
620 c.Check(err, Equals, nil)
621 c.Check(n, Equals, int64(3))
622 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
624 content, err2 := ioutil.ReadAll(r)
625 c.Check(err2, Equals, nil)
626 c.Check(content, DeepEquals, []byte("foo"))
// TestGet404 checks that Get returns BlockNotFound, with zero size, an
// empty URL and a nil reader, when the only server answers 404.
629 func (s *StandaloneSuite) TestGet404(c *C) {
630 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
632 st := Error404Handler{make(chan string, 1)}
634 ks := RunFakeKeepServer(st)
635 defer ks.listener.Close()
637 arv, err := arvadosclient.MakeArvadosClient()
639 kc, _ := MakeKeepClient(arv)
640 arv.ApiToken = "abc123"
641 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
643 r, n, url2, err := kc.Get(hash)
644 c.Check(err, Equals, BlockNotFound)
645 c.Check(n, Equals, int64(0))
646 c.Check(url2, Equals, "")
647 c.Check(r, Equals, nil)
// TestGetEmptyBlock requests the empty block's locator
// (d41d8cd98f00b204e9800998ecf8427e+0) while the only configured server
// always answers 404. It expects size 0, an empty URL, and a reader that
// yields no bytes.
// NOTE(review): presumably the client serves the empty block itself
// without contacting any server — confirm in KeepClient.Get.
650 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
651 st := Error404Handler{make(chan string, 1)}
653 ks := RunFakeKeepServer(st)
654 defer ks.listener.Close()
656 arv, err := arvadosclient.MakeArvadosClient()
658 kc, _ := MakeKeepClient(arv)
659 arv.ApiToken = "abc123"
660 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
662 r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
664 c.Check(n, Equals, int64(0))
665 c.Check(url2, Equals, "")
667 buf, err := ioutil.ReadAll(r)
669 c.Check(buf, DeepEquals, []byte{})
// TestGetFail checks that, when the only server answers 500, Get returns
// an *ErrNotFound whose message mentions "HTTP 500" and which reports
// Temporary() == true, along with zero size, an empty URL and a nil reader.
// NOTE(review): c.Check does not stop the test, so if the type assertion
// fails the following errNotFound method calls run on a nil pointer and
// may panic — consider c.Assert for the NotNil check.
672 func (s *StandaloneSuite) TestGetFail(c *C) {
673 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
675 st := FailHandler{make(chan string, 1)}
677 ks := RunFakeKeepServer(st)
678 defer ks.listener.Close()
680 arv, err := arvadosclient.MakeArvadosClient()
682 kc, _ := MakeKeepClient(arv)
683 arv.ApiToken = "abc123"
684 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
687 r, n, url2, err := kc.Get(hash)
688 errNotFound, _ := err.(*ErrNotFound)
689 c.Check(errNotFound, NotNil)
690 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
691 c.Check(errNotFound.Temporary(), Equals, true)
692 c.Check(n, Equals, int64(0))
693 c.Check(url2, Equals, "")
694 c.Check(r, Equals, nil)
// TestGetFailRetry checks that Get succeeds against a
// FailThenSucceedHandler, returning "foo", and that the handler saw more
// than one request, every one carrying the same non-empty X-Request-Id.
697 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
698 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
700 st := &FailThenSucceedHandler{
701 handled: make(chan string, 1),
702 successhandler: StubGetHandler{
709 ks := RunFakeKeepServer(st)
710 defer ks.listener.Close()
712 arv, err := arvadosclient.MakeArvadosClient()
714 kc, _ := MakeKeepClient(arv)
715 arv.ApiToken = "abc123"
716 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
718 r, n, url2, err := kc.Get(hash)
720 c.Check(err, Equals, nil)
721 c.Check(n, Equals, int64(3))
722 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
724 content, err2 := ioutil.ReadAll(r)
725 c.Check(err2, Equals, nil)
726 c.Check(content, DeepEquals, []byte("foo"))
728 c.Logf("%q", st.reqIDs)
// Assert (not Check): the loop below indexes st.reqIDs[0].
729 c.Assert(len(st.reqIDs) > 1, Equals, true)
730 for _, reqid := range st.reqIDs {
731 c.Check(reqid, Not(Equals), "")
732 c.Check(reqid, Equals, st.reqIDs[0])
// TestGetNetError points the client at http://localhost:62222 and expects
// Get to return an *ErrNotFound whose message contains "connection
// refused" and which reports Temporary() == true, with zero size, an empty
// URL and a nil reader. The test relies on nothing listening on that port.
// NOTE(review): as in TestGetFail, a failed type assertion would leave
// errNotFound nil and the following method calls may panic.
736 func (s *StandaloneSuite) TestGetNetError(c *C) {
737 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
739 arv, err := arvadosclient.MakeArvadosClient()
741 kc, _ := MakeKeepClient(arv)
742 arv.ApiToken = "abc123"
743 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
745 r, n, url2, err := kc.Get(hash)
746 errNotFound, _ := err.(*ErrNotFound)
747 c.Check(errNotFound, NotNil)
748 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
749 c.Check(errNotFound.Temporary(), Equals, true)
750 c.Check(n, Equals, int64(0))
751 c.Check(url2, Equals, "")
752 c.Check(r, Equals, nil)
// TestGetWithServiceHint requests a locator carrying a "+K@<uuid>" hint
// and checks that the block is fetched from ks, the server registered
// under that uuid, rather than from ks0, which is registered under "x".
// NOTE(review): presumably the two maps are the local-roots and
// gateway-roots arguments of SetServiceRoots — confirm.
755 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
756 uuid := "zzzzz-bi6l4-123451234512345"
757 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
759 // This one shouldn't be used:
760 ks0 := RunFakeKeepServer(StubGetHandler{
766 defer ks0.listener.Close()
767 // This one should be used:
768 ks := RunFakeKeepServer(StubGetHandler{
774 defer ks.listener.Close()
776 arv, err := arvadosclient.MakeArvadosClient()
778 kc, _ := MakeKeepClient(arv)
779 arv.ApiToken = "abc123"
781 map[string]string{"x": ks0.url},
783 map[string]string{uuid: ks.url})
785 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
787 c.Check(err, Equals, nil)
788 c.Check(n, Equals, int64(3))
// The hint stays part of the requested path.
789 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
791 content, err := ioutil.ReadAll(r)
792 c.Check(err, Equals, nil)
793 c.Check(content, DeepEquals, []byte("foo"))
796 // Use a service hint to fetch from a local disk service, overriding
797 // rendezvous probe order.
// The test expects the block to come from ks even though ks0 is registered
// under several other UUIDs.
798 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
799 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
800 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
802 // This one shouldn't be used, although it appears first in
803 // rendezvous probe order:
804 ks0 := RunFakeKeepServer(StubGetHandler{
810 defer ks0.listener.Close()
811 // This one should be used:
812 ks := RunFakeKeepServer(StubGetHandler{
818 defer ks.listener.Close()
820 arv, err := arvadosclient.MakeArvadosClient()
822 kc, _ := MakeKeepClient(arv)
823 arv.ApiToken = "abc123"
// The same three decoy UUIDs, all pointing at ks0, appear in two of the
// root maps passed to the client.
826 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
827 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
828 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
832 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
833 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
834 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
838 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
840 c.Check(err, Equals, nil)
841 c.Check(n, Equals, int64(3))
842 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
844 content, err := ioutil.ReadAll(r)
845 c.Check(err, Equals, nil)
846 c.Check(content, DeepEquals, []byte("foo"))
// TestGetWithServiceHintFailoverToLocals requests a locator with a
// "+K@<uuid>" hint where the hinted server (ksGateway) answers with
// http.StatusInternalServerError, and checks that the block is instead
// served by ksLocal.
849 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
850 uuid := "zzzzz-bi6l4-123451234512345"
851 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
853 ksLocal := RunFakeKeepServer(StubGetHandler{
859 defer ksLocal.listener.Close()
860 ksGateway := RunFakeKeepServer(StubGetHandler{
864 http.StatusInternalServerError,
866 defer ksGateway.listener.Close()
868 arv, err := arvadosclient.MakeArvadosClient()
870 kc, _ := MakeKeepClient(arv)
871 arv.ApiToken = "abc123"
873 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
875 map[string]string{uuid: ksGateway.url})
877 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
// Assert: the checks below are meaningless if Get failed.
878 c.Assert(err, Equals, nil)
880 c.Check(n, Equals, int64(3))
881 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
883 content, err := ioutil.ReadAll(r)
884 c.Check(err, Equals, nil)
885 c.Check(content, DeepEquals, []byte("foo"))
// BarHandler is an http.Handler stub whose ServeHTTP always responds with
// the body "bar".
888 type BarHandler struct {
// ServeHTTP writes "bar" as the response body, whatever was requested, and
// then sends "http://<request host>" on h.handled.
892 func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
893 resp.Write([]byte("bar"))
894 h.handled <- fmt.Sprintf("http://%s", req.Host)
// TestChecksum uses a server that always returns "bar". Reading the block
// requested by MD5("bar") succeeds, while reading the block requested by
// MD5("foo") ends with BadChecksum. Both report a size of 3.
897 func (s *StandaloneSuite) TestChecksum(c *C) {
898 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
899 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
901 st := BarHandler{make(chan string, 1)}
903 ks := RunFakeKeepServer(st)
904 defer ks.listener.Close()
906 arv, err := arvadosclient.MakeArvadosClient()
908 kc, _ := MakeKeepClient(arv)
909 arv.ApiToken = "abc123"
910 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
912 r, n, _, err := kc.Get(barhash)
// err is overwritten here: the checks below apply to the read, not to Get.
914 _, err = ioutil.ReadAll(r)
915 c.Check(n, Equals, int64(3))
916 c.Check(err, Equals, nil)
920 r, n, _, err = kc.Get(foohash)
// The mismatch is reported by the read, not by the preceding Get call.
922 _, err = ioutil.ReadAll(r)
923 c.Check(n, Equals, int64(3))
924 c.Check(err, Equals, BadChecksum)
// TestGetWithFailures configures one working stub server (st) and four
// servers that answer 404 (fh), and checks that Get still returns the
// content "waz" from the working server.
929 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
930 content := []byte("waz")
931 hash := fmt.Sprintf("%x", md5.Sum(content))
933 fh := Error404Handler{
// Buffer of 4 matches the number of failing servers started below.
934 make(chan string, 4)}
936 st := StubGetHandler{
943 arv, err := arvadosclient.MakeArvadosClient()
945 kc, _ := MakeKeepClient(arv)
946 arv.ApiToken = "abc123"
947 localRoots := make(map[string]string)
948 writableLocalRoots := make(map[string]string)
950 ks1 := RunSomeFakeKeepServers(st, 1)
951 ks2 := RunSomeFakeKeepServers(fh, 4)
953 for i, k := range ks1 {
954 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
955 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
956 defer k.listener.Close()
// The failing servers get the UUID indexes following the working one.
958 for i, k := range ks2 {
959 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
960 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
961 defer k.listener.Close()
964 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
967 // This test works only if one of the failing services is
968 // attempted before the succeeding service. Otherwise,
969 // <-fh.handled below will just hang! (Probe order depends on
970 // the choice of block content "waz" and the UUIDs of the fake
971 // servers, so we just tried different strings until we found
972 // an example that passes this Assert.)
973 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
975 r, n, url2, err := kc.Get(hash)
978 c.Check(err, Equals, nil)
979 c.Check(n, Equals, int64(3))
980 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
982 readContent, err2 := ioutil.ReadAll(r)
983 c.Check(err2, Equals, nil)
984 c.Check(readContent, DeepEquals, content)
// TestPutGetHead exercises a full round trip against the services this
// suite starts: Ask before the block exists (BlockNotFound), PutB
// (locator starts with hash+size, two replicas), Get (content matches),
// Ask again (size and URL), and LocalLocator. The final step expects
// LocalLocator to fail on a locator carrying a "+R" hint, with an error
// mentioning a HEAD request and HTTP 400.
987 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
988 content := []byte("TestPutGetHead")
990 arv, err := arvadosclient.MakeArvadosClient()
992 kc, err := MakeKeepClient(arv)
993 c.Assert(err, Equals, nil)
995 hash := fmt.Sprintf("%x", md5.Sum(content))
// Before the block is written, Ask must report it as missing.
998 n, _, err := kc.Ask(hash)
999 c.Check(err, Equals, BlockNotFound)
1000 c.Check(n, Equals, int64(0))
1003 hash2, replicas, err := kc.PutB(content)
// The returned locator is "<hash>+<size>", optionally followed by more
// text.
1004 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
1005 c.Check(replicas, Equals, 2)
1006 c.Check(err, Equals, nil)
1009 r, n, url2, err := kc.Get(hash)
1010 c.Check(err, Equals, nil)
1011 c.Check(n, Equals, int64(len(content)))
1012 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1014 readContent, err2 := ioutil.ReadAll(r)
1015 c.Check(err2, Equals, nil)
1016 c.Check(readContent, DeepEquals, content)
1019 n, url2, err := kc.Ask(hash)
1020 c.Check(err, Equals, nil)
1021 c.Check(n, Equals, int64(len(content)))
1022 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
1025 loc, err := kc.LocalLocator(hash)
1026 c.Check(err, Equals, nil)
// Assert the length first so the [:32] slice below cannot go out of range.
1027 c.Assert(len(loc) >= 32, Equals, true)
1028 c.Check(loc[:32], Equals, hash[:32])
1031 content := []byte("the perth county conspiracy")
1032 loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
1033 c.Check(loc, Equals, "")
1034 c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
1035 c.Check(err, ErrorMatches, `.*HTTP 400.*`)
// StubProxyHandler is an http.Handler stub whose ServeHTTP reports two
// stored replicas for every request.
1039 type StubProxyHandler struct {
// ServeHTTP sets the "X-Keep-Replicas-Stored: 2" response header and then
// sends "http://<request host>" on h.handled. It does not write an
// explicit status code.
1043 func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1044 resp.Header().Set("X-Keep-Replicas-Stored", "2")
1045 h.handled <- fmt.Sprintf("http://%s", req.Host)
// TestPutProxy checks that PutB succeeds with Want_replicas = 2 when a
// single server reports two stored replicas, and that the returned replica
// count is 2.
1048 func (s *StandaloneSuite) TestPutProxy(c *C) {
1049 st := StubProxyHandler{make(chan string, 1)}
1051 arv, err := arvadosclient.MakeArvadosClient()
1053 kc, _ := MakeKeepClient(arv)
1055 kc.Want_replicas = 2
1056 arv.ApiToken = "abc123"
1057 localRoots := make(map[string]string)
1058 writableLocalRoots := make(map[string]string)
1060 ks1 := RunSomeFakeKeepServers(st, 1)
1062 for i, k := range ks1 {
1063 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1064 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1065 defer k.listener.Close()
1068 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1070 _, replicas, err := kc.PutB([]byte("foo"))
1073 c.Check(err, Equals, nil)
1074 c.Check(replicas, Equals, 2)
// TestPutProxyInsufficientReplicas checks that PutB returns an
// InsufficientReplicasError, with a replica count of 2, when
// Want_replicas is 3 but the only server reports two stored replicas.
1077 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
1078 st := StubProxyHandler{make(chan string, 1)}
1080 arv, err := arvadosclient.MakeArvadosClient()
1082 kc, _ := MakeKeepClient(arv)
1084 kc.Want_replicas = 3
1085 arv.ApiToken = "abc123"
1086 localRoots := make(map[string]string)
1087 writableLocalRoots := make(map[string]string)
1089 ks1 := RunSomeFakeKeepServers(st, 1)
1091 for i, k := range ks1 {
1092 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1093 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1094 defer k.listener.Close()
1096 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1098 _, replicas, err := kc.PutB([]byte("foo"))
// FitsTypeOf compares types only; errors.New("") just builds a sample
// value of the expected error type.
1101 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1102 c.Check(replicas, Equals, 2)
// TestMakeLocator parses a locator with a size and one further hint and
// checks Hash, Size (3) and Hints. Hints includes the size token "3" as
// its first element.
1105 func (s *StandaloneSuite) TestMakeLocator(c *C) {
1106 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
1107 c.Check(err, Equals, nil)
1108 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1109 c.Check(l.Size, Equals, 3)
1110 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
// TestMakeLocatorNoHints parses a bare 32-character hash and checks that
// Size is -1 and Hints is an empty, non-nil slice.
1113 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
1114 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
1115 c.Check(err, Equals, nil)
1116 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1117 c.Check(l.Size, Equals, -1)
1118 c.Check(l.Hints, DeepEquals, []string{})
// TestMakeLocatorNoSizeHint parses a locator that has a hint but no size
// and checks that Size is -1 while the hint is preserved.
1121 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
1122 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
1123 c.Check(err, Equals, nil)
1124 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1125 c.Check(l.Size, Equals, -1)
1126 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
// TestMakeLocatorPreservesUnrecognizedHints checks that every "+"-separated
// hint, including "Unknown", is kept in order and that String() reproduces
// the original locator text exactly.
1129 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
1130 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
1131 l, err := MakeLocator(str)
1132 c.Check(err, Equals, nil)
1133 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
1134 c.Check(l.Size, Equals, 3)
1135 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
1136 c.Check(l.String(), Equals, str)
// TestMakeLocatorInvalidInput checks that a 31-character hash (one
// character short) is rejected with InvalidLocatorError.
1139 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
1140 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
1141 c.Check(err, Equals, InvalidLocatorError)
// TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot starts five stub
// servers, asks PutB for two replicas, and expects an
// InsufficientReplicasError with one replica stored, the single handled
// request having gone to the server registered with index 0.
// NOTE(review): per the test name, presumably only the index-0 server is
// added to writableLocalRoots — confirm against the condition guarding
// that assignment.
1144 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
1145 hash := Md5String("foo")
1147 st := &StubPutHandler{
1150 expectAPIToken: "abc123",
1152 expectStorageClass: "",
1153 returnStorageClasses: "",
1154 handled: make(chan string, 5),
1157 arv, _ := arvadosclient.MakeArvadosClient()
1158 kc, _ := MakeKeepClient(arv)
1160 kc.Want_replicas = 2
1161 arv.ApiToken = "abc123"
1162 localRoots := make(map[string]string)
1163 writableLocalRoots := make(map[string]string)
1165 ks := RunSomeFakeKeepServers(st, 5)
1167 for i, k := range ks {
1168 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1170 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1172 defer k.listener.Close()
1175 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1177 _, replicas, err := kc.PutB([]byte("foo"))
1179 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1180 c.Check(replicas, Equals, 1)
1182 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
// TestPutBWithNoWritableLocalRoots registers five stub servers as local
// roots but leaves writableLocalRoots empty, then expects PutB to return
// an InsufficientReplicasError with zero replicas stored.
1185 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1186 hash := Md5String("foo")
1188 st := &StubPutHandler{
1191 expectAPIToken: "abc123",
1193 expectStorageClass: "",
1194 returnStorageClasses: "",
1195 handled: make(chan string, 5),
1198 arv, _ := arvadosclient.MakeArvadosClient()
1199 kc, _ := MakeKeepClient(arv)
1201 kc.Want_replicas = 2
1202 arv.ApiToken = "abc123"
1203 localRoots := make(map[string]string)
1204 writableLocalRoots := make(map[string]string)
1206 ks := RunSomeFakeKeepServers(st, 5)
// Only localRoots is filled in; writableLocalRoots deliberately stays
// empty.
1208 for i, k := range ks {
1209 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1210 defer k.listener.Close()
1213 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1215 _, replicas, err := kc.PutB([]byte("foo"))
1217 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1218 c.Check(replicas, Equals, 0)
// StubGetIndexHandler is an http.Handler stub for the index tests; its
// ServeHTTP method checks the request path and token and replies with a
// configured status.
1221 type StubGetIndexHandler struct {
// Token expected in the "Authorization: OAuth2 <token>" request header.
1224 expectAPIToken string
// ServeHTTP checks the request path and Authorization header, then writes
// the configured status code. Unlike StubGetHandler, the path is compared
// to expectPath as-is, with no "/" prepended.
// NOTE(review): Content-Length is set after WriteHeader. Per net/http,
// header changes made after WriteHeader have no effect, so this Set is
// ignored; move it above WriteHeader if the header is meant to be sent.
1229 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1230 h.c.Check(req.URL.Path, Equals, h.expectPath)
1231 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1232 resp.WriteHeader(h.httpStatus)
1233 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
// TestGetIndexWithNoPrefix calls GetIndex for server "x" with an empty
// prefix and checks that the content read back equals the stub's body
// minus its final byte (the newline of the trailing blank line).
1237 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1238 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1240 st := StubGetIndexHandler{
// One index entry followed by a blank line.
1245 []byte(hash + "+3 1443559274\n\n")}
1247 ks := RunFakeKeepServer(st)
1248 defer ks.listener.Close()
1250 arv, err := arvadosclient.MakeArvadosClient()
1251 c.Assert(err, IsNil)
1252 kc, err := MakeKeepClient(arv)
1253 c.Assert(err, IsNil)
1254 arv.ApiToken = "abc123"
1255 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1257 r, err := kc.GetIndex("x", "")
1260 content, err2 := ioutil.ReadAll(r)
1261 c.Check(err2, Equals, nil)
1262 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexWithPrefix requests the index of service "x" using the first
// three hex characters of the block hash as the prefix and checks that the
// content read back equals the stub's body with its final byte removed.
1265 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
// hash is the hex-encoded MD5 digest of "foo".
1266 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1268 st := StubGetIndexHandler{
// presumably the expected request path (positional field) — confirm
// against the struct definition.
1270 "/index/" + hash[0:3],
1273 []byte(hash + "+3 1443559274\n\n")}
1275 ks := RunFakeKeepServer(st)
1276 defer ks.listener.Close()
1278 arv, err := arvadosclient.MakeArvadosClient()
// NOTE(review): the error from MakeKeepClient is discarded here.
1280 kc, _ := MakeKeepClient(arv)
1281 arv.ApiToken = "abc123"
1282 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1284 r, err := kc.GetIndex("x", hash[0:3])
1285 c.Assert(err, Equals, nil)
1287 content, err2 := ioutil.ReadAll(r)
1288 c.Check(err2, Equals, nil)
// The expected content is the stub body without its last byte.
1289 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexIncomplete checks that GetIndex returns ErrIncompleteIndex for
// the response served by the stub handler configured below.
1292 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1293 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1295 st := StubGetIndexHandler{
1297 "/index/" + hash[0:3],
1302 ks := RunFakeKeepServer(st)
1303 defer ks.listener.Close()
1305 arv, err := arvadosclient.MakeArvadosClient()
// NOTE(review): the error from MakeKeepClient is discarded here.
1307 kc, _ := MakeKeepClient(arv)
1308 arv.ApiToken = "abc123"
1309 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
// The returned reader is ignored; only the error value is checked.
1311 _, err = kc.GetIndex("x", hash[0:3])
1312 c.Check(err, Equals, ErrIncompleteIndex)
// TestGetIndexWithNoSuchServer checks that GetIndex returns
// ErrNoSuchKeepServer when asked for a service key ("y") that is not among
// the configured service roots (only "x" is registered).
1315 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1316 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1318 st := StubGetIndexHandler{
1320 "/index/" + hash[0:3],
1325 ks := RunFakeKeepServer(st)
1326 defer ks.listener.Close()
1328 arv, err := arvadosclient.MakeArvadosClient()
// NOTE(review): the error from MakeKeepClient is discarded here.
1330 kc, _ := MakeKeepClient(arv)
1331 arv.ApiToken = "abc123"
1332 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
// "y" was never registered above, so the lookup is expected to fail.
1334 _, err = kc.GetIndex("y", hash[0:3])
1335 c.Check(err, Equals, ErrNoSuchKeepServer)
// TestGetIndexWithNoSuchPrefix requests the index of service "x" with the
// prefix "abcd" and checks that GetIndex succeeds and that the content read
// back equals the stub's body with its final byte removed.
1338 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1339 st := StubGetIndexHandler{
1346 ks := RunFakeKeepServer(st)
1347 defer ks.listener.Close()
1349 arv, err := arvadosclient.MakeArvadosClient()
// NOTE(review): the error from MakeKeepClient is discarded here.
1351 kc, _ := MakeKeepClient(arv)
1352 arv.ApiToken = "abc123"
1353 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1355 r, err := kc.GetIndex("x", "abcd")
1356 c.Check(err, Equals, nil)
1358 content, err2 := ioutil.ReadAll(r)
1359 c.Check(err2, Equals, nil)
// The expected content is the stub body without its last byte.
1360 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestPutBRetry runs PutB against two fake keep servers backed by a
// FailThenSucceedHandler that wraps a StubPutHandler, and checks that the
// call returns no error and reports two replicas.
1363 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1364 st := &FailThenSucceedHandler{
1365 handled: make(chan string, 1),
1366 successhandler: &StubPutHandler{
1368 expectPath: Md5String("foo"),
1369 expectAPIToken: "abc123",
1371 expectStorageClass: "",
1372 returnStorageClasses: "",
1373 handled: make(chan string, 5),
// NOTE(review): the errors returned by MakeArvadosClient and MakeKeepClient
// are discarded here.
1377 arv, _ := arvadosclient.MakeArvadosClient()
1378 kc, _ := MakeKeepClient(arv)
1380 kc.Want_replicas = 2
1381 arv.ApiToken = "abc123"
1382 localRoots := make(map[string]string)
1383 writableLocalRoots := make(map[string]string)
1385 ks := RunSomeFakeKeepServers(st, 2)
// Register each fake server as both a local root and a writable local root.
// The deferred Close calls run when the test function returns.
1387 for i, k := range ks {
1388 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1389 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1390 defer k.listener.Close()
1393 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1395 hash, replicas, err := kc.PutB([]byte("foo"))
1397 c.Check(err, Equals, nil)
// NOTE(review): the returned hash is expected to be empty; presumably the
// stub handler does not send a locator in its response — confirm.
1398 c.Check(hash, Equals, "")
1399 c.Check(replicas, Equals, 2)
// TestMakeKeepClientWithNonDiskTypeService creates an extra keep_services
// record with service_type "testblobstore", builds a new KeepClient, and
// checks that the new service shows up in the client's local, gateway and
// writable-local roots and that the client's non-disk-service settings
// (replicasPerService, foundNonDiskSvc, HTTP timeout) have the expected
// values.
1402 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1403 arv, err := arvadosclient.MakeArvadosClient()
1404 c.Assert(err, Equals, nil)
1406 // Add an additional "testblobstore" keepservice
1407 blobKeepService := make(arvadosclient.Dict)
1408 err = arv.Create("keep_services",
1409 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1410 "service_host": "localhost",
1411 "service_port": "21321",
1412 "service_type": "testblobstore"}},
1414 c.Assert(err, Equals, nil)
// Remove the record again when the test returns. NOTE(review): the error
// from Delete is ignored, and the uuid type assertion is unchecked.
1415 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
// Refresh service discovery before building the client, after the new
// record has been created.
1416 RefreshServiceDiscovery()
1418 // Make a keepclient and ensure that the testblobstore is included
1419 kc, err := MakeKeepClient(arv)
1420 c.Assert(err, Equals, nil)
1422 // verify kc.LocalRoots
1423 c.Check(len(kc.LocalRoots()), Equals, 3)
1424 for _, root := range kc.LocalRoots() {
1425 c.Check(root, Matches, "http://localhost:\\d+")
1427 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1429 // verify kc.GatewayRoots
1430 c.Check(len(kc.GatewayRoots()), Equals, 3)
1431 for _, root := range kc.GatewayRoots() {
1432 c.Check(root, Matches, "http://localhost:\\d+")
1434 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1436 // verify kc.WritableLocalRoots
1437 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1438 for _, root := range kc.WritableLocalRoots() {
1439 c.Check(root, Matches, "http://localhost:\\d+")
1441 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
// With a non-disk service present, the test expects replicasPerService to
// be 0, foundNonDiskSvc to be true, and a 300-second HTTP client timeout.
1443 c.Assert(kc.replicasPerService, Equals, 0)
1444 c.Assert(kc.foundNonDiskSvc, Equals, true)
1445 c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)