1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
21 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
22 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
23 "git.curoverse.com/arvados.git/sdk/go/streamer"
27 // Gocheck boilerplate
// Test is the `go test` entry point for this file.
// NOTE(review): the function body is not among the lines shown here;
// presumably it hands t to gocheck — confirm.
28 func Test(t *testing.T) {
32 // Gocheck boilerplate
// Both suite types declared in this file are passed to Suite; the results
// are discarded.
33 var _ = Suite(&ServerRequiredSuite{})
34 var _ = Suite(&StandaloneSuite{})
36 // Tests that require the Keep server running
// ServerRequiredSuite carries no state of its own.
37 type ServerRequiredSuite struct{}
// StandaloneSuite carries no state of its own; it is the receiver for the
// tests in this file that talk to in-process stub HTTP handlers.
40 type StandaloneSuite struct{}
// SetUpTest is the suite's per-test fixture hook; it calls
// RefreshServiceDiscovery.
42 func (s *StandaloneSuite) SetUpTest(c *C) {
43 RefreshServiceDiscovery()
// pythonDir returns the path "<cwd>/../../python/tests".
// NOTE(review): cwd is assigned on a line not shown here — presumably the
// process working directory; confirm.
46 func pythonDir() string {
48 return fmt.Sprintf("%s/../../python/tests", cwd)
// SetUpSuite is the suite-level fixture hook: it calls arvadostest.StartAPI
// and then arvadostest.StartKeep(2, false).
51 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
52 arvadostest.StartAPI()
53 arvadostest.StartKeep(2, false)
// TearDownSuite is the suite-level cleanup hook; it calls
// arvadostest.StopKeep(2), matching the count passed to StartKeep in
// SetUpSuite.
56 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
57 arvadostest.StopKeep(2)
// SetUpTest is the suite's per-test fixture hook; it calls
// RefreshServiceDiscovery.
61 func (s *ServerRequiredSuite) SetUpTest(c *C) {
62 RefreshServiceDiscovery()
// TestMakeKeepClient builds a KeepClient from a new Arvados client and checks
// that it reports exactly two local roots, each of the form
// http://localhost:<port>.
65 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
66 arv, err := arvadosclient.MakeArvadosClient()
67 c.Assert(err, Equals, nil)
69 kc, err := MakeKeepClient(arv)
71 c.Assert(err, Equals, nil)
72 c.Check(len(kc.LocalRoots()), Equals, 2)
73 for _, root := range kc.LocalRoots() {
74 c.Check(root, Matches, "http://localhost:\\d+")
78 func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
79 arv, err := arvadosclient.MakeArvadosClient()
80 c.Assert(err, Equals, nil)
82 kc, err := MakeKeepClient(arv)
83 c.Assert(kc.Want_replicas, Equals, 2)
85 arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
86 kc, err = MakeKeepClient(arv)
87 c.Assert(kc.Want_replicas, Equals, 3)
89 arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
90 kc, err = MakeKeepClient(arv)
91 c.Assert(kc.Want_replicas, Equals, 1)
// StubPutHandler is an http.Handler used as a fake Keep server for upload
// tests. Its ServeHTTP method reads the fields c, expectPath, expectApiToken,
// expectBody and handled.
// NOTE(review): the field declarations themselves are not shown here.
94 type StubPutHandler struct {
// ServeHTTP checks the request path, the Authorization header
// ("OAuth2 <expectApiToken>") and the full request body against the expected
// values, replies with status 200, and finally sends "http://<req.Host>" on
// sph.handled.
102 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
103 sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
104 sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
105 body, err := ioutil.ReadAll(req.Body)
106 sph.c.Check(err, Equals, nil)
107 sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
108 resp.WriteHeader(200)
109 sph.handled <- fmt.Sprintf("http://%s", req.Host)
// RunFakeKeepServer opens a TCP listener on port 0, stores the listener and
// its "http://<addr>" URL in ks, and serves st on that listener in a new
// goroutine.
112 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
114 ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
// NOTE(review): fmt.Sprintf is called with no format arguments; a plain
// string literal would be equivalent. The condition guarding this panic is on
// a line not shown here.
116 panic(fmt.Sprintf("Could not listen on any port"))
118 ks.url = fmt.Sprintf("http://%s", ks.listener.Addr().String())
// The error returned by http.Serve is discarded.
119 go http.Serve(ks.listener, st)
// UploadToStubHelper starts a fake Keep server running st, builds a KeepClient
// from an Arvados client whose ApiToken is set to "abc123", and calls f with
// that client, the server URL, the two ends of an io.Pipe and an unbuffered
// uploadStatus channel. The server's listener is closed when the helper
// returns.
123 func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
124 io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
126 ks := RunFakeKeepServer(st)
127 defer ks.listener.Close()
// NOTE(review): the errors from MakeArvadosClient and MakeKeepClient are
// discarded here.
129 arv, _ := arvadosclient.MakeArvadosClient()
130 arv.ApiToken = "abc123"
132 kc, _ := MakeKeepClient(arv)
134 reader, writer := io.Pipe()
135 upload_status := make(chan uploadStatus)
137 f(kc, ks.url, reader, writer, upload_status)
// TestUploadToStubKeepServer runs uploadToKeepServer in a goroutine against a
// StubPutHandler, writes "foo" into the pipe feeding it, and expects the
// resulting status to be uploadStatus{nil, "<url>/<expectPath>", 200, 1, ""}.
// NOTE(review): most StubPutHandler literal fields are on lines not shown.
140 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
141 log.Printf("TestUploadToStubKeepServer")
143 st := StubPutHandler{
145 "acbd18db4cc2f85cedef654fccc4a4d8",
150 UploadToStubHelper(c, st,
151 func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
153 go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0)
155 writer.Write([]byte("foo"))
159 status := <-upload_status
160 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
// TestUploadToStubKeepServerBufferReader is the same upload check as
// TestUploadToStubKeepServer, except that the pipe's read end is wrapped with
// streamer.AsyncStreamFromReader(512, ...) and the upload reads from a stream
// reader made from it.
164 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
165 st := StubPutHandler{
167 "acbd18db4cc2f85cedef654fccc4a4d8",
172 UploadToStubHelper(c, st,
173 func(kc *KeepClient, url string, reader io.ReadCloser,
174 writer io.WriteCloser, upload_status chan uploadStatus) {
176 tr := streamer.AsyncStreamFromReader(512, reader)
179 br1 := tr.MakeStreamReader()
181 go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
183 writer.Write([]byte("foo"))
188 status := <-upload_status
189 c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
// FailHandler is an http.Handler whose ServeHTTP always replies 500. Its
// ServeHTTP uses a handled channel field; the declaration is not shown here.
193 type FailHandler struct {
// ServeHTTP replies with status 500 and then sends "http://<req.Host>" on
// fh.handled.
197 func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
198 resp.WriteHeader(500)
199 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// FailThenSucceedHandler wraps a StubGetHandler (successhandler) that its
// ServeHTTP can delegate to.
// NOTE(review): the other fields are declared on lines not shown here.
202 type FailThenSucceedHandler struct {
205 successhandler StubGetHandler
// ServeHTTP contains two behaviours: replying 500 and reporting
// "http://<req.Host>" on fh.handled, and delegating the request to
// fh.successhandler.
// NOTE(review): the condition choosing between them is on lines not shown;
// the type name suggests failures come first — confirm.
208 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
210 resp.WriteHeader(500)
212 fh.handled <- fmt.Sprintf("http://%s", req.Host)
214 fh.successhandler.ServeHTTP(resp, req)
// Error404Handler is an http.Handler whose ServeHTTP always replies 404. Its
// ServeHTTP uses a handled channel field; the declaration is not shown here.
218 type Error404Handler struct {
// ServeHTTP replies with status 404 and then sends "http://<req.Host>" on
// fh.handled.
222 func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
223 resp.WriteHeader(404)
224 fh.handled <- fmt.Sprintf("http://%s", req.Host)
// TestFailedUploadToStubKeepServer uploads "foo" to a stub server and expects
// the reported status to carry "<url>/<hash>" and status code 500.
// NOTE(review): st is constructed on lines not shown here.
227 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
231 hash := "acbd18db4cc2f85cedef654fccc4a4d8"
233 UploadToStubHelper(c, st,
234 func(kc *KeepClient, url string, reader io.ReadCloser,
235 writer io.WriteCloser, upload_status chan uploadStatus) {
237 go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0)
239 writer.Write([]byte("foo"))
244 status := <-upload_status
245 c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
246 c.Check(status.statusCode, Equals, 500)
// KeepServer describes one fake Keep server started by RunFakeKeepServer.
// NOTE(review): callers also read a url field, declared on a line not shown.
250 type KeepServer struct {
// listener is the socket the fake server accepts connections on.
251 listener net.Listener
// RunSomeFakeKeepServers starts n fake Keep servers, all serving st, and
// stores them in ks in start order.
255 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
256 ks = make([]KeepServer, n)
258 for i := 0; i < n; i += 1 {
259 ks[i] = RunFakeKeepServer(st)
// TestPutB writes "foo" with PutB against five fake servers registered as both
// local and writable roots, and checks that s1 and s2 are, in either order,
// the first two entries of NewRootSorter(...).GetSortedRoots() for that block.
// NOTE(review): s1 and s2 are assigned on lines not shown — presumably read
// from st.handled; confirm.
265 func (s *StandaloneSuite) TestPutB(c *C) {
266 hash := Md5String("foo")
268 st := StubPutHandler{
273 make(chan string, 5)}
275 arv, _ := arvadosclient.MakeArvadosClient()
276 kc, _ := MakeKeepClient(arv)
279 arv.ApiToken = "abc123"
280 localRoots := make(map[string]string)
281 writableLocalRoots := make(map[string]string)
283 ks := RunSomeFakeKeepServers(st, 5)
285 for i, k := range ks {
286 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
287 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
// Deferred inside the loop on purpose: every listener stays open until the
// test function returns.
288 defer k.listener.Close()
291 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
// NOTE(review): PutB's return values are discarded here.
293 kc.PutB([]byte("foo"))
295 shuff := NewRootSorter(
296 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
300 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
301 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutHR is the PutHR counterpart of TestPutB: "foo" is written into an
// io.Pipe, the read end is passed to PutHR with the block's MD5 and size 3,
// and s1/s2 are expected to be the first two sorted roots in either order.
// NOTE(review): s1 and s2 are assigned on lines not shown here.
306 func (s *StandaloneSuite) TestPutHR(c *C) {
307 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
309 st := StubPutHandler{
314 make(chan string, 5)}
316 arv, _ := arvadosclient.MakeArvadosClient()
317 kc, _ := MakeKeepClient(arv)
320 arv.ApiToken = "abc123"
321 localRoots := make(map[string]string)
322 writableLocalRoots := make(map[string]string)
324 ks := RunSomeFakeKeepServers(st, 5)
326 for i, k := range ks {
327 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
328 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
329 defer k.listener.Close()
332 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
334 reader, writer := io.Pipe()
337 writer.Write([]byte("foo"))
// NOTE(review): PutHR's return values are discarded here.
341 kc.PutHR(hash, reader, 3)
343 shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
348 c.Check((s1 == shuff[0] && s2 == shuff[1]) ||
349 (s1 == shuff[1] && s2 == shuff[0]),
// TestPutWithFail registers four servers running st and one running fh, then
// expects PutB("foo") to return a nil error, an empty hash string and 2
// replicas, with s1/s2 equal to sorted roots 1 and 2 in either order.
// NOTE(review): fh, s1 and s2 are set up on lines not shown here; the root at
// sorted index 0 is presumably the failing server — confirm.
354 func (s *StandaloneSuite) TestPutWithFail(c *C) {
355 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
357 st := StubPutHandler{
362 make(chan string, 4)}
365 make(chan string, 1)}
367 arv, err := arvadosclient.MakeArvadosClient()
368 kc, _ := MakeKeepClient(arv)
371 arv.ApiToken = "abc123"
372 localRoots := make(map[string]string)
373 writableLocalRoots := make(map[string]string)
375 ks1 := RunSomeFakeKeepServers(st, 4)
376 ks2 := RunSomeFakeKeepServers(fh, 1)
378 for i, k := range ks1 {
379 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
380 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
381 defer k.listener.Close()
// The second group's indices continue after ks1 so the fake UUIDs stay unique.
383 for i, k := range ks2 {
384 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
385 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
386 defer k.listener.Close()
389 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
391 shuff := NewRootSorter(
392 kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
395 phash, replicas, err := kc.PutB([]byte("foo"))
399 c.Check(err, Equals, nil)
400 c.Check(phash, Equals, "")
401 c.Check(replicas, Equals, 2)
406 c.Check((s1 == shuff[1] && s2 == shuff[2]) ||
407 (s1 == shuff[2] && s2 == shuff[1]),
// TestPutWithTooManyFail registers one server running st and four running fh,
// then expects PutB("foo") to fail with an InsufficientReplicasError-typed
// error, report 1 replica, and have reached the single st server.
// NOTE(review): fh is constructed on lines not shown here.
412 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
413 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
415 st := StubPutHandler{
420 make(chan string, 1)}
423 make(chan string, 4)}
425 arv, err := arvadosclient.MakeArvadosClient()
426 kc, _ := MakeKeepClient(arv)
430 arv.ApiToken = "abc123"
431 localRoots := make(map[string]string)
432 writableLocalRoots := make(map[string]string)
434 ks1 := RunSomeFakeKeepServers(st, 1)
435 ks2 := RunSomeFakeKeepServers(fh, 4)
437 for i, k := range ks1 {
438 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
439 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
440 defer k.listener.Close()
442 for i, k := range ks2 {
443 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
444 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
445 defer k.listener.Close()
448 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
450 _, replicas, err := kc.PutB([]byte("foo"))
// FitsTypeOf compares only the error's type, not its message.
452 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
453 c.Check(replicas, Equals, 1)
454 c.Check(<-st.handled, Equals, ks1[0].url)
// StubGetHandler is an http.Handler used as a fake Keep server for read
// tests. Its ServeHTTP also reads the fields c, expectPath, httpStatus and
// body, which are declared on lines not shown here.
457 type StubGetHandler struct {
// expectApiToken is the token expected after "OAuth2 " in the request's
// Authorization header.
460 expectApiToken string
// ServeHTTP checks the request path and Authorization header against the
// expected values and replies with sgh.httpStatus.
465 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
466 sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
467 sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
468 resp.WriteHeader(sgh.httpStatus)
// NOTE(review): net/http documents that changing the header map after
// WriteHeader has no effect, so this Content-Length assignment does not reach
// the client as written; it would need to precede WriteHeader.
469 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
// TestGet fetches the block whose hash is MD5("foo") from a single fake
// server registered as local root "x" and expects a nil error, size 3, the
// URL "<server>/<hash>" and the content "foo".
// NOTE(review): the StubGetHandler literal's fields are on lines not shown.
473 func (s *StandaloneSuite) TestGet(c *C) {
474 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
476 st := StubGetHandler{
483 ks := RunFakeKeepServer(st)
484 defer ks.listener.Close()
486 arv, err := arvadosclient.MakeArvadosClient()
487 kc, _ := MakeKeepClient(arv)
488 arv.ApiToken = "abc123"
489 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
491 r, n, url2, err := kc.Get(hash)
493 c.Check(err, Equals, nil)
494 c.Check(n, Equals, int64(3))
495 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
497 content, err2 := ioutil.ReadAll(r)
498 c.Check(err2, Equals, nil)
499 c.Check(content, DeepEquals, []byte("foo"))
// TestGet404 points the client at a server that always replies 404 and
// expects Get to return BlockNotFound with size 0, an empty URL and a nil
// reader.
502 func (s *StandaloneSuite) TestGet404(c *C) {
503 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
505 st := Error404Handler{make(chan string, 1)}
507 ks := RunFakeKeepServer(st)
508 defer ks.listener.Close()
510 arv, err := arvadosclient.MakeArvadosClient()
511 kc, _ := MakeKeepClient(arv)
512 arv.ApiToken = "abc123"
513 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
515 r, n, url2, err := kc.Get(hash)
516 c.Check(err, Equals, BlockNotFound)
517 c.Check(n, Equals, int64(0))
518 c.Check(url2, Equals, "")
519 c.Check(r, Equals, nil)
// TestGetEmptyBlock requests the locator "d41d8cd98f00b204e9800998ecf8427e+0"
// while the only configured server always replies 404, and still expects
// size 0, an empty URL and a reader that yields zero bytes.
// NOTE(review): the checks on the two err values are on lines not shown.
522 func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
523 st := Error404Handler{make(chan string, 1)}
525 ks := RunFakeKeepServer(st)
526 defer ks.listener.Close()
528 arv, err := arvadosclient.MakeArvadosClient()
529 kc, _ := MakeKeepClient(arv)
530 arv.ApiToken = "abc123"
531 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
533 r, n, url2, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e+0")
535 c.Check(n, Equals, int64(0))
536 c.Check(url2, Equals, "")
538 buf, err := ioutil.ReadAll(r)
540 c.Check(buf, DeepEquals, []byte{})
// TestGetFail points the client at a server that always replies 500 and
// expects Get to return an *ErrNotFound whose message contains "HTTP 500" and
// whose Temporary() is true, together with size 0, an empty URL and a nil
// reader.
543 func (s *StandaloneSuite) TestGetFail(c *C) {
544 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
546 st := FailHandler{make(chan string, 1)}
548 ks := RunFakeKeepServer(st)
549 defer ks.listener.Close()
551 arv, err := arvadosclient.MakeArvadosClient()
552 kc, _ := MakeKeepClient(arv)
553 arv.ApiToken = "abc123"
554 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
557 r, n, url2, err := kc.Get(hash)
558 errNotFound, _ := err.(*ErrNotFound)
// NOTE(review): c.Check records a failure but lets the test continue, so if
// the type assertion above failed the next two lines call methods on a nil
// *ErrNotFound; c.Assert would stop the test first.
559 c.Check(errNotFound, NotNil)
560 c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
561 c.Check(errNotFound.Temporary(), Equals, true)
562 c.Check(n, Equals, int64(0))
563 c.Check(url2, Equals, "")
564 c.Check(r, Equals, nil)
// TestGetFailRetry serves the block through a FailThenSucceedHandler and
// expects Get to end with a nil error, size 3, the URL "<server>/<hash>" and
// the content "foo".
// NOTE(review): the rest of the handler literal is on lines not shown.
567 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
568 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
570 st := &FailThenSucceedHandler{make(chan string, 1), 0,
578 ks := RunFakeKeepServer(st)
579 defer ks.listener.Close()
581 arv, err := arvadosclient.MakeArvadosClient()
582 kc, _ := MakeKeepClient(arv)
583 arv.ApiToken = "abc123"
584 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
586 r, n, url2, err := kc.Get(hash)
588 c.Check(err, Equals, nil)
589 c.Check(n, Equals, int64(3))
590 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
592 content, err2 := ioutil.ReadAll(r)
593 c.Check(err2, Equals, nil)
594 c.Check(content, DeepEquals, []byte("foo"))
// TestGetNetError configures http://localhost:62222 as the only root, starts
// no server, and expects Get to return an *ErrNotFound whose message contains
// "connection refused" and whose Temporary() is true.
// NOTE(review): this relies on nothing listening on port 62222.
597 func (s *StandaloneSuite) TestGetNetError(c *C) {
598 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
600 arv, err := arvadosclient.MakeArvadosClient()
601 kc, _ := MakeKeepClient(arv)
602 arv.ApiToken = "abc123"
603 kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
605 r, n, url2, err := kc.Get(hash)
606 errNotFound, _ := err.(*ErrNotFound)
// NOTE(review): as c.Check does not stop the test, a failed type assertion
// above leaves errNotFound nil for the two method calls that follow.
607 c.Check(errNotFound, NotNil)
608 c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
609 c.Check(errNotFound.Temporary(), Equals, true)
610 c.Check(n, Equals, int64(0))
611 c.Check(url2, Equals, "")
612 c.Check(r, Equals, nil)
// TestGetWithServiceHint fetches "<hash>+K@<uuid>" and expects the data to
// come from ks, the server mapped to uuid in the last map argument, rather
// than from ks0, which is listed under "x" in the first map.
// NOTE(review): the call receiving those maps and both StubGetHandler
// literals are on lines not shown here.
615 func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
616 uuid := "zzzzz-bi6l4-123451234512345"
617 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
619 // This one shouldn't be used:
620 ks0 := RunFakeKeepServer(StubGetHandler{
626 defer ks0.listener.Close()
627 // This one should be used:
628 ks := RunFakeKeepServer(StubGetHandler{
634 defer ks.listener.Close()
636 arv, err := arvadosclient.MakeArvadosClient()
637 kc, _ := MakeKeepClient(arv)
638 arv.ApiToken = "abc123"
640 map[string]string{"x": ks0.url},
642 map[string]string{uuid: ks.url})
644 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
646 c.Check(err, Equals, nil)
647 c.Check(n, Equals, int64(3))
648 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
650 content, err := ioutil.ReadAll(r)
651 c.Check(err, Equals, nil)
652 c.Check(content, DeepEquals, []byte("foo"))
655 // Use a service hint to fetch from a local disk service, overriding
656 // rendezvous probe order.
// The three other UUIDs are all mapped to ks0 in two separate maps; the
// test expects the response to come from ks.
// NOTE(review): the entries mapping uuid to ks, and the call receiving the
// maps, are on lines not shown here.
657 func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
658 uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
659 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
661 // This one shouldn't be used, although it appears first in
662 // rendezvous probe order:
663 ks0 := RunFakeKeepServer(StubGetHandler{
669 defer ks0.listener.Close()
670 // This one should be used:
671 ks := RunFakeKeepServer(StubGetHandler{
677 defer ks.listener.Close()
679 arv, err := arvadosclient.MakeArvadosClient()
680 kc, _ := MakeKeepClient(arv)
681 arv.ApiToken = "abc123"
684 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
685 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
686 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
690 "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
691 "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
692 "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
696 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
698 c.Check(err, Equals, nil)
699 c.Check(n, Equals, int64(3))
700 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
702 content, err := ioutil.ReadAll(r)
703 c.Check(err, Equals, nil)
704 c.Check(content, DeepEquals, []byte("foo"))
// TestGetWithServiceHintFailoverToLocals maps the hinted uuid to ksGateway,
// whose handler literal includes http.StatusInternalServerError, and expects
// the block to be fetched from ksLocal instead, with the hint still present
// in the reported URL.
// NOTE(review): the call receiving the two maps is on a line not shown here.
707 func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
708 uuid := "zzzzz-bi6l4-123451234512345"
709 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
711 ksLocal := RunFakeKeepServer(StubGetHandler{
717 defer ksLocal.listener.Close()
718 ksGateway := RunFakeKeepServer(StubGetHandler{
722 http.StatusInternalServerError,
724 defer ksGateway.listener.Close()
726 arv, err := arvadosclient.MakeArvadosClient()
727 kc, _ := MakeKeepClient(arv)
728 arv.ApiToken = "abc123"
730 map[string]string{"zzzzz-bi6l4-keepdisk0000000": ksLocal.url},
732 map[string]string{uuid: ksGateway.url})
734 r, n, uri, err := kc.Get(hash + "+K@" + uuid)
735 c.Assert(err, Equals, nil)
737 c.Check(n, Equals, int64(3))
738 c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
740 content, err := ioutil.ReadAll(r)
741 c.Check(err, Equals, nil)
742 c.Check(content, DeepEquals, []byte("foo"))
// BarHandler is an http.Handler whose ServeHTTP always writes the body "bar".
// Its ServeHTTP uses a handled channel field; the declaration is not shown.
745 type BarHandler struct {
749 func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
750 resp.Write([]byte("bar"))
751 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestChecksum fetches two hashes from a server that always returns "bar":
// reading the block requested as MD5("bar") must succeed, while reading the
// block requested as MD5("foo") must fail with BadChecksum. Both report
// size 3.
754 func (s *StandaloneSuite) TestChecksum(c *C) {
755 foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
756 barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
758 st := BarHandler{make(chan string, 1)}
760 ks := RunFakeKeepServer(st)
761 defer ks.listener.Close()
763 arv, err := arvadosclient.MakeArvadosClient()
764 kc, _ := MakeKeepClient(arv)
765 arv.ApiToken = "abc123"
766 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
// NOTE(review): the err returned by Get is overwritten by ReadAll's err on
// the next line without being checked, here and in the second fetch below.
768 r, n, _, err := kc.Get(barhash)
769 _, err = ioutil.ReadAll(r)
770 c.Check(n, Equals, int64(3))
771 c.Check(err, Equals, nil)
775 r, n, _, err = kc.Get(foohash)
776 _, err = ioutil.ReadAll(r)
777 c.Check(n, Equals, int64(3))
778 c.Check(err, Equals, BadChecksum)
// TestGetWithFailures registers one server running st and four that always
// reply 404, asserts that the st server is not first in sorted-root order for
// this block, and then expects Get to return the content "waz" from the st
// server.
// NOTE(review): the StubGetHandler literal's fields are on lines not shown.
783 func (s *StandaloneSuite) TestGetWithFailures(c *C) {
784 content := []byte("waz")
785 hash := fmt.Sprintf("%x", md5.Sum(content))
787 fh := Error404Handler{
788 make(chan string, 4)}
790 st := StubGetHandler{
797 arv, err := arvadosclient.MakeArvadosClient()
798 kc, _ := MakeKeepClient(arv)
799 arv.ApiToken = "abc123"
800 localRoots := make(map[string]string)
801 writableLocalRoots := make(map[string]string)
803 ks1 := RunSomeFakeKeepServers(st, 1)
804 ks2 := RunSomeFakeKeepServers(fh, 4)
806 for i, k := range ks1 {
807 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
808 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
809 defer k.listener.Close()
811 for i, k := range ks2 {
812 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
813 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
814 defer k.listener.Close()
817 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
820 // This test works only if one of the failing services is
821 // attempted before the succeeding service. Otherwise,
822 // <-fh.handled below will just hang! (Probe order depends on
823 // the choice of block content "waz" and the UUIDs of the fake
824 // servers, so we just tried different strings until we found
825 // an example that passes this Assert.)
826 c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
828 r, n, url2, err := kc.Get(hash)
831 c.Check(err, Equals, nil)
832 c.Check(n, Equals, int64(3))
833 c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
835 read_content, err2 := ioutil.ReadAll(r)
836 c.Check(err2, Equals, nil)
837 c.Check(read_content, DeepEquals, content)
// TestPutGetHead round-trips the block "TestPutGetHead": Ask must first
// report BlockNotFound, PutB must store 2 replicas and return a locator
// beginning "<hash>+<size>", and then both Get and Ask must succeed with the
// block's size and a http://localhost:<port>/<hash> URL.
// NOTE(review): Ask and Get both declare n and url2 with :=, so they are
// presumably in nested scopes whose braces are on lines not shown — confirm.
840 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
841 content := []byte("TestPutGetHead")
// NOTE(review): the err from MakeArvadosClient is overwritten by
// MakeKeepClient's err before it is checked.
843 arv, err := arvadosclient.MakeArvadosClient()
844 kc, err := MakeKeepClient(arv)
845 c.Assert(err, Equals, nil)
847 hash := fmt.Sprintf("%x", md5.Sum(content))
850 n, _, err := kc.Ask(hash)
851 c.Check(err, Equals, BlockNotFound)
852 c.Check(n, Equals, int64(0))
855 hash2, replicas, err := kc.PutB(content)
856 c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
857 c.Check(replicas, Equals, 2)
858 c.Check(err, Equals, nil)
861 r, n, url2, err := kc.Get(hash)
862 c.Check(err, Equals, nil)
863 c.Check(n, Equals, int64(len(content)))
864 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
866 read_content, err2 := ioutil.ReadAll(r)
867 c.Check(err2, Equals, nil)
868 c.Check(read_content, DeepEquals, content)
871 n, url2, err := kc.Ask(hash)
872 c.Check(err, Equals, nil)
873 c.Check(n, Equals, int64(len(content)))
874 c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
// StubProxyHandler is an http.Handler whose ServeHTTP sets the
// X-Keep-Replicas-Stored response header to "2". Its ServeHTTP uses a handled
// channel field; the declaration is not shown here.
878 type StubProxyHandler struct {
882 func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
883 resp.Header().Set("X-Keep-Replicas-Stored", "2")
884 this.handled <- fmt.Sprintf("http://%s", req.Host)
// TestPutProxy writes "foo" through a single StubProxyHandler server, which
// sets X-Keep-Replicas-Stored to "2", and expects PutB to succeed and report
// 2 replicas.
// NOTE(review): lines between MakeKeepClient and the ApiToken assignment are
// not shown here.
887 func (s *StandaloneSuite) TestPutProxy(c *C) {
888 st := StubProxyHandler{make(chan string, 1)}
890 arv, err := arvadosclient.MakeArvadosClient()
891 kc, _ := MakeKeepClient(arv)
894 arv.ApiToken = "abc123"
895 localRoots := make(map[string]string)
896 writableLocalRoots := make(map[string]string)
898 ks1 := RunSomeFakeKeepServers(st, 1)
900 for i, k := range ks1 {
901 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
902 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
903 defer k.listener.Close()
906 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
908 _, replicas, err := kc.PutB([]byte("foo"))
911 c.Check(err, Equals, nil)
912 c.Check(replicas, Equals, 2)
// TestPutProxyInsufficientReplicas uses the same single StubProxyHandler
// server as TestPutProxy but expects PutB to fail with an
// InsufficientReplicasError-typed error while still reporting 2 replicas.
// NOTE(review): the line that makes 2 replicas insufficient is not shown —
// presumably kc.Want_replicas is raised; confirm.
915 func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
916 st := StubProxyHandler{make(chan string, 1)}
918 arv, err := arvadosclient.MakeArvadosClient()
919 kc, _ := MakeKeepClient(arv)
922 arv.ApiToken = "abc123"
923 localRoots := make(map[string]string)
924 writableLocalRoots := make(map[string]string)
926 ks1 := RunSomeFakeKeepServers(st, 1)
928 for i, k := range ks1 {
929 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
930 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
931 defer k.listener.Close()
933 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
935 _, replicas, err := kc.PutB([]byte("foo"))
938 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
939 c.Check(replicas, Equals, 2)
// TestMakeLocator parses a locator with a size and one further hint and
// checks the resulting Hash, Size (3) and Hints; the size token "3" is
// expected to appear in Hints as well.
942 func (s *StandaloneSuite) TestMakeLocator(c *C) {
943 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
944 c.Check(err, Equals, nil)
945 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
946 c.Check(l.Size, Equals, 3)
947 c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
// TestMakeLocatorNoHints parses a bare 32-hex-digit hash and expects Size -1
// and an empty, non-nil Hints slice.
950 func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
951 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
952 c.Check(err, Equals, nil)
953 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
954 c.Check(l.Size, Equals, -1)
955 c.Check(l.Hints, DeepEquals, []string{})
// TestMakeLocatorNoSizeHint parses a locator whose only hint is not a size
// and expects Size -1 with that hint preserved in Hints.
958 func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
959 l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
960 c.Check(err, Equals, nil)
961 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
962 c.Check(l.Size, Equals, -1)
963 c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
// TestMakeLocatorPreservesUnrecognizedHints checks that every "+"-separated
// hint is kept in its original order and that String() reproduces the input
// exactly.
966 func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
967 str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
968 l, err := MakeLocator(str)
969 c.Check(err, Equals, nil)
970 c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
971 c.Check(l.Size, Equals, 3)
972 c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
973 c.Check(l.String(), Equals, str)
// TestMakeLocatorInvalidInput passes a hash that is one hex digit short (31
// characters) and expects InvalidLocatorError.
976 func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
977 _, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31c")
978 c.Check(err, Equals, InvalidLocatorError)
// TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot starts five fake servers
// and expects PutB("foo") to fail with an InsufficientReplicasError-typed
// error after storing 1 replica on the server registered with index 0.
// NOTE(review): the condition restricting writableLocalRoots to one entry,
// and the Want_replicas setting, are on lines not shown here.
981 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
982 hash := Md5String("foo")
984 st := StubPutHandler{
989 make(chan string, 5)}
991 arv, _ := arvadosclient.MakeArvadosClient()
992 kc, _ := MakeKeepClient(arv)
995 arv.ApiToken = "abc123"
996 localRoots := make(map[string]string)
997 writableLocalRoots := make(map[string]string)
999 ks := RunSomeFakeKeepServers(st, 5)
1001 for i, k := range ks {
1002 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1004 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1006 defer k.listener.Close()
1009 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1011 _, replicas, err := kc.PutB([]byte("foo"))
1013 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1014 c.Check(replicas, Equals, 1)
1016 c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
// TestPutBWithNoWritableLocalRoots registers five fake servers as local roots
// but leaves writableLocalRoots empty, asks for 2 replicas, and expects PutB
// to fail with an InsufficientReplicasError-typed error and 0 replicas.
1019 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
1020 hash := Md5String("foo")
1022 st := StubPutHandler{
1027 make(chan string, 5)}
1029 arv, _ := arvadosclient.MakeArvadosClient()
1030 kc, _ := MakeKeepClient(arv)
1032 kc.Want_replicas = 2
1033 arv.ApiToken = "abc123"
1034 localRoots := make(map[string]string)
1035 writableLocalRoots := make(map[string]string)
1037 ks := RunSomeFakeKeepServers(st, 5)
// Only localRoots is filled in; writableLocalRoots stays empty.
1039 for i, k := range ks {
1040 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1041 defer k.listener.Close()
1044 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1046 _, replicas, err := kc.PutB([]byte("foo"))
1048 c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
1049 c.Check(replicas, Equals, 0)
// StubGetIndexHandler is an http.Handler used as a fake Keep server for index
// requests. Its ServeHTTP also reads the fields c, expectPath, httpStatus and
// body, which are declared on lines not shown here.
1052 type StubGetIndexHandler struct {
// expectAPIToken is the token expected after "OAuth2 " in the request's
// Authorization header.
1055 expectAPIToken string
// ServeHTTP checks the request path (compared to h.expectPath as-is, with no
// "/" prepended) and the Authorization header, then replies with
// h.httpStatus.
1060 func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1061 h.c.Check(req.URL.Path, Equals, h.expectPath)
1062 h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
1063 resp.WriteHeader(h.httpStatus)
// NOTE(review): net/http documents that changing the header map after
// WriteHeader has no effect, so this Content-Length assignment does not reach
// the client as written; it would need to precede WriteHeader.
1064 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
// TestGetIndexWithNoPrefix requests the index of server "x" with an empty
// prefix. The stub body ends in two newlines, and the reader is expected to
// yield the body without its final byte.
1068 func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
1069 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1071 st := StubGetIndexHandler{
1076 []byte(hash + "+3 1443559274\n\n")}
1078 ks := RunFakeKeepServer(st)
1079 defer ks.listener.Close()
1081 arv, err := arvadosclient.MakeArvadosClient()
1082 c.Assert(err, IsNil)
1083 kc, err := MakeKeepClient(arv)
1084 c.Assert(err, IsNil)
1085 arv.ApiToken = "abc123"
1086 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1088 r, err := kc.GetIndex("x", "")
1091 content, err2 := ioutil.ReadAll(r)
1092 c.Check(err2, Equals, nil)
1093 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexWithPrefix requests the index of server "x" with the first
// three hex digits of the hash as prefix; the stub expects the path
// "/index/<prefix>", and the reader must yield the body minus its final byte.
1096 func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
1097 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1099 st := StubGetIndexHandler{
1101 "/index/" + hash[0:3],
1104 []byte(hash + "+3 1443559274\n\n")}
1106 ks := RunFakeKeepServer(st)
1107 defer ks.listener.Close()
1109 arv, err := arvadosclient.MakeArvadosClient()
1110 kc, _ := MakeKeepClient(arv)
1111 arv.ApiToken = "abc123"
1112 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1114 r, err := kc.GetIndex("x", hash[0:3])
1115 c.Assert(err, Equals, nil)
1117 content, err2 := ioutil.ReadAll(r)
1118 c.Check(err2, Equals, nil)
1119 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// TestGetIndexIncomplete expects GetIndex to return ErrIncompleteIndex for
// the stub's response.
// NOTE(review): the stub body is on a line not shown — presumably it lacks
// the terminating blank line that the other index tests include; confirm.
1122 func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
1123 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1125 st := StubGetIndexHandler{
1127 "/index/" + hash[0:3],
1132 ks := RunFakeKeepServer(st)
1133 defer ks.listener.Close()
1135 arv, err := arvadosclient.MakeArvadosClient()
1136 kc, _ := MakeKeepClient(arv)
1137 arv.ApiToken = "abc123"
1138 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1140 _, err = kc.GetIndex("x", hash[0:3])
1141 c.Check(err, Equals, ErrIncompleteIndex)
// TestGetIndexWithNoSuchServer registers only server "x" and asks for the
// index of "y", expecting ErrNoSuchKeepServer.
1144 func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
1145 hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
1147 st := StubGetIndexHandler{
1149 "/index/" + hash[0:3],
1154 ks := RunFakeKeepServer(st)
1155 defer ks.listener.Close()
1157 arv, err := arvadosclient.MakeArvadosClient()
1158 kc, _ := MakeKeepClient(arv)
1159 arv.ApiToken = "abc123"
1160 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1162 _, err = kc.GetIndex("y", hash[0:3])
1163 c.Check(err, Equals, ErrNoSuchKeepServer)
// TestGetIndexWithNoSuchPrefix requests the index of server "x" with prefix
// "abcd" and expects a nil error and a reader yielding the stub body minus
// its final byte.
// NOTE(review): the stub literal, including its body, is on lines not shown.
1166 func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
1167 st := StubGetIndexHandler{
1174 ks := RunFakeKeepServer(st)
1175 defer ks.listener.Close()
1177 arv, err := arvadosclient.MakeArvadosClient()
1178 kc, _ := MakeKeepClient(arv)
1179 arv.ApiToken = "abc123"
1180 kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
1182 r, err := kc.GetIndex("x", "abcd")
1183 c.Check(err, Equals, nil)
1185 content, err2 := ioutil.ReadAll(r)
1186 c.Check(err2, Equals, nil)
1187 c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
// FailThenSucceedPutHandler wraps a StubPutHandler (successhandler) that its
// ServeHTTP can delegate to.
// NOTE(review): the other fields are declared on lines not shown here.
1190 type FailThenSucceedPutHandler struct {
1193 successhandler StubPutHandler
// ServeHTTP contains two behaviours: replying 500 and reporting
// "http://<req.Host>" on h.handled, and delegating the request to
// h.successhandler.
// NOTE(review): the condition choosing between them is on lines not shown;
// the type name suggests failures come first — confirm.
1196 func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
1198 resp.WriteHeader(500)
1200 h.handled <- fmt.Sprintf("http://%s", req.Host)
1202 h.successhandler.ServeHTTP(resp, req)
// TestPutBRetry writes "foo" with Want_replicas set to 2 against two servers
// sharing one FailThenSucceedPutHandler, and expects PutB to end with a nil
// error, an empty hash string and 2 replicas.
// NOTE(review): most of the handler literal is on lines not shown here.
1206 func (s *StandaloneSuite) TestPutBRetry(c *C) {
1207 st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
1213 make(chan string, 5)}}
1215 arv, _ := arvadosclient.MakeArvadosClient()
1216 kc, _ := MakeKeepClient(arv)
1218 kc.Want_replicas = 2
1219 arv.ApiToken = "abc123"
1220 localRoots := make(map[string]string)
1221 writableLocalRoots := make(map[string]string)
1223 ks := RunSomeFakeKeepServers(st, 2)
1225 for i, k := range ks {
1226 localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1227 writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
1228 defer k.listener.Close()
1231 kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
1233 hash, replicas, err := kc.PutB([]byte("foo"))
1235 c.Check(err, Equals, nil)
1236 c.Check(hash, Equals, "")
1237 c.Check(replicas, Equals, 2)
// TestMakeKeepClientWithNonDiskTypeService creates an extra keep_services
// record of type "testblobstore" and checks that a new KeepClient lists it
// among three local, gateway and writable roots, with replicasPerService 0,
// foundNonDiskSvc true and a 300-second HTTP client timeout. The record is
// deleted when the test returns.
1240 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
1241 arv, err := arvadosclient.MakeArvadosClient()
1242 c.Assert(err, Equals, nil)
1244 // Add an additional "testblobstore" keepservice
1245 blobKeepService := make(arvadosclient.Dict)
1246 err = arv.Create("keep_services",
1247 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
1248 "service_host": "localhost",
1249 "service_port": "21321",
1250 "service_type": "testblobstore"}},
1252 c.Assert(err, Equals, nil)
// The error returned by Delete is discarded in this cleanup.
1253 defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
1254 RefreshServiceDiscovery()
1256 // Make a keepclient and ensure that the testblobstore is included
1257 kc, err := MakeKeepClient(arv)
1258 c.Assert(err, Equals, nil)
1260 // verify kc.LocalRoots
1261 c.Check(len(kc.LocalRoots()), Equals, 3)
1262 for _, root := range kc.LocalRoots() {
1263 c.Check(root, Matches, "http://localhost:\\d+")
1265 c.Assert(kc.LocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1267 // verify kc.GatewayRoots
1268 c.Check(len(kc.GatewayRoots()), Equals, 3)
1269 for _, root := range kc.GatewayRoots() {
1270 c.Check(root, Matches, "http://localhost:\\d+")
1272 c.Assert(kc.GatewayRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1274 // verify kc.WritableLocalRoots
1275 c.Check(len(kc.WritableLocalRoots()), Equals, 3)
1276 for _, root := range kc.WritableLocalRoots() {
1277 c.Check(root, Matches, "http://localhost:\\d+")
1279 c.Assert(kc.WritableLocalRoots()[blobKeepService["uuid"].(string)], Not(Equals), "")
1281 c.Assert(kc.replicasPerService, Equals, 0)
1282 c.Assert(kc.foundNonDiskSvc, Equals, true)
1283 c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)